Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test(rust): add unit tests #484

Merged
merged 2 commits into from
Apr 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions rust/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ Here are some preparations you may need to know (or refer to [here](https://rock
2. protoc 3.15.0+
3. setup name server, broker, and [proxy](https://github.com/apache/rocketmq/tree/develop/proxy).

### Run Tests

```sh
cargo llvm-cov --ignore-filename-regex pb/ --open
```

### Run Example

Run the following command to start the example:
Expand Down
37 changes: 21 additions & 16 deletions rust/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,23 +56,23 @@ lazy_static::lazy_static! {
static ref CLIENT_ID_SEQUENCE: AtomicUsize = AtomicUsize::new(0);
}

const OPERATION_CLIENT_NEW: &'static str = "client.new";
const OPERATION_QUERY_ROUTE: &'static str = "client.query_route";
const OPERATION_HEARTBEAT: &'static str = "client.heartbeat";
const OPERATION_SEND_MESSAGE: &'static str = "client.send_message";
const OPERATION_RECEIVE_MESSAGE: &'static str = "client.receive_message";
const OPERATION_ACK_MESSAGE: &'static str = "client.ack_message";

#[automock]
impl Client {
const OPERATION_CLIENT_NEW: &'static str = "client.new";
const OPERATION_QUERY_ROUTE: &'static str = "client.query_route";
const OPERATION_HEARTBEAT: &'static str = "client.heartbeat";
const OPERATION_SEND_MESSAGE: &'static str = "client.send_message";
const OPERATION_RECEIVE_MESSAGE: &'static str = "client.receive_message";
const OPERATION_ACK_MESSAGE: &'static str = "client.ack_message";

pub(crate) fn new(
logger: &Logger,
option: ClientOption,
settings: TelemetryCommand,
) -> Result<Self, ClientError> {
let id = Self::generate_client_id();
let endpoints = Endpoints::from_url(option.access_url())
.map_err(|e| e.with_operation(Self::OPERATION_CLIENT_NEW))?;
.map_err(|e| e.with_operation(OPERATION_CLIENT_NEW))?;
let session_manager = SessionManager::new(logger, id.clone(), &option);
Ok(Client {
logger: logger.new(o!("component" => "client")),
Expand Down Expand Up @@ -120,7 +120,7 @@ impl Client {
continue;
}
let result =
Self::handle_response_status(response.unwrap().status, Self::OPERATION_HEARTBEAT);
Self::handle_response_status(response.unwrap().status, OPERATION_HEARTBEAT);
if result.is_err() {
error!(
logger,
Expand All @@ -137,6 +137,7 @@ impl Client {
});
}

#[allow(dead_code)]
pub(crate) fn client_id(&self) -> &str {
&self.id
}
Expand Down Expand Up @@ -214,6 +215,7 @@ impl Client {
})
}

#[allow(dead_code)]
pub(crate) async fn topic_route(
&self,
topic: &str,
Expand Down Expand Up @@ -242,7 +244,7 @@ impl Client {
};

let response = rpc_client.query_route(request).await?;
Self::handle_response_status(response.status, Self::OPERATION_QUERY_ROUTE)?;
Self::handle_response_status(response.status, OPERATION_QUERY_ROUTE)?;

let route = Route {
index: AtomicUsize::new(0),
Expand Down Expand Up @@ -288,7 +290,7 @@ impl Client {
Err(_e) => Err(ClientError::new(
ErrorKind::ChannelReceive,
"wait for inflight query topic route request failed",
Self::OPERATION_QUERY_ROUTE,
OPERATION_QUERY_ROUTE,
)),
};
}
Expand Down Expand Up @@ -327,7 +329,7 @@ impl Client {
let _ = item.send(Err(ClientError::new(
ErrorKind::Server,
"query topic route failed",
Self::OPERATION_QUERY_ROUTE,
OPERATION_QUERY_ROUTE,
)));
}
};
Expand All @@ -352,6 +354,7 @@ impl Client {
Ok(response)
}

#[allow(dead_code)]
pub(crate) async fn send_message(
&self,
endpoints: &Endpoints,
Expand All @@ -372,7 +375,7 @@ impl Client {
let message_count = messages.len();
let request = SendMessageRequest { messages };
let response = rpc_client.send_message(request).await?;
Self::handle_response_status(response.status, Self::OPERATION_SEND_MESSAGE)?;
Self::handle_response_status(response.status, OPERATION_SEND_MESSAGE)?;

if response.entries.len() != message_count {
error!(self.logger, "server do not return illegal send result, this may be a bug. except result count: {}, found: {}", response.entries.len(), message_count);
Expand All @@ -381,6 +384,7 @@ impl Client {
Ok(response.entries)
}

#[allow(dead_code)]
pub(crate) async fn receive_message(
&self,
endpoints: &Endpoints,
Expand Down Expand Up @@ -427,7 +431,7 @@ impl Client {
for response in responses {
match response.content.unwrap() {
Content::Status(status) => {
Self::handle_response_status(Some(status), Self::OPERATION_RECEIVE_MESSAGE)?;
Self::handle_response_status(Some(status), OPERATION_RECEIVE_MESSAGE)?;
}
Content::Message(message) => {
messages.push(message);
Expand All @@ -438,6 +442,7 @@ impl Client {
Ok(messages)
}

#[allow(dead_code)]
pub(crate) async fn ack_message<T: AckMessageEntry + 'static>(
&self,
ack_entry: T,
Expand Down Expand Up @@ -475,7 +480,7 @@ impl Client {
entries,
};
let response = rpc_client.ack_message(request).await?;
Self::handle_response_status(response.status, Self::OPERATION_ACK_MESSAGE)?;
Self::handle_response_status(response.status, OPERATION_ACK_MESSAGE)?;
Ok(response.entries)
}
}
Expand Down Expand Up @@ -623,7 +628,7 @@ mod tests {
assert!(result.is_ok(), "should not return error when status is Ok");
}

fn new_topic_route_response() -> Result<QueryRouteResponse, ClientError> {
pub(crate) fn new_topic_route_response() -> Result<QueryRouteResponse, ClientError> {
Ok(QueryRouteResponse {
status: Some(Status {
code: Code::Ok as i32,
Expand Down
33 changes: 31 additions & 2 deletions rust/src/conf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ impl Default for ClientOption {
group: "".to_string(),
namespace: "".to_string(),
access_url: "localhost:8081".to_string(),
enable_tls: false,
enable_tls: true,
timeout: Duration::from_secs(3),
long_polling_timeout: Duration::from_secs(40),
}
Expand Down Expand Up @@ -72,7 +72,7 @@ impl ClientOption {
}
}

#[derive(Debug, Clone)]
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum LoggingFormat {
Terminal,
Json,
Expand Down Expand Up @@ -195,3 +195,32 @@ impl SimpleConsumerOption {
self.namespace = name_space.into();
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn conf_client_option() {
let option = ClientOption::default();
assert_eq!(option.access_url(), "localhost:8081");
assert!(option.enable_tls());
assert_eq!(option.timeout(), &Duration::from_secs(3));
assert_eq!(option.long_polling_timeout(), &Duration::from_secs(40));
}

#[test]
fn conf_producer_option() {
let option = ProducerOption::default();
assert_eq!(option.logging_format(), &LoggingFormat::Terminal);
assert!(option.prefetch_route());
assert!(option.validate_message_type());
}

#[test]
fn conf_simple_consumer_option() {
let option = SimpleConsumerOption::default();
assert_eq!(option.logging_format(), &LoggingFormat::Terminal);
assert!(option.prefetch_route());
}
}
19 changes: 19 additions & 0 deletions rust/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,3 +158,22 @@ impl Debug for ClientError {
Ok(())
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn error_client_error() {
let err = ClientError::new(ErrorKind::Config, "fake_message", "error_client_error")
.with_operation("another_operation")
.with_context("context_key", "context_value")
.set_source(anyhow::anyhow!("fake_source_error"));
assert_eq!(
err.to_string(),
"Failed to parse config at another_operation, context: { called: error_client_error, context_key: context_value } => fake_message, source: fake_source_error"
);
assert_eq!(format!("{:?}", err), "Failed to parse config at another_operation => fake_message\n\nContext:\n called: error_client_error\n context_key: context_value\n\nSource: fake_source_error\n");
assert_eq!(format!("{:#?}", err), "Error {\n kind: Config,\n message: \"fake_message\",\n operation: \"another_operation\",\n context: [\n (\n \"called\",\n \"error_client_error\",\n ),\n (\n \"context_key\",\n \"context_value\",\n ),\n ],\n source: Some(\n \"fake_source_error\",\n ),\n}");
}
}
2 changes: 0 additions & 2 deletions rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
#[allow(dead_code)]
pub mod conf;
#[allow(dead_code)]
mod error;
#[allow(dead_code)]
mod log;
Expand All @@ -28,7 +27,6 @@ mod client;
mod pb;
mod session;

#[allow(dead_code)]
pub mod model;
mod util;

Expand Down
1 change: 1 addition & 0 deletions rust/src/model/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::error::{ClientError, ErrorKind};
use crate::pb;
use crate::pb::{Address, AddressScheme, MessageQueue};

#[allow(dead_code)]
#[derive(Debug, Clone)]
pub(crate) enum ClientType {
Producer = 1,
Expand Down
Loading