Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #563] Optimize example and doc for Rust SDK #564

Merged
merged 3 commits into from
Jul 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rust/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,5 @@ cargo run --example simple_consumer
[codecov-url]: https://app.codecov.io/gh/apache/rocketmq-clients
[crates-image]: https://img.shields.io/crates/v/rocketmq.svg
[crates-url]: https://crates.io/crates/rocketmq
[rust-doc-image]: https://img.shields.io/crates/v/rocketmq.svg
[rust-doc-image]: https://img.shields.io/docsrs/rocketmq
[rust-doc-url]: https://docs.rs/rocketmq
39 changes: 33 additions & 6 deletions rust/examples/transaction_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::collections::HashSet;
use std::sync::Mutex;

use rocketmq::conf::{ClientOption, ProducerOption};
use rocketmq::model::message::MessageBuilder;
use rocketmq::model::transaction::{Transaction, TransactionResolution};
use rocketmq::Producer;

lazy_static::lazy_static! {
static ref MESSAGE_ID_SET: Mutex<HashSet<String>> = Mutex::new(HashSet::new());
}

#[tokio::main]
async fn main() {
// recommend to specify which topic(s) you would like to send message to
Expand All @@ -30,16 +37,28 @@ async fn main() {
let mut client_option = ClientOption::default();
client_option.set_access_url("localhost:8081");

// build and start producer
// build and start transaction producer, which has TransactionChecker
let mut producer = Producer::new_transaction_producer(
producer_option,
client_option,
Box::new(|transaction_id, message| {
println!(
"receive transaction check request: transaction_id: {}, message: {:?}",
transaction_id, message
);
TransactionResolution::COMMIT
if MESSAGE_ID_SET
.lock()
.unwrap()
.contains(message.message_id())
{
println!(
"commit transaction: transaction_id: {}, message_id: {}",
transaction_id, message.message_id()
);
TransactionResolution::COMMIT
} else {
println!(
"rollback transaction due to unknown message: transaction_id: {}, message_id: {}",
transaction_id, message.message_id()
);
TransactionResolution::ROLLBACK
}
}),
)
.unwrap();
Expand All @@ -65,6 +84,14 @@ async fn main() {
transaction.message_id(),
transaction.transaction_id()
);

MESSAGE_ID_SET
.lock()
.unwrap()
.insert(transaction.message_id().to_string());

// commit transaction manually
// delete following two lines so that RocketMQ server will check transaction status periodically
let result = transaction.commit().await;
debug_assert!(result.is_ok(), "commit transaction failed: {:?}", result);
}
17 changes: 13 additions & 4 deletions rust/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::conf::ClientOption;
use crate::error::{ClientError, ErrorKind};
use crate::model::common::{ClientType, Endpoints, Route, RouteStatus, SendReceipt};
use crate::model::message::{AckMessageEntry, MessageView};
use crate::model::transaction::TransactionChecker;
use crate::model::transaction::{TransactionChecker, TransactionResolution};
use crate::pb;
use crate::pb::receive_message_response::Content;
use crate::pb::telemetry_command::Command::RecoverOrphanedTransactionCommand;
Expand Down Expand Up @@ -109,14 +109,18 @@ impl Client {
self.telemetry_command_tx.is_some()
}

pub(crate) fn has_transaction_checker(&self) -> bool {
self.transaction_checker.is_some()
}

pub(crate) fn set_transaction_checker(&mut self, transaction_checker: Box<TransactionChecker>) {
if self.is_started() {
panic!("client {} is started, can not be modified", self.id)
}
self.transaction_checker = Some(transaction_checker);
}

pub(crate) async fn start(&mut self) {
pub(crate) async fn start(&mut self) -> Result<(), ClientError> {
let logger = self.logger.clone();
let session_manager = self.session_manager.clone();

Expand All @@ -127,9 +131,13 @@ impl Client {
// send heartbeat and handle telemetry command
let (tx, mut rx) = mpsc::channel(16);
self.telemetry_command_tx = Some(tx);
let rpc_client = self.get_session().await.unwrap();
let rpc_client = self.get_session().await?;
let endpoints = self.access_endpoints.clone();
let transaction_checker = self.transaction_checker.take();
// give a placeholder
if transaction_checker.is_some() {
self.transaction_checker = Some(Box::new(|_, _| TransactionResolution::UNKNOWN));
}
tokio::spawn(async move {
rpc_client.is_started();
let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
Expand Down Expand Up @@ -181,6 +189,7 @@ impl Client {
}
}
});
Ok(())
}

async fn handle_telemetry_command<T: RPCClient + 'static>(
Expand Down Expand Up @@ -690,7 +699,7 @@ pub(crate) mod tests {
.returning(|_, _, _| Ok(Session::mock()));

let mut client = new_client_with_session_manager(session_manager);
client.start().await;
client.start().await?;

// TODO use countdown latch instead sleeping
// wait for run
Expand Down
8 changes: 7 additions & 1 deletion rust/src/model/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,18 @@ pub enum TransactionResolution {
COMMIT = 1,
/// Notify server that current transaction should be roll-backed.
ROLLBACK = 2,
/// Notify the server that the state of this transaction is not sure. You should be cautious before return unknown
/// Notify server that the state of this transaction is not sure. You should be cautious before return unknown
/// because the examination from the server will be performed periodically.
UNKNOWN = 0,
}

/// A closure to check the state of transaction.
/// RocketMQ Server will call producer periodically to check the state of uncommitted transaction.
///
/// # Arguments
///
/// * transaction id
/// * message
pub type TransactionChecker = dyn Fn(String, MessageView) -> TransactionResolution + Send + Sync;

#[cfg(test)]
Expand Down
20 changes: 16 additions & 4 deletions rust/src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ pub struct Producer {

impl Producer {
const OPERATION_SEND_MESSAGE: &'static str = "producer.send_message";
const OPERATION_SEND_TRANSACTION_MESSAGE: &'static str = "producer.send_transaction_message";

/// Create a new producer instance
///
Expand Down Expand Up @@ -79,7 +80,7 @@ impl Producer {
///
/// * `option` - producer option
/// * `client_option` - client option
/// * `transaction_checker` - A closure to check the state of transaction.
/// * `transaction_checker` - handle server query for uncommitted transaction status
pub fn new_transaction_producer(
option: ProducerOption,
client_option: ClientOption,
Expand All @@ -103,7 +104,7 @@ impl Producer {

/// Start the producer
pub async fn start(&mut self) -> Result<(), ClientError> {
self.client.start().await;
self.client.start().await?;
if let Some(topics) = self.option.topics() {
for topic in topics {
self.client.topic_route(topic, true).await?;
Expand Down Expand Up @@ -265,6 +266,13 @@ impl Producer {
&self,
mut message: impl message::Message,
) -> Result<impl Transaction, ClientError> {
if !self.client.has_transaction_checker() {
return Err(ClientError::new(
ErrorKind::InvalidMessage,
"this producer can not send transaction message, please create a transaction producer using producer::new_transaction_producer",
Self::OPERATION_SEND_TRANSACTION_MESSAGE,
));
}
let topic = message.take_topic();
let receipt = self.send(message).await?;
Ok(TransactionImpl::new(
Expand Down Expand Up @@ -313,7 +321,7 @@ mod tests {
queue: vec![],
}))
});
client.expect_start().returning(|| ());
client.expect_start().returning(|| Ok(()));
client
.expect_client_id()
.return_const("fake_id".to_string());
Expand All @@ -340,7 +348,7 @@ mod tests {
queue: vec![],
}))
});
client.expect_start().returning(|| ());
client.expect_start().returning(|| Ok(()));
client.expect_set_transaction_checker().returning(|_| ());
client
.expect_client_id()
Expand Down Expand Up @@ -543,6 +551,10 @@ mod tests {
.client
.expect_get_session()
.return_once(|| Ok(Session::mock()));
producer
.client
.expect_has_transaction_checker()
.return_once(|| true);

let _ = producer
.send_transaction_message(
Expand Down
4 changes: 2 additions & 2 deletions rust/src/simple_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ impl SimpleConsumer {
Self::OPERATION_START_SIMPLE_CONSUMER,
));
}
self.client.start().await;
self.client.start().await?;
if let Some(topics) = self.option.topics() {
for topic in topics {
self.client.topic_route(topic, true).await?;
Expand Down Expand Up @@ -198,7 +198,7 @@ mod tests {
queue: vec![],
}))
});
client.expect_start().returning(|| ());
client.expect_start().returning(|| Ok(()));
client
.expect_client_id()
.return_const("fake_id".to_string());
Expand Down
Loading