rust torch_Rust菜鸟教程「建议收藏」

(142) 2024-05-23 08:01:01

Rust:使用tonic实现一元及流式(Unary and Streaming)RPC的实例

  本文展示了如何通过tonic(gRPC的rust实现)实现一元RPC和流式RPC。实例为通过Unary RPC实现对服务器中哈希表的单个读写,以及通过Streaming RPC进行批量读写。流式RPC可以使得客户端一边发送,服务器一边处理,不需要等到客户端全部发送完再处理,适用于数据量较大的批量处理情况。
完整代码Github

1.定义协议

//myproto.proto
syntax = "proto3";
package myproto;

// A key together with the value stored under it.
message KvPair {
    Key key = 1;
    Value val = 2;
}

// Lookup key of the server-side table.
message Key {
    string key = 1;
}

// Value stored under a key.
message Value {
    string val = 1;
}

// Reply of the unary and client-streaming calls: a status text plus an
// optional key/value pair.
message ReplyState {
    string reply_info = 1;
    KvPair kvpair = 2;
}

// Request of the server-streaming call; carries only a free-form text.
message RequestState {
    string request_info = 1;
}

// Key/value service offering unary and streaming access.
service MyRpc {
    // Simple (unary) RPCs: write one pair / read one key.
    rpc SetKv(KvPair) returns (ReplyState) {}
    rpc GetKv(Key) returns (ReplyState) {}

    // A server-to-client streaming RPC: the server streams back KvPair messages.
    rpc GetKvList(RequestState) returns (stream KvPair) {}

    // A client-to-server streaming RPC: the client streams KvPair messages
    // and receives a single ReplyState.
    rpc SetKvList(stream KvPair) returns (ReplyState) {}
}

2.将Proto转换为Rust数据结构

通过与tonic配套的tonic-build crate(基于prost-build),可以将proto中定义的消息及服务转换为Rust的数据结构及方法。转换写在build.rs中:Cargo会在编译源文件之前先运行build.rs,从而预先生成Rust代码。

/// Build script entry point: generates the Rust gRPC code from the proto
/// definition into `src/pb` before the crate itself is compiled.
///
/// # Panics
/// Panics (failing the build) when the output directory cannot be created or
/// when proto compilation fails.
fn main() {
    const OUT_DIR: &str = "src/pb";

    // Make sure the output directory exists before code generation writes
    // into it, so that a fresh checkout builds without manual setup.
    std::fs::create_dir_all(OUT_DIR).expect("failed to create proto output directory");

    // NOTE(review): the article labels the proto file `myproto.proto`, while
    // this path says `my_proto.proto` — confirm the real file name on disk.
    tonic_build::configure()
        .out_dir(OUT_DIR)
        .compile(&["proto/my_proto.proto"], &["."])
        .expect("failed to compile protos");
}

构建后会在src/pb目录下生成myproto.rs文件,文件部分内容如下所示:

(1)该文件中包含在rust下实现的proto中的数据结构:

/// Rust representation of the `KvPair` proto message.
///
/// Both fields are sub-messages and are therefore wrapped in `Option`:
/// either of them may be absent in a received message.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct KvPair { 
   
    /// Proto field `key` (tag 1).
    #[prost(message, optional, tag="1")]
    pub key: ::core::option::Option<Key>,
    /// Proto field `val` (tag 2).
    #[prost(message, optional, tag="2")]
    pub val: ::core::option::Option<Value>,
}

并创建了2个mod

pub mod my_rpc_client
pub mod my_rpc_server

(2)在 mod my_rpc_client下创建了结构体:

/// Client handle for the `MyRpc` service, generic over the transport `T`.
pub struct MyRpcClient<T> { 
   
    // The underlying generic gRPC client the handle wraps.
    inner: tonic::client::Grpc<T>,
}

并且针对泛型参数T为tonic::transport::Channel的情况,为该结构体单独实现了connect方法:

// Methods that exist only when the transport is `tonic::transport::Channel`.
impl MyRpcClient<tonic::transport::Channel> { 
   
