Skip to content

Commit

Permalink
test(rust): add unit tests (#484)
Browse files Browse the repository at this point in the history
* test(rust): add unit tests

* doc(rust): fix README.md of rust sdk
  • Loading branch information
ShadowySpirits committed Apr 18, 2023
1 parent 6fe0f61 commit f99a835
Show file tree
Hide file tree
Showing 11 changed files with 470 additions and 58 deletions.
6 changes: 6 additions & 0 deletions rust/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ Here are some preparations you may need to know (or refer to [here](https://rock
2. protoc 3.15.0+
3. setup name server, broker, and [proxy](https://github.com/apache/rocketmq/tree/develop/proxy).

### Run Tests

```sh
cargo llvm-cov --ignore-filename-regex pb/ --open
```

### Run Example

Run the following command to start the example:
Expand Down
37 changes: 21 additions & 16 deletions rust/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,23 +56,23 @@ lazy_static::lazy_static! {
static ref CLIENT_ID_SEQUENCE: AtomicUsize = AtomicUsize::new(0);
}

const OPERATION_CLIENT_NEW: &'static str = "client.new";
const OPERATION_QUERY_ROUTE: &'static str = "client.query_route";
const OPERATION_HEARTBEAT: &'static str = "client.heartbeat";
const OPERATION_SEND_MESSAGE: &'static str = "client.send_message";
const OPERATION_RECEIVE_MESSAGE: &'static str = "client.receive_message";
const OPERATION_ACK_MESSAGE: &'static str = "client.ack_message";

#[automock]
impl Client {
const OPERATION_CLIENT_NEW: &'static str = "client.new";
const OPERATION_QUERY_ROUTE: &'static str = "client.query_route";
const OPERATION_HEARTBEAT: &'static str = "client.heartbeat";
const OPERATION_SEND_MESSAGE: &'static str = "client.send_message";
const OPERATION_RECEIVE_MESSAGE: &'static str = "client.receive_message";
const OPERATION_ACK_MESSAGE: &'static str = "client.ack_message";

pub(crate) fn new(
logger: &Logger,
option: ClientOption,
settings: TelemetryCommand,
) -> Result<Self, ClientError> {
let id = Self::generate_client_id();
let endpoints = Endpoints::from_url(option.access_url())
.map_err(|e| e.with_operation(Self::OPERATION_CLIENT_NEW))?;
.map_err(|e| e.with_operation(OPERATION_CLIENT_NEW))?;
let session_manager = SessionManager::new(logger, id.clone(), &option);
Ok(Client {
logger: logger.new(o!("component" => "client")),
Expand Down Expand Up @@ -120,7 +120,7 @@ impl Client {
continue;
}
let result =
Self::handle_response_status(response.unwrap().status, Self::OPERATION_HEARTBEAT);
Self::handle_response_status(response.unwrap().status, OPERATION_HEARTBEAT);
if result.is_err() {
error!(
logger,
Expand All @@ -137,6 +137,7 @@ impl Client {
});
}

#[allow(dead_code)]
pub(crate) fn client_id(&self) -> &str {
&self.id
}
Expand Down Expand Up @@ -214,6 +215,7 @@ impl Client {
})
}

#[allow(dead_code)]
pub(crate) async fn topic_route(
&self,
topic: &str,
Expand Down Expand Up @@ -242,7 +244,7 @@ impl Client {
};

let response = rpc_client.query_route(request).await?;
Self::handle_response_status(response.status, Self::OPERATION_QUERY_ROUTE)?;
Self::handle_response_status(response.status, OPERATION_QUERY_ROUTE)?;

let route = Route {
index: AtomicUsize::new(0),
Expand Down Expand Up @@ -288,7 +290,7 @@ impl Client {
Err(_e) => Err(ClientError::new(
ErrorKind::ChannelReceive,
"wait for inflight query topic route request failed",
Self::OPERATION_QUERY_ROUTE,
OPERATION_QUERY_ROUTE,
)),
};
}
Expand Down Expand Up @@ -327,7 +329,7 @@ impl Client {
let _ = item.send(Err(ClientError::new(
ErrorKind::Server,
"query topic route failed",
Self::OPERATION_QUERY_ROUTE,
OPERATION_QUERY_ROUTE,
)));
}
};
Expand All @@ -352,6 +354,7 @@ impl Client {
Ok(response)
}

#[allow(dead_code)]
pub(crate) async fn send_message(
&self,
endpoints: &Endpoints,
Expand All @@ -372,7 +375,7 @@ impl Client {
let message_count = messages.len();
let request = SendMessageRequest { messages };
let response = rpc_client.send_message(request).await?;
Self::handle_response_status(response.status, Self::OPERATION_SEND_MESSAGE)?;
Self::handle_response_status(response.status, OPERATION_SEND_MESSAGE)?;

if response.entries.len() != message_count {
error!(self.logger, "server do not return illegal send result, this may be a bug. except result count: {}, found: {}", response.entries.len(), message_count);
Expand All @@ -381,6 +384,7 @@ impl Client {
Ok(response.entries)
}

#[allow(dead_code)]
pub(crate) async fn receive_message(
&self,
endpoints: &Endpoints,
Expand Down Expand Up @@ -427,7 +431,7 @@ impl Client {
for response in responses {
match response.content.unwrap() {
Content::Status(status) => {
Self::handle_response_status(Some(status), Self::OPERATION_RECEIVE_MESSAGE)?;
Self::handle_response_status(Some(status), OPERATION_RECEIVE_MESSAGE)?;
}
Content::Message(message) => {
messages.push(message);
Expand All @@ -438,6 +442,7 @@ impl Client {
Ok(messages)
}

#[allow(dead_code)]
pub(crate) async fn ack_message<T: AckMessageEntry + 'static>(
&self,
ack_entry: T,
Expand Down Expand Up @@ -475,7 +480,7 @@ impl Client {
entries,
};
let response = rpc_client.ack_message(request).await?;
Self::handle_response_status(response.status, Self::OPERATION_ACK_MESSAGE)?;
Self::handle_response_status(response.status, OPERATION_ACK_MESSAGE)?;
Ok(response.entries)
}
}
Expand Down Expand Up @@ -623,7 +628,7 @@ mod tests {
assert!(result.is_ok(), "should not return error when status is Ok");
}

fn new_topic_route_response() -> Result<QueryRouteResponse, ClientError> {
pub(crate) fn new_topic_route_response() -> Result<QueryRouteResponse, ClientError> {
Ok(QueryRouteResponse {
status: Some(Status {
code: Code::Ok as i32,
Expand Down
33 changes: 31 additions & 2 deletions rust/src/conf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ impl Default for ClientOption {
group: "".to_string(),
namespace: "".to_string(),
access_url: "localhost:8081".to_string(),
enable_tls: false,
enable_tls: true,
timeout: Duration::from_secs(3),
long_polling_timeout: Duration::from_secs(40),
}
Expand Down Expand Up @@ -72,7 +72,7 @@ impl ClientOption {
}
}

#[derive(Debug, Clone)]
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum LoggingFormat {
Terminal,
Json,
Expand Down Expand Up @@ -195,3 +195,32 @@ impl SimpleConsumerOption {
self.namespace = name_space.into();
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn conf_client_option() {
let option = ClientOption::default();
assert_eq!(option.access_url(), "localhost:8081");
assert!(option.enable_tls());
assert_eq!(option.timeout(), &Duration::from_secs(3));
assert_eq!(option.long_polling_timeout(), &Duration::from_secs(40));
}

#[test]
fn conf_producer_option() {
let option = ProducerOption::default();
assert_eq!(option.logging_format(), &LoggingFormat::Terminal);
assert!(option.prefetch_route());
assert!(option.validate_message_type());
}

#[test]
fn conf_simple_consumer_option() {
let option = SimpleConsumerOption::default();
assert_eq!(option.logging_format(), &LoggingFormat::Terminal);
assert!(option.prefetch_route());
}
}
19 changes: 19 additions & 0 deletions rust/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,3 +158,22 @@ impl Debug for ClientError {
Ok(())
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn error_client_error() {
let err = ClientError::new(ErrorKind::Config, "fake_message", "error_client_error")
.with_operation("another_operation")
.with_context("context_key", "context_value")
.set_source(anyhow::anyhow!("fake_source_error"));
assert_eq!(
err.to_string(),
"Failed to parse config at another_operation, context: { called: error_client_error, context_key: context_value } => fake_message, source: fake_source_error"
);
assert_eq!(format!("{:?}", err), "Failed to parse config at another_operation => fake_message\n\nContext:\n called: error_client_error\n context_key: context_value\n\nSource: fake_source_error\n");
assert_eq!(format!("{:#?}", err), "Error {\n kind: Config,\n message: \"fake_message\",\n operation: \"another_operation\",\n context: [\n (\n \"called\",\n \"error_client_error\",\n ),\n (\n \"context_key\",\n \"context_value\",\n ),\n ],\n source: Some(\n \"fake_source_error\",\n ),\n}");
}
}
2 changes: 0 additions & 2 deletions rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
#[allow(dead_code)]
pub mod conf;
#[allow(dead_code)]
mod error;
#[allow(dead_code)]
mod log;
Expand All @@ -28,7 +27,6 @@ mod client;
mod pb;
mod session;

#[allow(dead_code)]
pub mod model;
mod util;

Expand Down
1 change: 1 addition & 0 deletions rust/src/model/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::error::{ClientError, ErrorKind};
use crate::pb;
use crate::pb::{Address, AddressScheme, MessageQueue};

#[allow(dead_code)]
#[derive(Debug, Clone)]
pub(crate) enum ClientType {
Producer = 1,
Expand Down
Loading

0 comments on commit f99a835

Please sign in to comment.