From 0bc97568082051f7c0f96f59f7b12a415ce642f7 Mon Sep 17 00:00:00 2001 From: SSpirits Date: Fri, 21 Apr 2023 12:10:23 +0800 Subject: [PATCH] api(rust): change api signature (#487) * api(rust): change api signature * chore(rust): rename SendResult to SendReceipt --- rust/Cargo.toml | 2 +- rust/examples/producer.rs | 2 +- rust/examples/simple_consumer.rs | 51 +++++++++++++++++++------------- rust/src/client.rs | 21 ++++++++----- rust/src/error.rs | 38 +++++++++++++++++++++--- rust/src/lib.rs | 6 ++-- rust/src/model/common.rs | 26 ++++++++++++++++ rust/src/producer.rs | 20 +++++-------- rust/src/session.rs | 27 ++++++++++------- rust/src/simple_consumer.rs | 22 ++++++++++---- 10 files changed, 150 insertions(+), 65 deletions(-) diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 689ab0e5c..fc84e0d83 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -26,7 +26,7 @@ authors = [ license = "MIT/Apache-2.0" readme = "./README.md" -repository = "https://github.com/apache/rocketmq-clients" +repository = "https://github.com/apache/rocketmq-clients/tree/master/rust" documentation = "https://docs.rs/rocketmq" description = "Rust client for Apache RocketMQ" keywords = ["rocketmq", "api", "client", "sdk", "grpc"] diff --git a/rust/examples/producer.rs b/rust/examples/producer.rs index a7ad0ab70..acacabb86 100644 --- a/rust/examples/producer.rs +++ b/rust/examples/producer.rs @@ -46,6 +46,6 @@ async fn main() { debug_assert!(result.is_ok(), "send message failed: {:?}", result); println!( "send message success, message_id={}", - result.unwrap().message_id + result.unwrap().message_id() ); } diff --git a/rust/examples/simple_consumer.rs b/rust/examples/simple_consumer.rs index 4011556d6..33aae9d47 100644 --- a/rust/examples/simple_consumer.rs +++ b/rust/examples/simple_consumer.rs @@ -29,33 +29,42 @@ async fn main() { // set which rocketmq proxy to connect let mut client_option = ClientOption::default(); client_option.set_access_url("localhost:8081"); + client_option.set_enable_tls(false); // build and start simple consumer let consumer = SimpleConsumer::new(consumer_option, client_option).unwrap(); consumer.start().await.unwrap(); - // pop message from rocketmq proxy - let receive_result = consumer - .receive( - "test_topic".to_string(), - &FilterExpression::new(FilterType::Tag, "test_tag"), - ) - .await; - debug_assert!( - receive_result.is_ok(), - "receive message failed: {:?}", - receive_result.unwrap_err() - ); - - let messages = receive_result.unwrap(); - for message in messages { - println!("receive message: {:?}", message); - // ack message to rocketmq proxy - let ack_result = consumer.ack(message).await; + loop { + // pop message from rocketmq proxy + let receive_result = consumer + .receive( + "test_topic".to_string(), + &FilterExpression::new(FilterType::Tag, "test_tag"), + ) + .await; debug_assert!( - ack_result.is_ok(), - "ack message failed: {:?}", - ack_result.unwrap_err() + receive_result.is_ok(), + "receive message failed: {:?}", + receive_result.unwrap_err() ); + + let messages = receive_result.unwrap(); + + if messages.is_empty() { + println!("no message received"); + return; + } + + for message in messages { + println!("receive message: {:?}", message); + // ack message to rocketmq proxy + let ack_result = consumer.ack(&message).await; + debug_assert!( + ack_result.is_ok(), + "ack message failed: {:?}", + ack_result.unwrap_err() + ); + } } } diff --git a/rust/src/client.rs b/rust/src/client.rs index 11e4419ab..99d410f2a 100644 --- a/rust/src/client.rs +++ b/rust/src/client.rs @@ -28,14 +28,14 @@ use tokio::sync::oneshot; use crate::conf::ClientOption; use crate::error::{ClientError, ErrorKind}; -use crate::model::common::{ClientType, Endpoints, Route, RouteStatus}; +use crate::model::common::{ClientType, Endpoints, Route, RouteStatus, SendReceipt}; use crate::model::message::AckMessageEntry; use crate::pb; use crate::pb::receive_message_response::Content; use crate::pb::{ AckMessageRequest, AckMessageResultEntry, Code, FilterExpression, HeartbeatRequest, HeartbeatResponse, Message, MessageQueue, QueryRouteRequest, ReceiveMessageRequest, Resource, - SendMessageRequest, SendResultEntry, Status, TelemetryCommand, + SendMessageRequest, Status, TelemetryCommand, }; #[double] use crate::session::SessionManager; @@ -207,7 +207,7 @@ impl Client { pub(crate) fn topic_route_from_cache(&self, topic: &str) -> Option> { self.route_table.lock().get(topic).and_then(|route_status| { if let RouteStatus::Found(route) = route_status { - debug!(self.logger, "get route for topic={} from cache", topic); + // debug!(self.logger, "get route for topic={} from cache", topic); Some(Arc::clone(route)) } else { None @@ -359,7 +359,7 @@ impl Client { &self, endpoints: &Endpoints, messages: Vec, - ) -> Result, ClientError> { + ) -> Result, ClientError> { self.send_message_inner( self.get_session_with_endpoints(endpoints).await.unwrap(), messages, @@ -371,7 +371,7 @@ impl Client { &self, mut rpc_client: T, messages: Vec, - ) -> Result, ClientError> { + ) -> Result, ClientError> { let message_count = messages.len(); let request = SendMessageRequest { messages }; let response = rpc_client.send_message(request).await?; @@ -381,7 +381,11 @@ impl Client { error!(self.logger, "server do not return illegal send result, this may be a bug. except result count: {}, found: {}", response.entries.len(), message_count); } - Ok(response.entries) + Ok(response + .entries + .iter() + .map(SendReceipt::from_pb_send_result) + .collect()) } #[allow(dead_code)] @@ -431,6 +435,9 @@ impl Client { for response in responses { match response.content.unwrap() { Content::Status(status) => { + if status.code() == Code::MessageNotFound { + return Ok(vec![]); + } Self::handle_response_status(Some(status), OPERATION_RECEIVE_MESSAGE)?; } Content::Message(message) => { @@ -445,7 +452,7 @@ impl Client { #[allow(dead_code)] pub(crate) async fn ack_message( &self, - ack_entry: T, + ack_entry: &T, ) -> Result { let result = self .ack_message_inner( diff --git a/rust/src/error.rs b/rust/src/error.rs index 9eb3d5193..59d1eeb46 100644 --- a/rust/src/error.rs +++ b/rust/src/error.rs @@ -14,10 +14,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + +//! Error data model of RocketMQ rust client. + use std::error::Error; use std::fmt; use std::fmt::{Debug, Display, Formatter}; +/// Error type using by [`ClientError`]. #[derive(thiserror::Error, Debug, PartialEq, Eq)] pub enum ErrorKind { #[error("Failed to parse config")] @@ -51,6 +55,7 @@ pub enum ErrorKind { Unknown, } +/// Error returned by producer or consumer. pub struct ClientError { pub(crate) kind: ErrorKind, pub(crate) message: String, @@ -62,7 +67,7 @@ pub struct ClientError { impl Error for ClientError {} impl ClientError { - pub fn new(kind: ErrorKind, message: &str, operation: &'static str) -> Self { + pub(crate) fn new(kind: ErrorKind, message: &str, operation: &'static str) -> Self { Self { kind, message: message.to_string(), @@ -72,7 +77,32 @@ impl ClientError { } } - pub fn with_operation(mut self, operation: &'static str) -> Self { + /// Error type + pub fn kind(&self) -> &ErrorKind { + &self.kind + } + + /// Error message + pub fn message(&self) -> &str { + &self.message + } + + /// Name of operation that produced this error + pub fn operation(&self) -> &str { + self.operation + } + + /// Error context, formatted in key-value pairs + pub fn context(&self) -> &Vec<(&'static str, String)> { + &self.context + } + + /// Source error + pub fn source(&self) -> Option<&anyhow::Error> { + self.source.as_ref() + } + + pub(crate) fn with_operation(mut self, operation: &'static str) -> Self { if !self.operation.is_empty() { self.context.push(("called", self.operation.to_string())); } @@ -81,12 +111,12 @@ impl ClientError { self } - pub fn with_context(mut self, key: &'static str, value: impl Into) -> Self { + pub(crate) fn with_context(mut self, key: &'static str, value: impl Into) -> Self { self.context.push((key, value.into())); self } - pub fn set_source(mut self, src: impl Into) -> Self { + pub(crate) fn set_source(mut self, src: impl Into) -> Self { debug_assert!(self.source.is_none(), "the source error has been set"); self.source = Some(src.into()); diff --git a/rust/src/lib.rs b/rust/src/lib.rs index 1a13a5a96..fcac32026 100644 --- a/rust/src/lib.rs +++ b/rust/src/lib.rs @@ -64,7 +64,7 @@ //! debug_assert!(result.is_ok(), "send message failed: {:?}", result); //! println!( //! "send message success, message_id={}", -//! result.unwrap().message_id +//! result.unwrap().message_id() //! ); //! } //! ``` @@ -108,7 +108,7 @@ //! for message in messages { //! println!("receive message: {:?}", message); //! // ack message to rocketmq proxy -//! let ack_result = consumer.ack(message).await; +//! let ack_result = consumer.ack(&message).await; //! debug_assert!( //! ack_result.is_ok(), //! "ack message failed: {:?}", @@ -121,7 +121,7 @@ #[allow(dead_code)] pub mod conf; -mod error; +pub mod error; #[allow(dead_code)] mod log; diff --git a/rust/src/model/common.rs b/rust/src/model/common.rs index 4023b9593..deaa12350 100644 --- a/rust/src/model/common.rs +++ b/rust/src/model/common.rs @@ -228,6 +228,32 @@ impl FilterExpression { } } +/// Send result returned by producer. +#[derive(Clone, Debug)] +pub struct SendReceipt { + pub(crate) message_id: String, + pub(crate) transaction_id: String, +} + +impl SendReceipt { + pub(crate) fn from_pb_send_result(entry: &pb::SendResultEntry) -> Self { + SendReceipt { + message_id: entry.message_id.clone(), + transaction_id: entry.transaction_id.clone(), + } + } + + /// Get message id + pub fn message_id(&self) -> &str { + &self.message_id + } + + /// Get transaction id + pub fn transaction_id(&self) -> &str { + &self.transaction_id + } +} + #[cfg(test)] mod tests { use crate::error::ErrorKind; diff --git a/rust/src/producer.rs b/rust/src/producer.rs index 697b70e30..8de941c80 100644 --- a/rust/src/producer.rs +++ b/rust/src/producer.rs @@ -25,9 +25,9 @@ use slog::{info, Logger}; use crate::client::Client; use crate::conf::{ClientOption, ProducerOption}; use crate::error::{ClientError, ErrorKind}; -use crate::model::common::ClientType; +use crate::model::common::{ClientType, SendReceipt}; use crate::model::message; -use crate::pb::{Encoding, Resource, SendResultEntry, SystemProperties}; +use crate::pb::{Encoding, Resource, SystemProperties}; use crate::util::{ build_endpoints_by_message_queue, build_producer_settings, select_message_queue, select_message_queue_by_message_group, HOST_NAME, @@ -185,7 +185,7 @@ impl Producer { pub async fn send_one( &self, message: impl message::Message, - ) -> Result { + ) -> Result { let results = self.send(vec![message]).await?; Ok(results[0].clone()) } @@ -198,7 +198,7 @@ impl Producer { pub async fn send( &self, messages: Vec, - ) -> Result, ClientError> { + ) -> Result, ClientError> { let (topic, message_group, mut pb_messages) = self.transform_messages_to_protobuf(messages)?; @@ -222,12 +222,13 @@ impl Producer { #[cfg(test)] mod tests { + use std::sync::Arc; + use crate::error::ErrorKind; use crate::log::terminal_logger; use crate::model::common::Route; use crate::model::message::{MessageBuilder, MessageImpl}; - use crate::pb::{Broker, Code, MessageQueue, Status}; - use std::sync::Arc; + use crate::pb::{Broker, MessageQueue}; use super::*; @@ -389,14 +390,9 @@ mod tests { })) }); producer.client.expect_send_message().returning(|_, _| { - Ok(vec![SendResultEntry { - status: Some(Status { - code: Code::Ok as i32, - message: "".to_string(), - }), + Ok(vec![SendReceipt { message_id: "".to_string(), transaction_id: "".to_string(), - offset: 0, }]) }); producer diff --git a/rust/src/session.rs b/rust/src/session.rs index 6ef9b97da..0660931f5 100644 --- a/rust/src/session.rs +++ b/rust/src/session.rs @@ -22,7 +22,7 @@ use slog::{debug, error, info, o, Logger}; use tokio::sync::{mpsc, Mutex}; use tokio_stream::wrappers::ReceiverStream; use tokio_stream::StreamExt; -use tonic::metadata::AsciiMetadataValue; +use tonic::metadata::{AsciiMetadataValue, MetadataMap}; use tonic::transport::{Channel, Endpoint}; use crate::conf::ClientOption; @@ -193,8 +193,15 @@ impl Session { self.endpoints.endpoint_url() } - fn sign(&self, mut request: tonic::Request) -> tonic::Request { + fn sign(&self, message: T) -> tonic::Request { + let mut request = tonic::Request::new(message); let metadata = request.metadata_mut(); + self.sign_without_timeout(metadata); + request.set_timeout(*self.option.timeout()); + request + } + + fn sign_without_timeout(&self, metadata: &mut MetadataMap) { let _ = AsciiMetadataValue::try_from(&self.client_id) .map(|v| metadata.insert("x-mq-client-id", v)); @@ -210,9 +217,6 @@ impl Session { "x-mq-protocol-version", AsciiMetadataValue::from_static(PROTOCOL_VERSION), ); - - request.set_timeout(*self.option.timeout()); - request } pub(crate) async fn start(&mut self, settings: TelemetryCommand) -> Result<(), ClientError> { @@ -226,7 +230,8 @@ impl Session { .set_source(e) })?; - let request = self.sign(tonic::Request::new(ReceiverStream::new(rx))); + let mut request = tonic::Request::new(ReceiverStream::new(rx)); + self.sign_without_timeout(request.metadata_mut()); let response = self.stub.telemetry(request).await.map_err(|e| { ClientError::new( ErrorKind::ClientInternal, @@ -299,7 +304,7 @@ impl RPCClient for Session { &mut self, request: QueryRouteRequest, ) -> Result { - let request = self.sign(tonic::Request::new(request)); + let request = self.sign(request); let response = self.stub.query_route(request).await.map_err(|e| { ClientError::new( ErrorKind::ClientInternal, @@ -315,7 +320,7 @@ impl RPCClient for Session { &mut self, request: HeartbeatRequest, ) -> Result { - let request = self.sign(tonic::Request::new(request)); + let request = self.sign(request); let response = self.stub.heartbeat(request).await.map_err(|e| { ClientError::new( ErrorKind::ClientInternal, @@ -331,7 +336,7 @@ impl RPCClient for Session { &mut self, request: SendMessageRequest, ) -> Result { - let request = self.sign(tonic::Request::new(request)); + let request = self.sign(request); let response = self.stub.send_message(request).await.map_err(|e| { ClientError::new( ErrorKind::ClientInternal, @@ -348,7 +353,7 @@ impl RPCClient for Session { request: ReceiveMessageRequest, ) -> Result, ClientError> { let batch_size = request.batch_size; - let mut request = self.sign(tonic::Request::new(request)); + let mut request = self.sign(request); request.set_timeout(*self.option.long_polling_timeout()); let mut stream = self .stub @@ -383,7 +388,7 @@ impl RPCClient for Session { &mut self, request: AckMessageRequest, ) -> Result { - let request = self.sign(tonic::Request::new(request)); + let request = self.sign(request); let response = self.stub.ack_message(request).await.map_err(|e| { ClientError::new( ErrorKind::ClientInternal, diff --git a/rust/src/simple_consumer.rs b/rust/src/simple_consumer.rs index eefa93b1b..cf74e8201 100644 --- a/rust/src/simple_consumer.rs +++ b/rust/src/simple_consumer.rs @@ -48,6 +48,7 @@ pub struct SimpleConsumer { } impl SimpleConsumer { + const OPERATION_NEW_SIMPLE_CONSUMER: &'static str = "simple_consumer.new"; const OPERATION_START_SIMPLE_CONSUMER: &'static str = "simple_consumer.start"; const OPERATION_RECEIVE_MESSAGE: &'static str = "simple_consumer.receive_message"; @@ -56,6 +57,14 @@ impl SimpleConsumer { option: SimpleConsumerOption, client_option: ClientOption, ) -> Result { + if option.consumer_group().is_empty() { + return Err(ClientError::new( + ErrorKind::Config, + "required option is missing: consumer group is empty", + Self::OPERATION_NEW_SIMPLE_CONSUMER, + )); + } + let client_option = ClientOption { client_type: ClientType::SimpleConsumer, group: option.consumer_group().to_string(), @@ -120,12 +129,12 @@ impl SimpleConsumer { /// * `invisible_duration` - set the invisible duration of messages that return from the server, these messages will not be visible to other consumers unless timeout pub async fn receive_with_batch_size( &self, - topic: &str, + topic: impl AsRef, expression: &FilterExpression, batch_size: i32, invisible_duration: Duration, ) -> Result, ClientError> { - let route = self.client.topic_route(topic, true).await?; + let route = self.client.topic_route(topic.as_ref(), true).await?; let message_queue = select_message_queue(route); let endpoints = build_endpoints_by_message_queue(&message_queue, Self::OPERATION_RECEIVE_MESSAGE)?; @@ -155,7 +164,10 @@ impl SimpleConsumer { /// # Arguments /// /// * `ack_entry` - special message view with handle want to ack - pub async fn ack(&self, ack_entry: impl AckMessageEntry + 'static) -> Result<(), ClientError> { + pub async fn ack( + &self, + ack_entry: &(impl AckMessageEntry + 'static), + ) -> Result<(), ClientError> { self.client.ack_message(ack_entry).await?; Ok(()) } @@ -238,7 +250,7 @@ mod tests { }); client .expect_ack_message() - .returning(|_: MessageView| Ok(AckMessageResultEntry::default())); + .returning(|_: &MessageView| Ok(AckMessageResultEntry::default())); let simple_consumer = SimpleConsumer { option: SimpleConsumerOption::default(), logger: terminal_logger(), @@ -253,7 +265,7 @@ mod tests { .await?; assert_eq!(messages.len(), 1); simple_consumer - .ack(messages.into_iter().next().unwrap()) + .ack(&messages.into_iter().next().unwrap()) .await?; Ok(()) }