使用HTTP
首先需要确定如何在Rust 的web服务中使用HTTP。意思是我们的应用服务器必须能够解析HTTP请求,返回响应。其他语言例如python,又Flash,Django这样的框架可以直接使用,对于Rust来说可以使用一个相对底层的框架hyper,来处理HTTP请求,hyper是基于tokio和future的,可以方便的创建一个异步的服务器,同时对于日志的支持使用log和env_log crate进行处理。
首先创建项目,并添加依赖。
[package]
name = "microservice_rs"
version = "0.1.0"
authors = ["you <your@email>"]
[dependencies]
env_logger = "0.5.3"
futures = "0.1.17"
hyper = "0.11.13"
log = "0.4.1"复制代码
现在编写处理http请求的代码。hyper有一个Service
的概念,实现Service
trait 有一个call
的方法,接受Request
对象,处理HTTP请求。由于Hyper是异步的,所以必须返回一个Future
,代码如下:
extern crate hyper;
extern crate futures;
#[macro_use]
extern crate log;
extern crate env_logger;
use hyper::server::{Request, Response, Service}
use futures::future::Future;
struct Microservice;
impl Service for Microservice {
type Request = Request;
type Response = Response;
type Error = hyper::Error;
type Future = Box<Future<Item = Self::Response, Error = Self::Error>>;
fn call(&self, request: Request) -> Self::Future {
info!("Microserivce received a request: {:?}", request);
Box::new(futures::future::ok(Response::new()))
}
}复制代码
注意,在Microservice中同时定义了一些类型,future返回的是Box类型,因为futures::fugure::Future
是一个trait,我们无法知道其大小。
下面编写启动server的代码。
fn main() {
env_logger::init();
let address = "127.0.0.1:8080".parse().unwrap();
let server = hyper::server::Http::new()
.bind(&address, ||Ok(Microservice{}))
.unwrap();
info!("Running microservice at {}", address);
server.run().unwrap();
}复制代码
运行:
RUST_LOG="microservice=debug" cargo run
Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs
Running `target/debug/microservice`
INFO 2018-01-21T23:35:05Z: microservice: Running microservice at 127.0.0.1:8080复制代码
访问服务:
curl 'localhost:8080'
INFO 2018-01-21T23:35:05Z: microservice: Running microservice at 127.0.0.1:8080
INFO 2018-01-21T23:35:06Z: microservice: Microservice received a request: Request { method: Get, uri: "/", version: Http11, remote_addr: Some(V4(127.0.0.1:61667)), headers: {"Host": "localhost:8080", "User-Agent": "curl/7.54.0", "Accept": "*/*"} }复制代码
RUST_LOG="microservice=debug"
是env_log的参数,可以控制日志的级别。
处理POST请求
下面来编写处理POST请求的逻辑,路径/
接受一个POST请求,请求体包含了username
和message
两个内容。现在重新编写上面的call
方法。
fn call(&self, request: Request) -> Self::Future {
match (request.method(), request.path()) {
(&Post, "/") => {
let future = request
.body()
.concat2()
.and_then(parse_form)
.and_then(write_to_db)
.then(make_post_response);
Box::new(future)
}
_ => Box::new(futures::future::ok(Response::new().with_status(StatusCode:::NotFound))),
}
}复制代码
我们通过match
来处理不同的请求。在这个例子中,请求方法要么是Get,要么是Post。目前唯一有效的路径是/
。如果请求方法是&Post
并且路径是/
,那么就会调用一些函数,parse_form
等进行处理。and_then
组合器会将各个方法处理完毕,返回的结果传递给下一个函数继续处理,最终到then
返回结果。
下面来简单编写上面的例子用到的函数
struct NewMessage {
username: String,
message: String,
}
fn parse_form(form_chunk: Chunk) -> FutureResult<NewMessage, hyper::Error> {
futures::future::ok(NewMessage {
username: String::new(),
message: String::new(),
})
}
fn write_to_db(entry: NewMessage) -> FutureResult<i64, hyper::Error> {
futures::future::ok(0)
}
fn make_post_response (result: Result<i64, hyper::Error>) -> FutureResult<hyper::Response, hyper::Error> {
futures::future::ok(Response::new().with_status(StatusCode::NotFound))
}复制代码
上面代码运行需要导入:
use hyper::{Chunk, StatusCode};
use hyper::Method::{Get, Post};
use hyper::server::{Request, Response, Service};
use futures::Stream;
use futures::future::{Future, FutureResult};复制代码
下面让我们来完善parse_form
函数,这个函数接受一个Chunk
的类型(即消息体) ,同时解析消息体获取username和message。
use std::collections::HashMap;
use std::io;
fn parse_form(form_chunk: Chunk) -> FutureResult<NewMessage, hyper::Error> {
let mut form = url::form_urlencoded::parse(form_chunk.as_ref())
.into_owned()
.collect::<HashMap<String, String>>();
if let Some(message) = form.remove("message") {
let username = form.remove("username").unwrap_or(String::from("anonymous"));
futures::future::ok(NewMessage {
username: username,
message: message,
})
}else {
futures::future::err(hyper::Error::from(io::Error::new(
io::ErrorKind::InvalidInput,
"Missing field message",
)))
}
}复制代码
首先,解析消息体,将其解析道hashmap中,然后通过hashmap获取消息体中的信息。
暂时先不介绍write_to_db 函数,这个函数的作用就是将信息写入到数据库,将会在下一章节进行介绍。
现在来编写 make_post_response
函数。
#[macro_use]
extern crate serde_json;
fn make_post_response(result: Result<i64, hyper::Error>) -> FutureResult<hyper::Response, hyper::Error> {
match result {
Ok(timestamp) => {
let payload = json!({"timestamp": timestamp}).to_string();
let response = Response::new()
.with_header(ContentLength(payload.len() as u64))
.with_header(ContentType::json())
.with_body(payload);
debug!("{:?}", response);
futures::future::ok(response)
}
Err(error) => make_error_response(error.description()),
}
}复制代码
通过match来检查处理是否成功,如果成功了,就返回时间信息,如果不成功返回错误信息,这里用到了serde_json, 记得在Cargo.toml 中添加。
下面编写make_error_response
方法
fn make_error_response(error_message: &str) -> FutureResult<hyper::Response, hyper::Error>{
let payload = json!({"error": error_message}).to_string();
let response = Response::new()
.with_status(StatusCode::InternalServerError)
.with_header(ContentLength(payload.len() as u64))
.with_header(ContentType::json())
.with_body(payload);
debug!("{:?}", response);
futures::future::ok(response)
}复制代码
处理GET请求
接下来我们处理GET请求,通过发送GET请求到server获取消息。GET请求接受两个参数,before和after,server返回这个两个时间戳之间的消息。
(&Get, "/") => {
let time_range = match request.query() {
Some(query) => parse_query(query),
None => Ok(TimeRange{
before: None,
after: None,
}),
};
let response = match time_range {
Ok(time_range) => make_get_response(query_db(time_range)),
Err(error) => make_error_response(&error),
};
Box::new(response)
}复制代码
通过调用request.query()
获取Option<&str>
,通过调用parse_query
处理url参数,TimeRange定义如下:
struct TimeRange {
before: Option<i64>,
after: Option<i64>,
}复制代码
query_db
的作用是从数据库中获取消息信息。下面来实现parse_query 函数
fn parse_query(query: &str) -> Result<TimeRange, String> {
let args = url::form_urlencoded::parse(&query.as_bytes())
.into_owned()
.collect::<HashMap<String,String>>();
let before = args.get("before").map(|value| value.parse::<i64>());
if let Some(ref result) = before {
if let Err(ref error) = *result {
return Err(format!("Error parsing 'before: {}", error));
}
}
let after = args.get("after").map(|value| value.parse::<i64>());
if let Some(ref result) = after {
if let Err(error) = *result {
return Err(format!("Error parsing 'after': {}", error));
}
}
Ok(TimeRange{
before: before.map(|b| b.unwrap()),
after: after.map(|b| b.unwrap()),
})
}复制代码
下面来编写make_get_response
方法
fn make_get_response(messages: Option<Vec<Message>>) -> FutureResult<hyper::Response, hyper::Error> {
let response = match messages {
Some(messages) => {
let body = render_page(messages);
Response::new()
.with_header(ContentLength(body.len() as u64))
.with_body(body)
}
None => Response::new().with_status(StatusCode::InternalServerError),
};
debug!("{:?}", response);
futures::future::ok(response)
}复制代码
添加数据库支持
Rust的diesel
ORM 是目前Rust最好ORM,所以我们将会直接使用diesel
来做数据库操作,我们的数据库选择postgresql
。将下面的代码添加到Cargo.toml中
diesel = {version = "1.0.0", features = ["postgres"]}复制代码
同时安装diesel_cli
cargo install diesel_cli复制代码
首先为我们需要创建数据库表的语句,将其放在schemas/message.sql中
CREATE TABLE messages (
id SERIAL PRIMARY KEY,
username VARCHAR(128) NOT NULL,
message TEXT NOT NULL,
timestamp BIGINT NOT NULL DEFAULT EXTRACT('epoch' FROM CURRENT_TIMESTAMP)
)复制代码
下面我们使用diesel创建schema
export DATABASE_URL=postgres://<user>:<password>@localhost
diesel print-schema | tee src/schema.rs
table! {
messages (id) {
id -> Int4,
username -> Varchar,
message -> Text,
timestamp -> Int8,
}
}复制代码
table! 是diesel定义的宏,用来表示数据的字段对应,其保存在schema.rs文件中,同时需要创建src/models.rs 文件
#[derive(Queryable, Serialize, Debug)]
pub struct Message {
pub id: i32,
pub username: String,
pub message: String,
pub timestamp: i64,
}复制代码
model 的struct就是上面代码使用的Message。现在向main中添加一些需要使用的模块
#[macro_use]
extern crate serde_derive;
#[macro_use]
extern crate diesel;
mod schema;
mod models;复制代码
实现write_to_db 方法
use diesel::prelude::*;
use diesel::pg::PgConnection;
fn write_to_db(new_message: NewMessage, db_connection: &PgConnection) -> FutureResult<i64, hyper::Error> {
use schema::messages;
let timestamp = diesel::insert_into(messages::table)
.values(&new_message)
.returning(messages::timestamp)
.get_result(db_connection);
match timestamp {
Ok(timestamp) => futures::future::ok(timestamp),
Err(error) => {
error!("Error writing to database: {}", error.description());
futures::future::err(hyper::Error::from(io::Error::new(io::ErrorKind::Other, "service error"),))
}
}
}复制代码
Diesel 暴露了非常直观并且类型安全的API。
- 指定需要插入的表
- 指定需要插入的数据
- 指定想要返回的数据
- 调用get_result 执行sql并获取结果
同样的,对于NewMessage,也需要在src/models.rs中进行定义
use schema::messages;
#[derive(Queryable, Serialize, Debug)]
pub struct Message {
pub id: i32,
pub username: String,
pub message: String,
pub timestamp: i64,
}
#[derive(Insertable, Debug)]
#[table_name = "messages"]
pub struct NewMessage {
pub username: String,
pub message: String,
}复制代码
现在我们需要修改一下call方法,其内部需要获取db的链接
use std::env;
const DEFAULT_DATABASE_URL: &'static str = "postgresql://postgres@localhost:5432";
fn connect_to_db() -> Option<PgConnection> {
let database_url = env::var("DATABASE_URL").unwrap_or(String::from(DEFAULT_DATABASE_URL));
match PgConnection::establish(&database_url) {
Ok(connection) => Some(connection),
Err(error) => {
error!("Error connection to database {}", error.description());
None
}
}
}
fn call(&self, request: Request) -> Self::Response {
let db_connection = match connect_to_db() {
Some(connection) => connection,
None => {
return Box::new(futures::future::ok(
Response::new().with_status(StatusCode::IntervalServerError),
));
}
};
}复制代码
下面需要修改一下处理Post和Get请求的match
(&Post, "/") => {
let future = request
.body()
.concat2()
.and_then(parse_form)
.and_then(move |new_message| => write_to_db(new_message, &db_connection))
.then(make_post_response);
Box::new(future)
}
(&Get, "/") => {
(&Post, "/") => {
let future = request
.body()
.concat2()
.and_then(parse_form)
.and_then(move |new_message| write_to_db(new_message, &db_connection))
.then(make_post_response);
Box::new(future)
}
(&Get, "/") => {
let time_range = match request.query() {
Some(query) => parse_query(query),
None => Ok(TimeRange {
before: None,
after: None,
}),
};
let response = match time_range {
Ok(time_range) => make_get_response(query_db(time_range, &db_connection)),
Err(error) => make_error_response(&error),
};
Box::new(response)
}复制代码
实现query_db 方法
fn query_db(time_range: TimeRange, db_connection: &PgConnection) -> Option<Vec<Message>> {
use schema::messages;
let TimeRange {before, after} = time_range;
let query_result = match (before, after) {
(Some(before), Some(after)) => {
messages::table
.filter(messages::timestamp.lt(before as u64))
.filter(messages::timestamp.gt(after as u64))
.load::<Message>(db_connection)
}
(Some(before), _) => {
message::table
.filter(messages::timestamp.lt(before as u64))
.load::<Message>(db_connection)
}
(_, Some(after)) => {
messages::table
.filter(messages::timestamp.gt(after as i64))
.load::<Message>(db_connection)
}
_ => {
messages::table.load::<Message>(db_connection)
};
match query_result {
Ok(result) => Some(result),
Err(error) => {
error!("Error query Db: {}", error);
None
}
}
}复制代码
渲染html
我们使用maud作为html的渲染引擎。在Cargo.toml 中添加
maud = "0.17.2"复制代码
然后在main.rs 中声明
#![feature(proc_macro)]
extern crate maud;复制代码
下面来实现render_html方法
fn render_html(messages: Vec<Message>) -> String {
(html!{
head {
title "microservice"
style "body { font-family: monospace }"
}
body {
ul {
@for message in &messages {
li {
(message.username) " (" (message.timestamp) "): " (message.message)
}
}
}
}
}).into_string()
}复制代码
运行整个工程
DATABASE_URL="postgresql://goldsborough@localhost" RUST_LOG="microservice=debug" cargo run
Compiling microservice v0.1.0 (file:///Users/goldsborough/Documents/Rust/microservice)
Finished dev [unoptimized + debuginfo] target(s) in 12.30 secs
Running `target/debug/microservice`
INFO 2018-01-22T01:22:16Z: microservice: Running microservice at 127.0.0.1:8080复制代码
访问接口
curl 'localhost:8080'
<head><title>microservice</title><style>body { font-family: monospace }</style></head><body><ul><li>peter (1516584255): hi</li><li>mike (1516584282): hi2</li></ul></body>复制代码
使用docker进行打包
docker-compose.yaml
文件如下:
version: '2'
services:
server:
build:
context: .
dockerfile: docker/Dockerfile
networks:
- network
ports:
- "8080:80"
environment:
DATABASE_URL: postgresql://postgres:secret@db:5432
RUST_BACKTRACE: 1
RUST_LOG: microservice=debug
db:
build:
context: .
dockerfile: docker/Dockerfile-db
restart: always
networks:
- network
environment:
POSTGRES_PASSWORD: secret
networks:
network:复制代码
近期评论