Skip to content

Commit

Permalink
api(rust): change api signature (#487)
Browse files Browse the repository at this point in the history
* api(rust): change api signature

* chore(rust): rename SendResult to SendReceipt
  • Loading branch information
ShadowySpirits committed Apr 21, 2023
1 parent bc47512 commit 0bc9756
Show file tree
Hide file tree
Showing 10 changed files with 150 additions and 65 deletions.
2 changes: 1 addition & 1 deletion rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ authors = [

license = "MIT/Apache-2.0"
readme = "./README.md"
repository = "https://github.com/apache/rocketmq-clients"
repository = "https://github.com/apache/rocketmq-clients/tree/master/rust"
documentation = "https://docs.rs/rocketmq"
description = "Rust client for Apache RocketMQ"
keywords = ["rocketmq", "api", "client", "sdk", "grpc"]
Expand Down
2 changes: 1 addition & 1 deletion rust/examples/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,6 @@ async fn main() {
debug_assert!(result.is_ok(), "send message failed: {:?}", result);
println!(
"send message success, message_id={}",
result.unwrap().message_id
result.unwrap().message_id()
);
}
51 changes: 30 additions & 21 deletions rust/examples/simple_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,33 +29,42 @@ async fn main() {
// set which rocketmq proxy to connect
let mut client_option = ClientOption::default();
client_option.set_access_url("localhost:8081");
client_option.set_enable_tls(false);

// build and start simple consumer
let consumer = SimpleConsumer::new(consumer_option, client_option).unwrap();
consumer.start().await.unwrap();

// pop message from rocketmq proxy
let receive_result = consumer
.receive(
"test_topic".to_string(),
&FilterExpression::new(FilterType::Tag, "test_tag"),
)
.await;
debug_assert!(
receive_result.is_ok(),
"receive message failed: {:?}",
receive_result.unwrap_err()
);

let messages = receive_result.unwrap();
for message in messages {
println!("receive message: {:?}", message);
// ack message to rocketmq proxy
let ack_result = consumer.ack(message).await;
loop {
// pop message from rocketmq proxy
let receive_result = consumer
.receive(
"test_topic".to_string(),
&FilterExpression::new(FilterType::Tag, "test_tag"),
)
.await;
debug_assert!(
ack_result.is_ok(),
"ack message failed: {:?}",
ack_result.unwrap_err()
receive_result.is_ok(),
"receive message failed: {:?}",
receive_result.unwrap_err()
);

let messages = receive_result.unwrap();

if messages.is_empty() {
println!("no message received");
return;
}

for message in messages {
println!("receive message: {:?}", message);
// ack message to rocketmq proxy
let ack_result = consumer.ack(&message).await;
debug_assert!(
ack_result.is_ok(),
"ack message failed: {:?}",
ack_result.unwrap_err()
);
}
}
}
21 changes: 14 additions & 7 deletions rust/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@ use tokio::sync::oneshot;

use crate::conf::ClientOption;
use crate::error::{ClientError, ErrorKind};
use crate::model::common::{ClientType, Endpoints, Route, RouteStatus};
use crate::model::common::{ClientType, Endpoints, Route, RouteStatus, SendReceipt};
use crate::model::message::AckMessageEntry;
use crate::pb;
use crate::pb::receive_message_response::Content;
use crate::pb::{
AckMessageRequest, AckMessageResultEntry, Code, FilterExpression, HeartbeatRequest,
HeartbeatResponse, Message, MessageQueue, QueryRouteRequest, ReceiveMessageRequest, Resource,
SendMessageRequest, SendResultEntry, Status, TelemetryCommand,
SendMessageRequest, Status, TelemetryCommand,
};
#[double]
use crate::session::SessionManager;
Expand Down Expand Up @@ -207,7 +207,7 @@ impl Client {
pub(crate) fn topic_route_from_cache(&self, topic: &str) -> Option<Arc<Route>> {
self.route_table.lock().get(topic).and_then(|route_status| {
if let RouteStatus::Found(route) = route_status {
debug!(self.logger, "get route for topic={} from cache", topic);
// debug!(self.logger, "get route for topic={} from cache", topic);
Some(Arc::clone(route))
} else {
None
Expand Down Expand Up @@ -359,7 +359,7 @@ impl Client {
&self,
endpoints: &Endpoints,
messages: Vec<Message>,
) -> Result<Vec<SendResultEntry>, ClientError> {
) -> Result<Vec<SendReceipt>, ClientError> {
self.send_message_inner(
self.get_session_with_endpoints(endpoints).await.unwrap(),
messages,
Expand All @@ -371,7 +371,7 @@ impl Client {
&self,
mut rpc_client: T,
messages: Vec<Message>,
) -> Result<Vec<SendResultEntry>, ClientError> {
) -> Result<Vec<SendReceipt>, ClientError> {
let message_count = messages.len();
let request = SendMessageRequest { messages };
let response = rpc_client.send_message(request).await?;
Expand All @@ -381,7 +381,11 @@ impl Client {
error!(self.logger, "server do not return illegal send result, this may be a bug. except result count: {}, found: {}", response.entries.len(), message_count);
}

Ok(response.entries)
Ok(response
.entries
.iter()
.map(SendReceipt::from_pb_send_result)
.collect())
}

#[allow(dead_code)]
Expand Down Expand Up @@ -431,6 +435,9 @@ impl Client {
for response in responses {
match response.content.unwrap() {
Content::Status(status) => {
if status.code() == Code::MessageNotFound {
return Ok(vec![]);
}
Self::handle_response_status(Some(status), OPERATION_RECEIVE_MESSAGE)?;
}
Content::Message(message) => {
Expand All @@ -445,7 +452,7 @@ impl Client {
#[allow(dead_code)]
pub(crate) async fn ack_message<T: AckMessageEntry + 'static>(
&self,
ack_entry: T,
ack_entry: &T,
) -> Result<AckMessageResultEntry, ClientError> {
let result = self
.ack_message_inner(
Expand Down
38 changes: 34 additions & 4 deletions rust/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

//! Error data model of RocketMQ rust client.

use std::error::Error;
use std::fmt;
use std::fmt::{Debug, Display, Formatter};

/// Error type using by [`ClientError`].
#[derive(thiserror::Error, Debug, PartialEq, Eq)]
pub enum ErrorKind {
#[error("Failed to parse config")]
Expand Down Expand Up @@ -51,6 +55,7 @@ pub enum ErrorKind {
Unknown,
}

/// Error returned by producer or consumer.
pub struct ClientError {
pub(crate) kind: ErrorKind,
pub(crate) message: String,
Expand All @@ -62,7 +67,7 @@ pub struct ClientError {
impl Error for ClientError {}

impl ClientError {
pub fn new(kind: ErrorKind, message: &str, operation: &'static str) -> Self {
pub(crate) fn new(kind: ErrorKind, message: &str, operation: &'static str) -> Self {
Self {
kind,
message: message.to_string(),
Expand All @@ -72,7 +77,32 @@ impl ClientError {
}
}

pub fn with_operation(mut self, operation: &'static str) -> Self {
/// Error type
pub fn kind(&self) -> &ErrorKind {
&self.kind
}

/// Error message
pub fn message(&self) -> &str {
&self.message
}

/// Name of operation that produced this error
pub fn operation(&self) -> &str {
self.operation
}

/// Error context, formatted in key-value pairs
pub fn context(&self) -> &Vec<(&'static str, String)> {
&self.context
}

/// Source error
pub fn source(&self) -> Option<&anyhow::Error> {
self.source.as_ref()
}

pub(crate) fn with_operation(mut self, operation: &'static str) -> Self {
if !self.operation.is_empty() {
self.context.push(("called", self.operation.to_string()));
}
Expand All @@ -81,12 +111,12 @@ impl ClientError {
self
}

pub fn with_context(mut self, key: &'static str, value: impl Into<String>) -> Self {
pub(crate) fn with_context(mut self, key: &'static str, value: impl Into<String>) -> Self {
self.context.push((key, value.into()));
self
}

pub fn set_source(mut self, src: impl Into<anyhow::Error>) -> Self {
pub(crate) fn set_source(mut self, src: impl Into<anyhow::Error>) -> Self {
debug_assert!(self.source.is_none(), "the source error has been set");

self.source = Some(src.into());
Expand Down
6 changes: 3 additions & 3 deletions rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
//! debug_assert!(result.is_ok(), "send message failed: {:?}", result);
//! println!(
//! "send message success, message_id={}",
//! result.unwrap().message_id
//! result.unwrap().message_id()
//! );
//! }
//! ```
Expand Down Expand Up @@ -108,7 +108,7 @@
//! for message in messages {
//! println!("receive message: {:?}", message);
//! // ack message to rocketmq proxy
//! let ack_result = consumer.ack(message).await;
//! let ack_result = consumer.ack(&message).await;
//! debug_assert!(
//! ack_result.is_ok(),
//! "ack message failed: {:?}",
Expand All @@ -121,7 +121,7 @@

#[allow(dead_code)]
pub mod conf;
mod error;
pub mod error;
#[allow(dead_code)]
mod log;

Expand Down
26 changes: 26 additions & 0 deletions rust/src/model/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,32 @@ impl FilterExpression {
}
}

/// Send result returned by producer.
#[derive(Clone, Debug)]
pub struct SendReceipt {
pub(crate) message_id: String,
pub(crate) transaction_id: String,
}

impl SendReceipt {
pub(crate) fn from_pb_send_result(entry: &pb::SendResultEntry) -> Self {
SendReceipt {
message_id: entry.message_id.clone(),
transaction_id: entry.transaction_id.clone(),
}
}

/// Get message id
pub fn message_id(&self) -> &str {
&self.message_id
}

/// Get transaction id
pub fn transaction_id(&self) -> &str {
&self.transaction_id
}
}

#[cfg(test)]
mod tests {
use crate::error::ErrorKind;
Expand Down
20 changes: 8 additions & 12 deletions rust/src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ use slog::{info, Logger};
use crate::client::Client;
use crate::conf::{ClientOption, ProducerOption};
use crate::error::{ClientError, ErrorKind};
use crate::model::common::ClientType;
use crate::model::common::{ClientType, SendReceipt};
use crate::model::message;
use crate::pb::{Encoding, Resource, SendResultEntry, SystemProperties};
use crate::pb::{Encoding, Resource, SystemProperties};
use crate::util::{
build_endpoints_by_message_queue, build_producer_settings, select_message_queue,
select_message_queue_by_message_group, HOST_NAME,
Expand Down Expand Up @@ -185,7 +185,7 @@ impl Producer {
pub async fn send_one(
&self,
message: impl message::Message,
) -> Result<SendResultEntry, ClientError> {
) -> Result<SendReceipt, ClientError> {
let results = self.send(vec![message]).await?;
Ok(results[0].clone())
}
Expand All @@ -198,7 +198,7 @@ impl Producer {
pub async fn send(
&self,
messages: Vec<impl message::Message>,
) -> Result<Vec<SendResultEntry>, ClientError> {
) -> Result<Vec<SendReceipt>, ClientError> {
let (topic, message_group, mut pb_messages) =
self.transform_messages_to_protobuf(messages)?;

Expand All @@ -222,12 +222,13 @@ impl Producer {

#[cfg(test)]
mod tests {
use std::sync::Arc;

use crate::error::ErrorKind;
use crate::log::terminal_logger;
use crate::model::common::Route;
use crate::model::message::{MessageBuilder, MessageImpl};
use crate::pb::{Broker, Code, MessageQueue, Status};
use std::sync::Arc;
use crate::pb::{Broker, MessageQueue};

use super::*;

Expand Down Expand Up @@ -389,14 +390,9 @@ mod tests {
}))
});
producer.client.expect_send_message().returning(|_, _| {
Ok(vec![SendResultEntry {
status: Some(Status {
code: Code::Ok as i32,
message: "".to_string(),
}),
Ok(vec![SendReceipt {
message_id: "".to_string(),
transaction_id: "".to_string(),
offset: 0,
}])
});
producer
Expand Down
Loading

0 comments on commit 0bc9756

Please sign in to comment.