Skip to content

Commit

Permalink
doc(rust): add rust doc (#485)
Browse files Browse the repository at this point in the history
* doc(rust): add rust doc

* build(rust): set msrv to 1.61

* doc(rust): update doc following rust doc spec

* ci(rust): update cargo test arg
  • Loading branch information
ShadowySpirits committed Apr 20, 2023
1 parent f99a835 commit bc47512
Show file tree
Hide file tree
Showing 12 changed files with 344 additions and 54 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/rust_build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,4 @@ jobs:

- name: Unit Test
working-directory: ./rust
run: cargo test
run: cargo test -- --nocapture
17 changes: 14 additions & 3 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,18 @@
name = "rocketmq"
version = "0.1.0"
edition = "2021"
rust-version = "1.61"
authors = [
"SSpirits <[email protected]>",
"Zhanhui Li <[email protected]>",
]

license = "MIT/Apache-2.0"
readme = "./README.md"
repository = "https://github.com/apache/rocketmq-clients"
documentation = "https://docs.rs/rocketmq"
description = "Rust client for Apache RocketMQ"
keywords = ["rocketmq", "api", "client", "sdk", "grpc"]

[dependencies]
tokio = { version = "1", features = ["full"] }
Expand All @@ -42,16 +54,15 @@ slog-json = "2.6.1"

opentelemetry = { version = "0.19.0", features = ["metrics", "rt-tokio"] }
opentelemetry-otlp = { version = "0.12.0", features = ["metrics", "grpc-tonic"] }
minitrace = "0.4"

byteorder = "1"
mac_address = "1.1.4"
hex = "0.4.3"
time = "0.3.19"
time = "0.3"
once_cell = "1.9.0"
tokio-stream="0.1.12"

minitrace = "0.4.1"

mockall = "0.11.4"
mockall_double= "0.3.0"

Expand Down
4 changes: 2 additions & 2 deletions rust/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@

Here is the rust implementation of the client for [Apache RocketMQ](https://rocketmq.apache.org/). Different from the [remoting-based client](https://github.com/apache/rocketmq/tree/develop/client), the current implementation is based on separating architecture for computing and storage, which is the more recommended way to access the RocketMQ service.

Here are some preparations you may need to know (or refer to [here](https://rocketmq.apache.org/docs/quickStart/02quickstart)).
Here are some preparations you may need to know [Quick Start](https://rocketmq.apache.org/docs/quickStart/02quickstart).

## Getting Started

### Requirements

1. rust and cargo
1. rust toolchain, rocketmq's MSRV is 1.61.
2. protoc 3.15.0+
3. setup name server, broker, and [proxy](https://github.com/apache/rocketmq/tree/develop/proxy).

Expand Down
6 changes: 3 additions & 3 deletions rust/examples/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/
use rocketmq::conf::{ClientOption, ProducerOption};
use rocketmq::model::message::MessageImpl;
use rocketmq::model::message::MessageBuilder;
use rocketmq::Producer;

#[tokio::main]
Expand All @@ -34,9 +34,9 @@ async fn main() {
producer.start().await.unwrap();

// build message
let message = MessageImpl::builder()
let message = MessageBuilder::builder()
.set_topic("test_topic")
.set_tags("test_tag")
.set_tag("test_tag")
.set_body("hello world".as_bytes().to_vec())
.build()
.unwrap();
Expand Down
8 changes: 7 additions & 1 deletion rust/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,8 @@ impl Client {
}

#[cfg(test)]
mod tests {
pub(crate) mod tests {
use lazy_static::lazy_static;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::sleep;
Expand All @@ -507,6 +508,11 @@ mod tests {

use super::*;

lazy_static! {
// The lock is used to prevent the mocking static function at same time during parallel testing.
pub(crate) static ref MTX: Mutex<()> = Mutex::new(());
}

fn new_client_for_test() -> Client {
Client {
logger: terminal_logger(),
Expand Down
39 changes: 39 additions & 0 deletions rust/src/conf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

//! Configuration of RocketMQ rust client.

use crate::model::common::ClientType;
#[allow(unused_imports)]
use crate::producer::Producer;
#[allow(unused_imports)]
use crate::simple_consumer::SimpleConsumer;
use std::time::Duration;

/// [`ClientOption`] is the configuration of internal client, which manages the connection and request with RocketMQ proxy.
#[derive(Debug, Clone)]
pub struct ClientOption {
pub(crate) client_type: ClientType,
Expand All @@ -43,41 +51,55 @@ impl Default for ClientOption {
}

impl ClientOption {
/// Get the access url of RocketMQ proxy
pub fn access_url(&self) -> &str {
&self.access_url
}
/// Set the access url of RocketMQ proxy
pub fn set_access_url(&mut self, access_url: impl Into<String>) {
self.access_url = access_url.into();
}

/// Whether to enable tls
pub fn enable_tls(&self) -> bool {
self.enable_tls
}
/// Set whether to enable tls, default is true
pub fn set_enable_tls(&mut self, enable_tls: bool) {
self.enable_tls = enable_tls;
}

/// Get the timeout of connection and generic request
pub fn timeout(&self) -> &Duration {
&self.timeout
}
/// Set the timeout of connection and generic request, default is 3 seconds
pub fn set_timeout(&mut self, timeout: Duration) {
self.timeout = timeout;
}

/// Get the await duration during long polling
pub fn long_polling_timeout(&self) -> &Duration {
&self.long_polling_timeout
}
/// Set the await duration during long polling, default is 40 seconds
///
/// This option only affects receive requests, it means timeout for a receive request will be `long_polling_timeout` + `timeout`
pub fn set_long_polling_timeout(&mut self, long_polling_timeout: Duration) {
self.long_polling_timeout = long_polling_timeout;
}
}

/// Log format for output.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum LoggingFormat {
/// Print log in terminal
Terminal,
/// Print log in json file
Json,
}

/// The configuration of [`Producer`].
#[derive(Debug, Clone)]
pub struct ProducerOption {
logging_format: LoggingFormat,
Expand All @@ -100,23 +122,29 @@ impl Default for ProducerOption {
}

impl ProducerOption {
/// Get the logging format of producer
pub fn logging_format(&self) -> &LoggingFormat {
&self.logging_format
}
/// Set the logging format for producer
pub fn set_logging_format(&mut self, logging_format: LoggingFormat) {
self.logging_format = logging_format;
}

/// Whether to prefetch route info
pub fn prefetch_route(&self) -> &bool {
&self.prefetch_route
}
/// Set whether to prefetch route info, default is true
pub fn set_prefetch_route(&mut self, prefetch_route: bool) {
self.prefetch_route = prefetch_route;
}

/// Get which topic(s) to messages to
pub fn topics(&self) -> &Option<Vec<String>> {
&self.topics
}
/// Set which topic(s) to messages to, it will prefetch route info for these topics when the producer starts
pub fn set_topics(&mut self, topics: Vec<impl Into<String>>) {
self.topics = Some(topics.into_iter().map(|t| t.into()).collect());
}
Expand All @@ -129,14 +157,17 @@ impl ProducerOption {
self.namespace = name_space.into();
}

/// Whether to validate message type
pub fn validate_message_type(&self) -> bool {
self.validate_message_type
}
/// Set whether to validate message type, default is true
pub fn set_validate_message_type(&mut self, validate_message_type: bool) {
self.validate_message_type = validate_message_type;
}
}

/// The configuration of [`SimpleConsumer`].
#[derive(Debug, Clone)]
pub struct SimpleConsumerOption {
logging_format: LoggingFormat,
Expand All @@ -159,30 +190,38 @@ impl Default for SimpleConsumerOption {
}

impl SimpleConsumerOption {
/// Set the logging format of simple consumer
pub fn logging_format(&self) -> &LoggingFormat {
&self.logging_format
}
/// set the logging format for simple consumer
pub fn set_logging_format(&mut self, logging_format: LoggingFormat) {
self.logging_format = logging_format;
}

/// Get the consumer group of simple consumer
pub fn consumer_group(&self) -> &str {
&self.consumer_group
}
/// Set the consumer group of simple consumer
pub fn set_consumer_group(&mut self, consumer_group: impl Into<String>) {
self.consumer_group = consumer_group.into();
}

/// Whether to prefetch route info
pub fn prefetch_route(&self) -> &bool {
&self.prefetch_route
}
/// Set whether to prefetch route info, default is true
pub fn set_prefetch_route(&mut self, prefetch_route: bool) {
self.prefetch_route = prefetch_route;
}

/// Set which topic(s) to receive messages
pub fn topics(&self) -> &Option<Vec<String>> {
&self.topics
}
/// Set which topic(s) to receive messages, it will prefetch route info for these topics when the simple consumer starts
pub fn set_topics(&mut self, topics: Vec<impl Into<String>>) {
self.topics = Some(topics.into_iter().map(|t| t.into()).collect());
}
Expand Down
105 changes: 105 additions & 0 deletions rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,111 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

//! # The Rust Implementation of Apache RocketMQ Client
//!
//! Here is the official rust client for [Apache RocketMQ](https://rocketmq.apache.org/)
//! providing async/await API powered by tokio runtime.
//!
//! Different from the [remoting-based client](https://github.com/apache/rocketmq/tree/develop/client),
//! the current implementation is based on separating architecture for computing and storage,
//! which is the more recommended way to access the RocketMQ service.
//!
//! Here are some preparations you may need to know: [RocketMQ Quick Start](https://rocketmq.apache.org/docs/quickStart/02quickstart).
//!
//! ## Examples
//!
//! Basic usage:
//!
//! ### Producer
//! ```rust,no_run
//! use rocketmq::conf::{ClientOption, ProducerOption};
//! use rocketmq::model::message::MessageBuilder;
//! use rocketmq::Producer;
//!
//! #[tokio::main]
//! async fn main() {
//! // recommend to specify which topic(s) you would like to send message to
//! // producer will prefetch topic route when start and failed fast if topic not exist
//! let mut producer_option = ProducerOption::default();
//! producer_option.set_topics(vec!["test_topic"]);
//!
//! // set which rocketmq proxy to connect
//! let mut client_option = ClientOption::default();
//! client_option.set_access_url("localhost:8081");
//!
//! // build and start producer
//! let producer = Producer::new(producer_option, client_option).unwrap();
//! producer.start().await.unwrap();
//!
//! // build message
//! let message = MessageBuilder::builder()
//! .set_topic("test_topic")
//! .set_tag("test_tag")
//! .set_body("hello world".as_bytes().to_vec())
//! .build()
//! .unwrap();
//!
//! // send message to rocketmq proxy
//! let result = producer.send_one(message).await;
//! debug_assert!(result.is_ok(), "send message failed: {:?}", result);
//! println!(
//! "send message success, message_id={}",
//! result.unwrap().message_id
//! );
//! }
//! ```
//!
//! ### Simple Consumer
//! ```rust,no_run
//! use rocketmq::conf::{ClientOption, SimpleConsumerOption};
//! use rocketmq::model::common::{FilterExpression, FilterType};
//! use rocketmq::SimpleConsumer;
//!
//! #[tokio::main]
//! async fn main() {
//! // recommend to specify which topic(s) you would like to send message to
//! // simple consumer will prefetch topic route when start and failed fast if topic not exist
//! let mut consumer_option = SimpleConsumerOption::default();
//! consumer_option.set_topics(vec!["test_topic"]);
//! consumer_option.set_consumer_group("SimpleConsumerGroup");
//!
//! // set which rocketmq proxy to connect
//! let mut client_option = ClientOption::default();
//! client_option.set_access_url("localhost:8081");
//!
//! // build and start simple consumer
//! let consumer = SimpleConsumer::new(consumer_option, client_option).unwrap();
//! consumer.start().await.unwrap();
//!
//! // pop message from rocketmq proxy
//! let receive_result = consumer
//! .receive(
//! "test_topic".to_string(),
//! &FilterExpression::new(FilterType::Tag, "test_tag"),
//! )
//! .await;
//! debug_assert!(
//! receive_result.is_ok(),
//! "receive message failed: {:?}",
//! receive_result.unwrap_err()
//! );
//!
//! let messages = receive_result.unwrap();
//! for message in messages {
//! println!("receive message: {:?}", message);
//! // ack message to rocketmq proxy
//! let ack_result = consumer.ack(message).await;
//! debug_assert!(
//! ack_result.is_ok(),
//! "ack message failed: {:?}",
//! ack_result.unwrap_err()
//! );
//! }
//! }
//! ```
//!

#[allow(dead_code)]
pub mod conf;
mod error;
Expand Down
Loading

0 comments on commit bc47512

Please sign in to comment.