Skip to content

Commit

Permalink
[ISSUE #563] Optimize example and doc for Rust SDK (#564)
Browse files Browse the repository at this point in the history
* feat(rust): optimize for example and doc

Signed-off-by: SSpirits <[email protected]>

* feat(rust): optimize readme

Signed-off-by: SSpirits <[email protected]>

* fix(rust): fix unit test

Signed-off-by: SSpirits <[email protected]>

---------

Signed-off-by: SSpirits <[email protected]>
  • Loading branch information
ShadowySpirits committed Jul 10, 2023
1 parent 7fa131f commit 29d332b
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 18 deletions.
2 changes: 1 addition & 1 deletion rust/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,5 @@ cargo run --example simple_consumer
[codecov-url]: https://app.codecov.io/gh/apache/rocketmq-clients
[crates-image]: https://img.shields.io/crates/v/rocketmq.svg
[crates-url]: https://crates.io/crates/rocketmq
[rust-doc-image]: https://img.shields.io/crates/v/rocketmq.svg
[rust-doc-image]: https://img.shields.io/docsrs/rocketmq
[rust-doc-url]: https://docs.rs/rocketmq
39 changes: 33 additions & 6 deletions rust/examples/transaction_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::collections::HashSet;
use std::sync::Mutex;

use rocketmq::conf::{ClientOption, ProducerOption};
use rocketmq::model::message::MessageBuilder;
use rocketmq::model::transaction::{Transaction, TransactionResolution};
use rocketmq::Producer;

lazy_static::lazy_static! {
static ref MESSAGE_ID_SET: Mutex<HashSet<String>> = Mutex::new(HashSet::new());
}

#[tokio::main]
async fn main() {
// recommend to specify which topic(s) you would like to send message to
Expand All @@ -30,16 +37,28 @@ async fn main() {
let mut client_option = ClientOption::default();
client_option.set_access_url("localhost:8081");

// build and start producer
// build and start transaction producer, which has TransactionChecker
let mut producer = Producer::new_transaction_producer(
producer_option,
client_option,
Box::new(|transaction_id, message| {
println!(
"receive transaction check request: transaction_id: {}, message: {:?}",
transaction_id, message
);
TransactionResolution::COMMIT
if MESSAGE_ID_SET
.lock()
.unwrap()
.contains(message.message_id())
{
println!(
"commit transaction: transaction_id: {}, message_id: {}",
transaction_id, message.message_id()
);
TransactionResolution::COMMIT
} else {
println!(
"rollback transaction due to unknown message: transaction_id: {}, message_id: {}",
transaction_id, message.message_id()
);
TransactionResolution::ROLLBACK
}
}),
)
.unwrap();
Expand All @@ -65,6 +84,14 @@ async fn main() {
transaction.message_id(),
transaction.transaction_id()
);

MESSAGE_ID_SET
.lock()
.unwrap()
.insert(transaction.message_id().to_string());

// commit transaction manually
// delete following two lines so that RocketMQ server will check transaction status periodically
let result = transaction.commit().await;
debug_assert!(result.is_ok(), "commit transaction failed: {:?}", result);
}
17 changes: 13 additions & 4 deletions rust/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::conf::ClientOption;
use crate::error::{ClientError, ErrorKind};
use crate::model::common::{ClientType, Endpoints, Route, RouteStatus, SendReceipt};
use crate::model::message::{AckMessageEntry, MessageView};
use crate::model::transaction::TransactionChecker;
use crate::model::transaction::{TransactionChecker, TransactionResolution};
use crate::pb;
use crate::pb::receive_message_response::Content;
use crate::pb::telemetry_command::Command::RecoverOrphanedTransactionCommand;
Expand Down Expand Up @@ -109,14 +109,18 @@ impl Client {
self.telemetry_command_tx.is_some()
}

pub(crate) fn has_transaction_checker(&self) -> bool {
self.transaction_checker.is_some()
}

pub(crate) fn set_transaction_checker(&mut self, transaction_checker: Box<TransactionChecker>) {
if self.is_started() {
panic!("client {} is started, can not be modified", self.id)
}
self.transaction_checker = Some(transaction_checker);
}

pub(crate) async fn start(&mut self) {
pub(crate) async fn start(&mut self) -> Result<(), ClientError> {
let logger = self.logger.clone();
let session_manager = self.session_manager.clone();

Expand All @@ -127,9 +131,13 @@ impl Client {
// send heartbeat and handle telemetry command
let (tx, mut rx) = mpsc::channel(16);
self.telemetry_command_tx = Some(tx);
let rpc_client = self.get_session().await.unwrap();
let rpc_client = self.get_session().await?;
let endpoints = self.access_endpoints.clone();
let transaction_checker = self.transaction_checker.take();
// give a placeholder
if transaction_checker.is_some() {
self.transaction_checker = Some(Box::new(|_, _| TransactionResolution::UNKNOWN));
}
tokio::spawn(async move {
rpc_client.is_started();
let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
Expand Down Expand Up @@ -181,6 +189,7 @@ impl Client {
}
}
});
Ok(())
}

async fn handle_telemetry_command<T: RPCClient + 'static>(
Expand Down Expand Up @@ -690,7 +699,7 @@ pub(crate) mod tests {
.returning(|_, _, _| Ok(Session::mock()));

let mut client = new_client_with_session_manager(session_manager);
client.start().await;
client.start().await?;

// TODO use countdown latch instead sleeping
// wait for run
Expand Down
8 changes: 7 additions & 1 deletion rust/src/model/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,18 @@ pub enum TransactionResolution {
COMMIT = 1,
/// Notify server that current transaction should be roll-backed.
ROLLBACK = 2,
/// Notify the server that the state of this transaction is not sure. You should be cautious before return unknown
/// Notify server that the state of this transaction is not sure. You should be cautious before return unknown
/// because the examination from the server will be performed periodically.
UNKNOWN = 0,
}

/// A closure to check the state of transaction.
/// RocketMQ Server will call producer periodically to check the state of uncommitted transaction.
///
/// # Arguments
///
/// * transaction id
/// * message
pub type TransactionChecker = dyn Fn(String, MessageView) -> TransactionResolution + Send + Sync;

#[cfg(test)]
Expand Down
20 changes: 16 additions & 4 deletions rust/src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ pub struct Producer {

impl Producer {
const OPERATION_SEND_MESSAGE: &'static str = "producer.send_message";
const OPERATION_SEND_TRANSACTION_MESSAGE: &'static str = "producer.send_transaction_message";

/// Create a new producer instance
///
Expand Down Expand Up @@ -79,7 +80,7 @@ impl Producer {
///
/// * `option` - producer option
/// * `client_option` - client option
/// * `transaction_checker` - A closure to check the state of transaction.
/// * `transaction_checker` - handle server query for uncommitted transaction status
pub fn new_transaction_producer(
option: ProducerOption,
client_option: ClientOption,
Expand All @@ -103,7 +104,7 @@ impl Producer {

/// Start the producer
pub async fn start(&mut self) -> Result<(), ClientError> {
self.client.start().await;
self.client.start().await?;
if let Some(topics) = self.option.topics() {
for topic in topics {
self.client.topic_route(topic, true).await?;
Expand Down Expand Up @@ -265,6 +266,13 @@ impl Producer {
&self,
mut message: impl message::Message,
) -> Result<impl Transaction, ClientError> {
if !self.client.has_transaction_checker() {
return Err(ClientError::new(
ErrorKind::InvalidMessage,
"this producer can not send transaction message, please create a transaction producer using producer::new_transaction_producer",
Self::OPERATION_SEND_TRANSACTION_MESSAGE,
));
}
let topic = message.take_topic();
let receipt = self.send(message).await?;
Ok(TransactionImpl::new(
Expand Down Expand Up @@ -313,7 +321,7 @@ mod tests {
queue: vec![],
}))
});
client.expect_start().returning(|| ());
client.expect_start().returning(|| Ok(()));
client
.expect_client_id()
.return_const("fake_id".to_string());
Expand All @@ -340,7 +348,7 @@ mod tests {
queue: vec![],
}))
});
client.expect_start().returning(|| ());
client.expect_start().returning(|| Ok(()));
client.expect_set_transaction_checker().returning(|_| ());
client
.expect_client_id()
Expand Down Expand Up @@ -543,6 +551,10 @@ mod tests {
.client
.expect_get_session()
.return_once(|| Ok(Session::mock()));
producer
.client
.expect_has_transaction_checker()
.return_once(|| true);

let _ = producer
.send_transaction_message(
Expand Down
4 changes: 2 additions & 2 deletions rust/src/simple_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ impl SimpleConsumer {
Self::OPERATION_START_SIMPLE_CONSUMER,
));
}
self.client.start().await;
self.client.start().await?;
if let Some(topics) = self.option.topics() {
for topic in topics {
self.client.topic_route(topic, true).await?;
Expand Down Expand Up @@ -198,7 +198,7 @@ mod tests {
queue: vec![],
}))
});
client.expect_start().returning(|| ());
client.expect_start().returning(|| Ok(()));
client
.expect_client_id()
.return_const("fake_id".to_string());
Expand Down

0 comments on commit 29d332b

Please sign in to comment.