Skip to content

Commit

Permalink
[ISSUE #555] Support transaction message for Rust SDK (#556)
Browse files Browse the repository at this point in the history
* feat(rust): implement transaction checker

Signed-off-by: SSpirits <[email protected]>

* feat(rust): implement transaction producer

Signed-off-by: SSpirits <[email protected]>

* feat(rust): update readme

Signed-off-by: SSpirits <[email protected]>

* feat(rust): commit transaction in example

Signed-off-by: SSpirits <[email protected]>

---------

Signed-off-by: SSpirits <[email protected]>
  • Loading branch information
ShadowySpirits committed Jul 3, 2023
1 parent af9c7e2 commit 5aed664
Show file tree
Hide file tree
Showing 18 changed files with 855 additions and 85 deletions.
2 changes: 1 addition & 1 deletion README-CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
| Producer with standard messages |||||| 🚧 | 🚧 | 🚧 |
| Producer with FIFO messages |||||| 🚧 | 🚧 | 🚧 |
| Producer with timed/delay messages |||||| 🚧 | 🚧 | 🚧 |
| Producer with transactional messages ||||| 🚧 | 🚧 | 🚧 | 🚧 |
| Producer with transactional messages ||||| | 🚧 | 🚧 | 🚧 |
| Simple consumer |||||| 🚧 | 🚧 | 🚧 |
| Push consumer with concurrent message listener ||| 🚧 | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 |
| Push consumer with FIFO message listener ||| 🚧 | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 |
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ Provide cloud-native and robust solutions for Java, C++, C#, Golang, Rust and al
| Producer with standard messages |||||| 🚧 | 🚧 | 🚧 |
| Producer with FIFO messages |||||| 🚧 | 🚧 | 🚧 |
| Producer with timed/delay messages |||||| 🚧 | 🚧 | 🚧 |
| Producer with transactional messages ||||| 🚧 | 🚧 | 🚧 | 🚧 |
| Producer with transactional messages ||||| | 🚧 | 🚧 | 🚧 |
| Simple consumer |||||| 🚧 | 🚧 | 🚧 |
| Push consumer with concurrent message listener ||| 🚧 | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 |
| Push consumer with FIFO message listener ||| 🚧 | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 |
Expand Down
2 changes: 1 addition & 1 deletion rust/.cargo/Cargo.lock.min

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
#
[package]
name = "rocketmq"
version = "0.1.1"
version = "0.1.2"
edition = "2021"
rust-version = "1.61"
authors = [
Expand Down
60 changes: 60 additions & 0 deletions rust/examples/delay_producer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::ops::Add;
use std::time::{Duration, SystemTime};

use rocketmq::conf::{ClientOption, ProducerOption};
use rocketmq::model::message::MessageBuilder;
use rocketmq::Producer;

#[tokio::main]
async fn main() {
// recommend to specify which topic(s) you would like to send message to
// producer will prefetch topic route when start and failed fast if topic not exist
let mut producer_option = ProducerOption::default();
producer_option.set_topics(vec!["delay_test"]);

// set which rocketmq proxy to connect
let mut client_option = ClientOption::default();
client_option.set_access_url("localhost:8081");

// build and start producer
let mut producer = Producer::new(producer_option, client_option).unwrap();
producer.start().await.unwrap();

// build message
let message = MessageBuilder::delay_message_builder(
"delay_test",
"hello world".as_bytes().to_vec(),
// deliver in 15 seconds
SystemTime::now()
.add(Duration::from_secs(15))
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs() as i64,
)
.build()
.unwrap();

// send message to rocketmq proxy
let result = producer.send(message).await;
debug_assert!(result.is_ok(), "send message failed: {:?}", result);
println!(
"send message success, message_id={}",
result.unwrap().message_id()
);
}
53 changes: 53 additions & 0 deletions rust/examples/fifo_producer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use rocketmq::conf::{ClientOption, ProducerOption};
use rocketmq::model::message::MessageBuilder;
use rocketmq::Producer;

#[tokio::main]
async fn main() {
// recommend to specify which topic(s) you would like to send message to
// producer will prefetch topic route when start and failed fast if topic not exist
let mut producer_option = ProducerOption::default();
producer_option.set_topics(vec!["fifo_test"]);

// set which rocketmq proxy to connect
let mut client_option = ClientOption::default();
client_option.set_access_url("localhost:8081");

// build and start producer
let mut producer = Producer::new(producer_option, client_option).unwrap();
producer.start().await.unwrap();

// build message
let message = MessageBuilder::fifo_message_builder(
"fifo_test",
"hello world".as_bytes().to_vec(),
// message partition
"message_group",
)
.build()
.unwrap();

// send message to rocketmq proxy
let result = producer.send(message).await;
debug_assert!(result.is_ok(), "send message failed: {:?}", result);
println!(
"send message success, message_id={}",
result.unwrap().message_id()
);
}
4 changes: 2 additions & 2 deletions rust/examples/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ async fn main() {
client_option.set_access_url("localhost:8081");

// build and start producer
let producer = Producer::new(producer_option, client_option).unwrap();
let mut producer = Producer::new(producer_option, client_option).unwrap();
producer.start().await.unwrap();

// build message
Expand All @@ -42,7 +42,7 @@ async fn main() {
.unwrap();

// send message to rocketmq proxy
let result = producer.send_one(message).await;
let result = producer.send(message).await;
debug_assert!(result.is_ok(), "send message failed: {:?}", result);
println!(
"send message success, message_id={}",
Expand Down
2 changes: 1 addition & 1 deletion rust/examples/simple_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ async fn main() {
client_option.set_enable_tls(false);

// build and start simple consumer
let consumer = SimpleConsumer::new(consumer_option, client_option).unwrap();
let mut consumer = SimpleConsumer::new(consumer_option, client_option).unwrap();
consumer.start().await.unwrap();

loop {
Expand Down
70 changes: 70 additions & 0 deletions rust/examples/transaction_producer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use rocketmq::conf::{ClientOption, ProducerOption};
use rocketmq::model::message::MessageBuilder;
use rocketmq::model::transaction::{Transaction, TransactionResolution};
use rocketmq::Producer;

#[tokio::main]
async fn main() {
// recommend to specify which topic(s) you would like to send message to
// producer will prefetch topic route when start and failed fast if topic not exist
let mut producer_option = ProducerOption::default();
producer_option.set_topics(vec!["transaction_test"]);

// set which rocketmq proxy to connect
let mut client_option = ClientOption::default();
client_option.set_access_url("localhost:8081");

// build and start producer
let mut producer = Producer::new_transaction_producer(
producer_option,
client_option,
Box::new(|transaction_id, message| {
println!(
"receive transaction check request: transaction_id: {}, message: {:?}",
transaction_id, message
);
TransactionResolution::COMMIT
}),
)
.unwrap();
producer.start().await.unwrap();

// build message
let message = MessageBuilder::transaction_message_builder(
"transaction_test",
"hello world".as_bytes().to_vec(),
)
.build()
.unwrap();

// send message to rocketmq proxy
let result = producer.send_transaction_message(message).await;
if let Err(error) = result {
eprintln!("send message failed: {:?}", error);
return;
}
let transaction = result.unwrap();
println!(
"send message success, message_id={}, transaction_id={}",
transaction.message_id(),
transaction.transaction_id()
);
let result = transaction.commit().await;
debug_assert!(result.is_ok(), "commit transaction failed: {:?}", result);
}
Loading

0 comments on commit 5aed664

Please sign in to comment.