Skip to content

Commit

Permalink
feat(rust): support changing invisible duration (#623)
Browse files Browse the repository at this point in the history
* feat(rust): support changing invisible duration

Signed-off-by: SSpirits <[email protected]>

* fix(rust): fix clippy warning

Signed-off-by: SSpirits <[email protected]>

* feat(rust): configure the example branch using features

Signed-off-by: SSpirits <[email protected]>

* fix(rust): fix clippy warning

Signed-off-by: SSpirits <[email protected]>

---------

Signed-off-by: SSpirits <[email protected]>
  • Loading branch information
ShadowySpirits committed Oct 16, 2023
1 parent 98f4880 commit 9fec02b
Show file tree
Hide file tree
Showing 10 changed files with 177 additions and 39 deletions.
15 changes: 10 additions & 5 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ keywords = ["rocketmq", "api", "client", "sdk", "grpc"]

[dependencies]
tokio = { version = "1", features = ["full"] }
tokio-rustls = {version = "0.24.0", features = ["default", "dangerous_configuration"] }
tokio-stream="0.1.12"
tokio-rustls = { version = "0.24.0", features = ["default", "dangerous_configuration"] }
tokio-stream = "0.1.12"
async-trait = "0.1.68"
lazy_static = "1.4"
tonic = {version = "0.9.0", features = ["tls", "default", "channel", "tls-roots"]}
tonic = { version = "0.9.0", features = ["tls", "default", "channel", "tls-roots"] }
prost = "0.11.8"
prost-types = "0.11.8"

Expand All @@ -47,7 +47,7 @@ parking_lot = "0.12"
hostname = "0.3.1"
os_info = "3"

slog = {version = "2.7.0", features=["max_level_trace", "release_max_level_info"]}
slog = { version = "2.7.0", features = ["max_level_trace", "release_max_level_info"] }
slog-term = "2.9.0"
slog-async = "2.7.0"
slog-json = "2.6.1"
Expand All @@ -63,7 +63,7 @@ time = "0.3"
once_cell = "1.9.0"

mockall = "0.11.4"
mockall_double= "0.3.0"
mockall_double = "0.3.0"

siphasher = "0.3.10"
ring = "0.16.20"
Expand All @@ -78,3 +78,8 @@ regex = "1.7.3"
wiremock-grpc = "0.0.3-alpha2"
futures = "0.3"
awaitility = "0.3.0"

[features]
default = ["example_ack"]
example_ack = []
example_change_invisible_duration = []
2 changes: 1 addition & 1 deletion rust/examples/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ async fn main() {
// shutdown the producer when you don't need it anymore.
// you should shutdown it manually to gracefully stop and unregister from server
let shutdown_result = producer.shutdown().await;
if shutdown_result.is_ok() {
if shutdown_result.is_err() {
eprintln!(
"producer shutdown failed: {:?}",
shutdown_result.unwrap_err()
Expand Down
42 changes: 35 additions & 7 deletions rust/examples/simple_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#[cfg(feature = "example_change_invisible_duration")]
use std::time::Duration;

use rocketmq::conf::{ClientOption, SimpleConsumerOption};
use rocketmq::model::common::{FilterExpression, FilterType};
use rocketmq::SimpleConsumer;
Expand Down Expand Up @@ -63,14 +66,39 @@ async fn main() {

for message in messages {
println!("receive message: {:?}", message);
// ack message to rocketmq proxy
let ack_result = consumer.ack(&message).await;
if ack_result.is_err() {
eprintln!(
"ack message {} failed: {:?}",
message.message_id(),
ack_result.unwrap_err()

// Do your business logic here
// And then acknowledge the message to the RocketMQ proxy if everything is okay
#[cfg(feature = "example_ack")]
{
println!("ack message {}", message.message_id());
let ack_result = consumer.ack(&message).await;
if ack_result.is_err() {
eprintln!(
"ack message {} failed: {:?}",
message.message_id(),
ack_result.unwrap_err()
);
}
}

// Otherwise, you can retry this message later by changing the invisible duration
#[cfg(feature = "example_change_invisible_duration")]
{
println!(
"Delay next visible time of message {} by 10s",
message.message_id()
);
let change_invisible_duration_result = consumer
.change_invisible_duration(&message, Duration::from_secs(10))
.await;
if change_invisible_duration_result.is_err() {
eprintln!(
"change message {} invisible duration failed: {:?}",
message.message_id(),
change_invisible_duration_result.unwrap_err()
);
}
}
}

Expand Down
98 changes: 83 additions & 15 deletions rust/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@ use crate::model::message::{AckMessageEntry, MessageView};
use crate::model::transaction::{TransactionChecker, TransactionResolution};
use crate::pb;
use crate::pb::receive_message_response::Content;
use crate::pb::telemetry_command::Command::RecoverOrphanedTransactionCommand;
use crate::pb::telemetry_command::Command::{RecoverOrphanedTransactionCommand, Settings};
use crate::pb::{
AckMessageRequest, AckMessageResultEntry, Code, EndTransactionRequest, FilterExpression,
HeartbeatRequest, HeartbeatResponse, Message, MessageQueue, NotifyClientTerminationRequest,
QueryRouteRequest, ReceiveMessageRequest, Resource, SendMessageRequest, Status,
TelemetryCommand, TransactionSource,
AckMessageRequest, AckMessageResultEntry, ChangeInvisibleDurationRequest, Code,
EndTransactionRequest, FilterExpression, HeartbeatRequest, HeartbeatResponse, Message,
MessageQueue, NotifyClientTerminationRequest, QueryRouteRequest, ReceiveMessageRequest,
Resource, SendMessageRequest, Status, TelemetryCommand, TransactionSource,
};
#[double]
use crate::session::SessionManager;
Expand Down Expand Up @@ -282,6 +282,7 @@ impl Client {
))
}
}
Settings(_) => Ok(()),
_ => Err(ClientError::new(
ErrorKind::Config,
"receive telemetry command but there is no handler",
Expand All @@ -291,7 +292,6 @@ impl Client {
};
}

#[allow(dead_code)]
pub(crate) fn client_id(&self) -> &str {
&self.id
}
Expand Down Expand Up @@ -378,7 +378,6 @@ impl Client {
})
}

#[allow(dead_code)]
pub(crate) async fn topic_route(
&self,
topic: &str,
Expand Down Expand Up @@ -461,8 +460,7 @@ impl Client {
let result = self.query_topic_route(rpc_client, topic).await;

// send result to all waiters
if result.is_ok() {
let route = result.unwrap();
if let Ok(route) = result {
debug!(
self.logger,
"query route for topic={} success: route={:?}", topic, route
Expand Down Expand Up @@ -518,7 +516,6 @@ impl Client {
Ok(response)
}

#[allow(dead_code)]
pub(crate) async fn send_message(
&self,
endpoints: &Endpoints,
Expand Down Expand Up @@ -547,7 +544,6 @@ impl Client {
.collect())
}

#[allow(dead_code)]
pub(crate) async fn receive_message(
&self,
endpoints: &Endpoints,
Expand Down Expand Up @@ -608,7 +604,6 @@ impl Client {
Ok(messages)
}

#[allow(dead_code)]
pub(crate) async fn ack_message<T: AckMessageEntry + 'static>(
&self,
ack_entry: &T,
Expand Down Expand Up @@ -649,6 +644,51 @@ impl Client {
Self::handle_response_status(response.status, OPERATION_ACK_MESSAGE)?;
Ok(response.entries)
}

pub(crate) async fn change_invisible_duration<T: AckMessageEntry + 'static>(
&self,
ack_entry: &T,
invisible_duration: Duration,
) -> Result<String, ClientError> {
let result = self
.change_invisible_duration_inner(
self.get_session_with_endpoints(ack_entry.endpoints())
.await
.unwrap(),
ack_entry.topic(),
ack_entry.receipt_handle(),
invisible_duration,
ack_entry.message_id(),
)
.await?;
Ok(result)
}

pub(crate) async fn change_invisible_duration_inner<T: RPCClient + 'static>(
&self,
mut rpc_client: T,
topic: String,
receipt_handle: String,
invisible_duration: Duration,
message_id: String,
) -> Result<String, ClientError> {
let request = ChangeInvisibleDurationRequest {
group: Some(Resource {
name: self.option.group.as_ref().unwrap().to_string(),
resource_namespace: self.option.namespace.to_string(),
}),
topic: Some(Resource {
name: topic,
resource_namespace: self.option.namespace.to_string(),
}),
receipt_handle,
invisible_duration: Some(invisible_duration),
message_id,
};
let response = rpc_client.change_invisible_duration(request).await?;
Self::handle_response_status(response.status, OPERATION_ACK_MESSAGE)?;
Ok(response.receipt_handle)
}
}

#[cfg(test)]
Expand All @@ -668,9 +708,10 @@ pub(crate) mod tests {
use crate::model::transaction::TransactionResolution;
use crate::pb::receive_message_response::Content;
use crate::pb::{
AckMessageEntry, AckMessageResponse, Code, EndTransactionResponse, FilterExpression,
HeartbeatResponse, Message, MessageQueue, QueryRouteResponse, ReceiveMessageResponse,
Resource, SendMessageResponse, Status, SystemProperties, TelemetryCommand,
AckMessageEntry, AckMessageResponse, ChangeInvisibleDurationResponse, Code,
EndTransactionResponse, FilterExpression, HeartbeatResponse, Message, MessageQueue,
QueryRouteResponse, ReceiveMessageResponse, Resource, SendMessageResponse, Status,
SystemProperties, TelemetryCommand,
};
use crate::session;

Expand Down Expand Up @@ -1045,6 +1086,33 @@ pub(crate) mod tests {
assert_eq!(ack_result.unwrap().len(), 0);
}

#[tokio::test]
async fn client_change_invisible_duration() {
let response = Ok(ChangeInvisibleDurationResponse {
status: Some(Status {
code: Code::Ok as i32,
message: "Success".to_string(),
}),
receipt_handle: "receipt_handle".to_string(),
});
let mut mock = session::MockRPCClient::new();
mock.expect_change_invisible_duration()
.return_once(|_| Box::pin(futures::future::ready(response)));

let client = new_client_for_test();
let change_invisible_duration_result = client
.change_invisible_duration_inner(
mock,
"test_topic".to_string(),
"receipt_handle".to_string(),
prost_types::Duration::default(),
"message_id".to_string(),
)
.await;
assert!(change_invisible_duration_result.is_ok());
assert_eq!(change_invisible_duration_result.unwrap(), "receipt_handle");
}

#[tokio::test]
async fn client_ack_message_failed() {
let response = Ok(AckMessageResponse {
Expand Down
3 changes: 2 additions & 1 deletion rust/src/model/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,13 @@ use crate::error::{ClientError, ErrorKind};
use crate::pb;
use crate::pb::{Address, AddressScheme, MessageQueue};

#[allow(dead_code)]
#[derive(Debug, Clone)]
pub(crate) enum ClientType {
Producer = 1,
#[allow(dead_code)]
PushConsumer = 2,
SimpleConsumer = 3,
#[allow(dead_code)]
PullConsumer = 4,
}

Expand Down
6 changes: 3 additions & 3 deletions rust/src/model/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,19 +59,19 @@ impl Message for MessageImpl {
}

fn take_body(&mut self) -> Vec<u8> {
self.body.take().unwrap_or(vec![])
self.body.take().unwrap_or_default()
}

fn take_tag(&mut self) -> Option<String> {
self.tag.take()
}

fn take_keys(&mut self) -> Vec<String> {
self.keys.take().unwrap_or(vec![])
self.keys.take().unwrap_or_default()
}

fn take_properties(&mut self) -> HashMap<String, String> {
self.properties.take().unwrap_or(HashMap::new())
self.properties.take().unwrap_or_default()
}

fn take_message_group(&mut self) -> Option<String> {
Expand Down
4 changes: 2 additions & 2 deletions rust/src/model/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,11 @@ impl TransactionImpl {
#[async_trait]
impl Transaction for TransactionImpl {
async fn commit(mut self) -> Result<(), ClientError> {
return self.end_transaction(TransactionResolution::COMMIT).await;
self.end_transaction(TransactionResolution::COMMIT).await
}

async fn rollback(mut self) -> Result<(), ClientError> {
return self.end_transaction(TransactionResolution::ROLLBACK).await;
self.end_transaction(TransactionResolution::ROLLBACK).await
}

fn message_id(&self) -> &str {
Expand Down
Loading

0 comments on commit 9fec02b

Please sign in to comment.