        /// Attempt to create a new client by connecting to a given endpoint.
        ///
        /// `dst` is anything convertible into a `tonic::transport::Endpoint`.
        /// Returns a `tonic::transport::Error` when building the endpoint or
        /// connecting to it fails.
        pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
        where
            D: std::convert::TryInto<tonic::transport::Endpoint>,
            D::Error: Into<StdError>,
        { 
   
            // First `?`: endpoint construction failed; second `?`: connecting failed.
            let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
            Ok(Self::new(conn))
        }
    }

以及一些常规方法如:
pub fn new(inner: T) -> Self
pub fn send_gzip(mut self) -> Self
pub fn accept_gzip(mut self) -> Self
......

还有我们自定义的方法:

pub async fn set_kv(
            &mut self,
            request: impl tonic::IntoRequest<super::KvPair>,
        ) -> Result<tonic::Response<super::ReplyState>, tonic::Status>{ 
   ...}
pub async fn get_kv(
            &mut self,
            request: impl tonic::IntoRequest<super::Key>,
        ) -> Result<tonic::Response<super::ReplyState>, tonic::Status>{ 
   ...}
pub async fn get_kv_list(
            &mut self,
            request: impl tonic::IntoRequest<super::RequestState>,
        ) -> Result<
            tonic::Response<tonic::codec::Streaming<super::KvPair>>,
            tonic::Status,
        > { 
   ...}
pub async fn set_kv_list(
            &mut self,
            request: impl tonic::IntoStreamingRequest<Message = super::KvPair>,
        ) -> Result<tonic::Response<super::ReplyState>, tonic::Status> { 
   ...}

(3)在 mod my_rpc_server下创建了trait及结构体:

/// Server-side trait for the `MyRpc` service: one async method per RPC
/// declared in the proto file. Implementors must be `Send + Sync + 'static`.
#[async_trait]
pub trait MyRpc: Send + Sync + 'static { 
   
        ///simple rpc
        async fn set_kv(
            &self,
            request: tonic::Request<super::KvPair>,
        ) -> Result<tonic::Response<super::ReplyState>, tonic::Status>;
        /// Unary RPC: takes a `Key` and answers with a `ReplyState`.
        async fn get_kv(
            &self,
            request: tonic::Request<super::Key>,
        ) -> Result<tonic::Response<super::ReplyState>, tonic::Status>;
        ///Server streaming response type for the GetKvList method.
        type GetKvListStream: futures_core::Stream<
                Item = Result<super::KvPair, tonic::Status>,
            >
            + Send
            + 'static;
        /// A server-to-client streaming RPC.
        async fn get_kv_list(
            &self,
            request: tonic::Request<super::RequestState>,
        ) -> Result<tonic::Response<Self::GetKvListStream>, tonic::Status>;
        /// A client-to-server streaming RPC.
        async fn set_kv_list(
            &self,
            request: tonic::Request<tonic::Streaming<super::KvPair>>,
        ) -> Result<tonic::Response<super::ReplyState>, tonic::Status>;
    }
	/// Server wrapper around a user-provided `MyRpc` implementation.
	#[derive(Debug)]
    pub struct MyRpcServer<T: MyRpc> { 
   
        // The wrapped service implementation (see `_Inner`).
        inner: _Inner<T>,
        // NOTE(review): both encoding fields are unit-typed here, presumably
        // placeholders while compression support is disabled — confirm.
        accept_compression_encodings: (),
        send_compression_encodings: (),
    }
    // Newtype holding the service implementation behind an `Arc`.
    struct _Inner<T>(Arc<T>);

为该结构实现:

	impl<T, B> tonic::codegen::Service<http::Request<B>> for MyRpcServer<T>
	where
	    T: MyRpc,
	    B: Body + Send + 'static,
	    B::Error: Into<StdError> + Send + 'static,
	        
	impl<T: MyRpc> Clone for MyRpcServer<T>
	impl<T: MyRpc> Clone for _Inner<T>
	impl<T: std::fmt::Debug> std::fmt::Debug for _Inner<T>
	impl<T: MyRpc> tonic::transport::NamedService for MyRpcServer<T>

