Rust__使用tonic实现一元及流式(Unary and Streaming)RPC的实例
本文展示了如何通过tonic(gRPC的Rust实现)实现一元RPC和流式RPC。示例通过Unary RPC对服务器中的哈希表进行单个读写,并通过Streaming RPC进行批量读写。流式RPC使客户端可以一边发送、服务器一边处理,不必等客户端全部发送完毕再开始处理,适用于数据量较大的批量处理场景。
完整代码Github
// Source file: proto/my_proto.proto (the path that build.rs compiles).
syntax = "proto3";
package myproto;

// A key together with the value stored under it.
message KvPair {
  Key key = 1;
  Value val = 2;
}

// Lookup key of the server-side table; wraps a single string.
message Key {
  string key = 1;
}

// Value stored in the server-side table; wraps a single string.
message Value {
  string val = 1;
}

// Reply returned by the non-streaming responses.
message ReplyState {
  // Human-readable description of what the server did.
  string reply_info = 1;
  // Pair the reply refers to; may be left unset by the server.
  KvPair kvpair = 2;
}

// Request sent to start a server-to-client stream.
message RequestState {
  // Free-form text describing the request.
  string request_info = 1;
}

service MyRpc {
  // Unary RPC: write a single pair.
  rpc SetKv(KvPair) returns (ReplyState) {
  }
  // Unary RPC: read the value stored under a single key.
  rpc GetKv(Key) returns (ReplyState) {
  }
  // A server-to-client streaming RPC.
  rpc GetKvList(RequestState) returns (stream KvPair) {
  }
  // A client-to-server streaming RPC.
  rpc SetKvList(stream KvPair) returns (ReplyState) {
  }
}
通过tonic自带的tonic_build工具(基于prost)将proto数据及方法转换为Rust数据结构及方法,build.rs在构建源文件前进行预构建,生成Rust代码。
/// Build script entry point: generates the Rust bindings for the proto file
/// into `src/pb` before the crate itself is compiled.
///
/// Panics (and therefore fails the build) if code generation fails.
fn main() {
    // Proto sources to compile and the include roots used to resolve them.
    let proto_files = ["proto/my_proto.proto"];
    let include_dirs = ["."];

    // NOTE(review): the server keys a `HashMap` by the generated `Key` type,
    // which needs `Eq + Hash`. If the generated code does not derive them,
    // they have to be supplied (e.g. via `type_attribute`) — confirm against
    // the full project.
    let builder = tonic_build::configure().out_dir("src/pb");
    builder
        .compile(&proto_files, &include_dirs)
        .expect("failed to compile protos");
}
构建后会在src/pb目录下生成myproto.rs文件,文件部分内容如下所示:
(1)该文件中包含在rust下实现的proto中的数据结构:
/// Generated Rust counterpart of the `KvPair` proto message.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct KvPair {
    /// Proto field 1 (`key`). Declared as an optional message field, so it is
    /// an `Option` and may be `None`.
    #[prost(message, optional, tag="1")]
    pub key: ::core::option::Option<Key>,
    /// Proto field 2 (`val`). Declared as an optional message field, so it is
    /// an `Option` and may be `None`.
    #[prost(message, optional, tag="2")]
    pub val: ::core::option::Option<Value>,
}
并创建了2个mod:
pub mod my_rpc_client
pub mod my_rpc_server
(2)在 mod my_rpc_client 下创建了结构体:
/// Generated client handle for the `MyRpc` service, generic over the
/// transport type `T`.
pub struct MyRpcClient<T> {
    /// Underlying tonic gRPC client wrapping the transport `T`.
    inner: tonic::client::Grpc<T>,
}
并在该结构体的泛型参数为tonic::transport::Channel时,为其实现了专用的方法:
impl MyRpcClient<tonic::transport::Channel> {
    /// Builds an endpoint from `dst`, connects to it and wraps the resulting
    /// channel in a new client.
    ///
    /// # Errors
    ///
    /// Returns a `tonic::transport::Error` if `dst` cannot be turned into an
    /// endpoint or if the connection attempt fails.
    pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
    where
        D: std::convert::TryInto<tonic::transport::Endpoint>,
        D::Error: Into<StdError>,
    {
        let endpoint = tonic::transport::Endpoint::new(dst)?;
        let channel = endpoint.connect().await?;
        Ok(Self::new(channel))
    }
}
以及一些常规方法如:
pub fn new(inner: T) -> Self
,
pub fn send_gzip(mut self) -> Self
,
pub fn accept_gzip(mut self) -> Self
,
......
还有我们自定义的方法:
pub async fn set_kv(
&mut self,
request: impl tonic::IntoRequest<super::KvPair>,
) -> Result<tonic::Response<super::ReplyState>, tonic::Status>{
...}
pub async fn get_kv(
&mut self,
request: impl tonic::IntoRequest<super::Key>,
) -> Result<tonic::Response<super::ReplyState>, tonic::Status>{
...}
pub async fn get_kv_list(
&mut self,
request: impl tonic::IntoRequest<super::RequestState>,
) -> Result<
tonic::Response<tonic::codec::Streaming<super::KvPair>>,
tonic::Status,
> {
...}
pub async fn set_kv_list(
&mut self,
request: impl tonic::IntoStreamingRequest<Message = super::KvPair>,
) -> Result<tonic::Response<super::ReplyState>, tonic::Status> {
...}
(3)在 mod my_rpc_server 下创建了trait及结构体:
/// Generated server-side trait for the `MyRpc` service; an implementation
/// provides one async method per RPC declared in the proto file.
#[async_trait]
pub trait MyRpc: Send + Sync + 'static {
    /// Unary RPC: receives one `KvPair` and answers with a `ReplyState`.
    async fn set_kv(
        &self,
        request: tonic::Request<super::KvPair>,
    ) -> Result<tonic::Response<super::ReplyState>, tonic::Status>;
    /// Unary RPC: receives one `Key` and answers with a `ReplyState`.
    async fn get_kv(
        &self,
        request: tonic::Request<super::Key>,
    ) -> Result<tonic::Response<super::ReplyState>, tonic::Status>;
    /// Server streaming response type for the GetKvList method: a stream of
    /// `Result<KvPair, Status>` items chosen by the implementor.
    type GetKvListStream: futures_core::Stream<
            Item = Result<super::KvPair, tonic::Status>,
        >
        + Send
        + 'static;
    /// A server-to-client streaming RPC.
    async fn get_kv_list(
        &self,
        request: tonic::Request<super::RequestState>,
    ) -> Result<tonic::Response<Self::GetKvListStream>, tonic::Status>;
    /// A client-to-server streaming RPC.
    async fn set_kv_list(
        &self,
        request: tonic::Request<tonic::Streaming<super::KvPair>>,
    ) -> Result<tonic::Response<super::ReplyState>, tonic::Status>;
}
/// Generated server wrapper around a user-provided `MyRpc` implementation.
#[derive(Debug)]
pub struct MyRpcServer<T: MyRpc> {
    /// The wrapped service implementation.
    inner: _Inner<T>,
    // Both compression fields have the unit type in this excerpt.
    // NOTE(review): presumably placeholders used when compression support is
    // not enabled — confirm against the tonic version in use.
    accept_compression_encodings: (),
    send_compression_encodings: (),
}
/// Newtype holding the service implementation behind an `Arc`.
struct _Inner<T>(Arc<T>);
为该结构实现:
impl<T, B> tonic::codegen::Service<http::Request<B>> for MyRpcServer<T>
where
T: MyRpc,
B: Body + Send + 'static,
B::Error: Into<StdError> + Send + 'static,
impl<T: MyRpc> Clone for MyRpcServer<T>
impl<T: MyRpc> Clone for _Inner<T>
impl<T: std::fmt::Debug> std::fmt::Debug for _Inner<T>
impl<T: MyRpc> tonic::transport::NamedService for MyRpcServer<T>
(1)首先引入myproto.rs文件
/// Module exposing the generated gRPC code; the contents of
/// `pb/myproto.rs` are textually included here.
pub mod myproto {
    include!("pb/myproto.rs");
}
(2)然后建立RPC服务结构体并实现MyRpc trait
注意:在异步协程环境下,这里使用异步锁futures_util::lock::Mutex(通过 .lock().await 获取),而不是标准库的std::sync::Mutex;后者在等待锁时会阻塞执行器线程,若持锁跨越 .await 还可能造成死锁。
/// Service state backing the `MyRpc` implementation.
#[derive(Debug)]
pub struct MyRpcService {
    /// Shared key/value table. The mutex is an async one (it is acquired with
    /// `.lock().await`), and the `Arc` lets the table be shared with spawned
    /// tasks.
    table: Arc<Mutex<HashMap<Key, Value>>>,
}
#[tonic::async_trait]
impl MyRpc for MyRpcService {
async fn set_kv(
&self,
_request: Request<KvPair>,
) -> Result<Response<ReplyState>, Status> {
println!("set_kv = {:?}", _request);
let kvpair = _request.into_inner();
let k = kvpair.key.expect("key should not none.");
let v = kvpair.val.expect("value should not none.");
let tb = self.table.clone();
if let Some(val) =
tb.lock().await.insert(k.clone(), v) {
return Ok(Response::new(ReplyState {
reply_info: "update value, return old.".into(),
kvpair: Some(KvPair {
key: Some(k),
val: Some(val.clone()),
}),
}));
}
Ok(Response::new(
ReplyState{
reply_info: "set new kvpair.".into(),
kvpair: Some(KvPair {
key: Some(k),
val: None,
}),
}))
}
async fn get_kv(
&self,
_request: Request<Key>,
) -> Result<Response<ReplyState>, Status> {
println!("get_kv = {:?}", _request);
let k = _request.into_inner();
let tb = self.table.clone();
if let Some(val) =
tb.lock().await.get(&k) {
return Ok(Response::new(ReplyState {
reply_info: "get success.".into(),
kvpair: Some(KvPair {
key: Some(k),
val: Some(val.clone()),
}),
}))
}
Ok(Response::new(ReplyState {
reply_info: "get failed.".into(),
kvpair: Some(KvPair {
key: Some(k),
val: None,
}),
}))
}
///Server streaming response type for the GetKvList method.
type GetKvListStream = ReceiverStream<Result<KvPair, Status>>;
/// A server-to-client streaming RPC.
async fn get_kv_list(
&self,
_request: Request<RequestState>,
) -> Result<Response<Self::GetKvListStream>, Status> {
println!("get_kv_list = {:?}", _request);
let (tx, rx) = mpsc::channel(10);
let tb = self.table.clone();
tokio::spawn(async move {
for (k, v) in tb.lock().await.iter() {
println!(" => send k = {:?}, v = {:?}", k, v);
tx.send(Ok(KvPair {
key: Some(k.clone()),
val: Some(v.clone()),
})).await.unwrap();
}
println!(" /// done sending");
});
Ok(Response::new(ReceiverStream::new(rx)))
}
/// A client-to-server streaming RPC.
async fn set_kv_list(
&self,
_request: Request<tonic::Streaming<KvPair>>,
) -> Result<Response<ReplyState>, Status> {
println!("set_kv_list = {:?}", _request);
let tb = self.table.clone();
let mut stream = _request.into_inner();
while let Some(kvpair) = stream.next().await {
let kvpair = kvpair?;//stream.next().await -> Option<Result<...>>
let k = kvpair.key;
let v = kvpair.val;
tb.lock().await.insert(k.unwrap(), v.unwrap());
}
Ok(Response::new(ReplyState {
reply_info: "set all kvpair done.".into(),
kvpair: None,
}))
}
}
(3)编写主函数
在主函数中启动RPC服务
/// Server entry point: creates an empty table, wraps it in the `MyRpc`
/// service and serves it on `[::1]:10000` until the server stops.
///
/// # Errors
///
/// Returns an error if the listen address cannot be parsed or if the server
/// fails while starting or running.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // `main` already returns a `Result`, so propagate a parse failure with
    // `?` instead of panicking.
    let addr: std::net::SocketAddr = "[::1]:10000".parse()?;
    let my_rpc = MyRpcService {
        table: Arc::new(Mutex::new(HashMap::new())),
    };
    let svc = MyRpcServer::new(my_rpc);
    Server::builder().add_service(svc).serve(addr).await?;
    Ok(())
}
(1)客户端代码引入myproto.rs文件
/// Module exposing the generated gRPC code to the client; the contents of
/// `pb/myproto.rs` are textually included here.
pub mod myproto {
    include!("pb/myproto.rs");
}
(2)实现批量写入和读取函数
/// Calls the server-streaming `get_kv_list` RPC and prints every pair the
/// server sends until the stream ends.
///
/// # Errors
///
/// Returns the error if the call itself fails or if reading a message from
/// the response stream fails.
async fn print_kv_list(client: &mut MyRpcClient<Channel>) -> Result<(), Box<dyn Error>> {
    let request = Request::new(RequestState {
        request_info: "get all.".into(),
    });
    let response = client.get_kv_list(request).await?;
    let mut incoming = response.into_inner();

    // `message()` yields `Ok(None)` once the server has finished the stream.
    loop {
        match incoming.message().await? {
            Some(pair) => println!("KvPair = {:?}", pair),
            None => break,
        }
    }
    Ok(())
}
async fn run_set_kv_list(client: &mut MyRpcClient<Channel>) -> Result<(), Box<dyn Error>> {
let mut pairs = vec![];
for i in 0..3 {
pairs.push(KvPair {
key: Some(Key{
key: i.to_string()}),
val: Some(Value{
val: i.to_string()})
})
}
println!("pairs num = {:?}", pairs.len());
let request = Request::new(stream::iter(pairs));
match client.set_kv_list(request).await {
Ok(response) => println!("ReplyState = {:?}", response.into_inner()),
Err(e) => println!("something went wrong: {:?}", e),
}
Ok(())
}
(3)客户端主函数
在主函数中调用RPC服务
/// Client entry point: connects to the server on `[::1]:10000` and exercises
/// the two unary RPCs, then the client-streaming and server-streaming RPCs.
///
/// # Errors
///
/// Returns the first error produced by connecting or by any RPC that is
/// propagated with `?`.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut client = MyRpcClient::connect("http://[::1]:10000").await?;

    println!("*** SIMPLE RPC ***");
    // Write the pair ("a", "1") ...
    let pair = KvPair {
        key: Some(Key { key: "a".into() }),
        val: Some(Value { val: "1".into() }),
    };
    let response0 = client.set_kv(Request::new(pair)).await?;
    println!("RESPONSE0 = {:?}", response0);

    // ... and read it back.
    let lookup = Key { key: "a".into() };
    let response1 = client.get_kv(Request::new(lookup)).await?;
    println!("RESPONSE1 = {:?}", response1);

    println!("\n*** CLIENT STREAMING ***");
    run_set_kv_list(&mut client).await?;

    println!("\n*** SERVER STREAMING ***");
    print_kv_list(&mut client).await?;

    Ok(())
}
(1)服务器打印情况
(2)客户端打印情况
通过标准输出的内容可以看到:
(1)客户端通过set_kv("a", 1)发送请求到服务器,服务器向客户端返回了响应RESPONSE0,表示已经成功写入。
(2)客户端调用get_kv("a")向服务器发送请求,此时客户端收到了RESPONSE1,得到了结果("a", 1)。
(3)客户端调用set_kv_list(),收到了批量写入成功的响应。
(4)客户端调用get_kv_list(),收到了服务器存储的所有KvPair。