3.服务器代码

(1)首先引入myproto.rs文件

/// Generated protobuf/gRPC code, textually included from `pb/myproto.rs`.
pub mod myproto {
    include!("pb/myproto.rs");
}

(2)然后建立RPC服务结构体并实现MyRpc trait
注意:在异步代码中,如果需要跨.await持有锁,应使用异步Mutex(例如本文使用的futures_util::lock::Mutex,或tokio::sync::Mutex),而不是标准库的std::sync::Mutex。后者在等待锁时会阻塞执行器线程,其守卫跨.await持有时还可能造成死锁。

/// State of the key/value RPC service.
#[derive(Debug)]
pub struct MyRpcService {
    /// Key/value table behind an `Arc<Mutex<..>>` so it can be shared with
    /// spawned tasks.
    table: Arc<Mutex<HashMap<Key, Value>>>,
}

#[tonic::async_trait]
impl MyRpc for MyRpcService { 
   
    async fn set_kv(
        &self,
        _request: Request<KvPair>,
    ) -> Result<Response<ReplyState>, Status> { 
   
        println!("set_kv = {:?}", _request);

        let kvpair = _request.into_inner();
        let k = kvpair.key.expect("key should not none.");
        let v = kvpair.val.expect("value should not none.");
        let tb = self.table.clone();

        if let Some(val) = 
            tb.lock().await.insert(k.clone(), v) { 
   
            return Ok(Response::new(ReplyState { 
   
                reply_info: "update value, return old.".into(),
                kvpair: Some(KvPair { 
   
                    key: Some(k),
                    val: Some(val.clone()),
                }),
            }));
        }

        Ok(Response::new(
            ReplyState{ 
   
                reply_info: "set new kvpair.".into(),
                kvpair: Some(KvPair { 
   
                    key: Some(k),
                    val: None,
                }),
            }))
    }

    async fn get_kv(
        &self,
        _request: Request<Key>,
    ) -> Result<Response<ReplyState>, Status> { 
   
        println!("get_kv = {:?}", _request);

        let k = _request.into_inner();
        let tb = self.table.clone();
        
        if let Some(val) = 
            tb.lock().await.get(&k) { 
   
            return Ok(Response::new(ReplyState { 
   
                reply_info: "get success.".into(),
                kvpair: Some(KvPair { 
   
                    key: Some(k),
                    val: Some(val.clone()),
                }),
            }))
        }

        Ok(Response::new(ReplyState { 
   
            reply_info: "get failed.".into(),
            kvpair: Some(KvPair { 
   
                key: Some(k),
                val: None,
            }),
        }))
    }   

    ///Server streaming response type for the GetKvList method.
    type GetKvListStream = ReceiverStream<Result<KvPair, Status>>;

    /// A server-to-client streaming RPC.
    async fn get_kv_list(
        &self,
        _request: Request<RequestState>,
    ) -> Result<Response<Self::GetKvListStream>, Status> { 
   
        println!("get_kv_list = {:?}", _request);
        let (tx, rx) = mpsc::channel(10);
        let tb = self.table.clone();

        tokio::spawn(async move { 
   
            for (k, v) in tb.lock().await.iter() { 
   
                println!(" => send k = {:?}, v = {:?}", k, v);
                tx.send(Ok(KvPair { 
   
                    key: Some(k.clone()),
                    val: Some(v.clone()),
                })).await.unwrap();
            }
            println!(" /// done sending");
        });

        Ok(Response::new(ReceiverStream::new(rx)))
    }

    /// A client-to-server streaming RPC.
    async fn set_kv_list(
        &self,
        _request: Request<tonic::Streaming<KvPair>>,
    ) -> Result<Response<ReplyState>, Status> { 
   
        println!("set_kv_list = {:?}", _request);

        let tb = self.table.clone();
        let mut stream = _request.into_inner();

        while let Some(kvpair) = stream.next().await { 
   
            let kvpair = kvpair?;//stream.next().await -> Option<Result<...>>
            let k = kvpair.key;
            let v = kvpair.val;
            tb.lock().await.insert(k.unwrap(), v.unwrap());
        }

        Ok(Response::new(ReplyState { 
   
            reply_info: "set all kvpair done.".into(),
            kvpair: None,
        }))
    }
}

(3)编写主函数
  在主函数中启动RPC服务

/// Server entry point: serves `MyRpcService` on `[::1]:10000`.
///
/// # Errors
/// Returns an error when the address cannot be parsed or when the server
/// fails while serving.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // The function already returns `Result`, so propagate a parse failure
    // with `?` instead of panicking.
    let addr = "[::1]:10000".parse()?;

    let my_rpc = MyRpcService {
        table: Arc::new(Mutex::new(HashMap::new())),
    };

    let svc = MyRpcServer::new(my_rpc);

    Server::builder().add_service(svc).serve(addr).await?;

    Ok(())
}

4.客户端代码

(1)客户端代码引入myproto.rs文件

/// Generated protobuf/gRPC code, textually included from `pb/myproto.rs`.
pub mod myproto {
    include!("pb/myproto.rs");
}

(2)实现批量写入和读取函数

/// Calls the server-streaming `get_kv_list` RPC and prints every pair received.
///
/// # Errors
/// Returns an error when the call itself or any message on the stream fails.
async fn print_kv_list(client: &mut MyRpcClient<Channel>) -> Result<(), Box<dyn Error>> {
    let request = Request::new(RequestState {
        request_info: "get all.".into(),
    });

    let response = client.get_kv_list(request).await?;
    let mut kv_stream = response.into_inner();

    // `message()` yields `Ok(None)` once the server has finished the stream.
    while let Some(pair) = kv_stream.message().await? {
        println!("KvPair = {:?}", pair);
    }

    Ok(())
}

async fn run_set_kv_list(client: &mut MyRpcClient<Channel>) -> Result<(), Box<dyn Error>> { 
   
    let mut pairs = vec![];

    for i in 0..3 { 
   
        pairs.push(KvPair { 
   
            key: Some(Key{ 
   key: i.to_string()}),
            val: Some(Value{ 
   val: i.to_string()})
        })
    }

    println!("pairs num = {:?}", pairs.len());
    let request = Request::new(stream::iter(pairs));

    match client.set_kv_list(request).await { 
   
        Ok(response) => println!("ReplyState = {:?}", response.into_inner()),
        Err(e) => println!("something went wrong: {:?}", e),
    }

    Ok(())
}

(3)客户端主函数

  在主函数中调用RPC服务

/// Client entry point: connects to the server and exercises the unary,
/// client-streaming and server-streaming RPCs in turn.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut client = MyRpcClient::connect("http://[::1]:10000").await?;

    println!("*** SIMPLE RPC ***");
    let set_request = Request::new(KvPair {
        key: Some(Key { key: "a".into() }),
        val: Some(Value { val: "1".into() }),
    });
    let response0 = client.set_kv(set_request).await?;
    println!("RESPONSE0 = {:?}", response0);

    let get_request = Request::new(Key { key: "a".into() });
    let response1 = client.get_kv(get_request).await?;
    println!("RESPONSE1 = {:?}", response1);

    println!("\n*** CLIENT STREAMING ***");
    run_set_kv_list(&mut client).await?;

    println!("\n*** SERVER STREAMING ***");
    print_kv_list(&mut client).await?;

    Ok(())
}

5.简单测试

(1)服务器打印情况
rust torch_Rust菜鸟教程「建议收藏」 (https://mushiming.com/)  第1张
(2)客户端打印情况
rust torch_Rust菜鸟教程「建议收藏」 (https://mushiming.com/)  第2张
通过标准输出的内容可以看到:
(1)客户端通过set_kv("a", 1)发送请求到服务器,服务器向客户端返回了响应RESPONSE0,表示已经成功写入。
(2)客户端调用get_kv("a")向服务器发送请求,此时客户端收到了RESPONSE1,得到了结果("a", 1)
(3)客户端调用set_kv_list(),收到了批量写入成功的响应。
(4)客户端调用get_kv_list(),收到了服务器存储的所有KvPair

THE END

发表回复