Skip to content

Commit

Permalink
[ISSUE #526] feat(rust): support ak/sk authorization (#527)
Browse files Browse the repository at this point in the history
* feat(rust): support ak/sk authorization

Signed-off-by: SSpirits <[email protected]>

* feat(rust): change ak/sk type to option

Signed-off-by: SSpirits <[email protected]>

* Make code idiomatic

Signed-off-by: Li Zhanhui <[email protected]>

* feat(rust): fix license

Signed-off-by: SSpirits <[email protected]>

* feat(rust): optimize code

Signed-off-by: SSpirits <[email protected]>

* feat(rust): fix msrv test

Signed-off-by: SSpirits <[email protected]>

* fix(rust): fix msrv test

Signed-off-by: SSpirits <[email protected]>

---------

Signed-off-by: SSpirits <[email protected]>
Signed-off-by: Li Zhanhui <[email protected]>
Co-authored-by: Li Zhanhui <[email protected]>
  • Loading branch information
ShadowySpirits and lizhanhui committed Jun 6, 2023
1 parent e96712e commit fa8b4cb
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 72 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/rust_build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ jobs:
toolchain: ${{ matrix.msrv }}
- name: Check MSRV ${{ matrix.msrv }}
working-directory: ./rust
run: cp .cargo/Cargo.lock.min Cargo.lock && cargo fetch && cargo +${{ matrix.msrv }} check --locked --frozen
run: cp .cargo/Cargo.lock.min Cargo.lock && cargo +${{ matrix.msrv }} fetch && cargo +${{ matrix.msrv }} check --locked --frozen
build:
name: "${{ matrix.os }}"
runs-on: ${{ matrix.os }}
Expand Down
65 changes: 2 additions & 63 deletions rust/.cargo/Cargo.lock.min

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions rust/.cargo/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
[registries.crates-io]
protocol = "sparse"
2 changes: 1 addition & 1 deletion rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ prost-types = "0.11.8"
thiserror = "1.0"
anyhow = "1.0.70"
parking_lot = "0.12"
hmac = "0.12"
hostname = "0.3.1"
os_type = "2.6.0"

Expand All @@ -67,6 +66,7 @@ mockall = "0.11.4"
mockall_double= "0.3.0"

siphasher = "0.3.10"
ring = "0.16.20"

[build-dependencies]
tonic-build = "0.9.0"
Expand Down
5 changes: 0 additions & 5 deletions rust/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,15 +372,10 @@ impl Client {
mut rpc_client: T,
messages: Vec<Message>,
) -> Result<Vec<SendReceipt>, ClientError> {
let message_count = messages.len();
let request = SendMessageRequest { messages };
let response = rpc_client.send_message(request).await?;
Self::handle_response_status(response.status, OPERATION_SEND_MESSAGE)?;

if response.entries.len() != message_count {
error!(self.logger, "server do not return illegal send result, this may be a bug. except result count: {}, found: {}", response.entries.len(), message_count);
}

Ok(response
.entries
.iter()
Expand Down
25 changes: 24 additions & 1 deletion rust/src/conf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@

//! Configuration of RocketMQ rust client.

use std::time::Duration;

use crate::model::common::ClientType;
#[allow(unused_imports)]
use crate::producer::Producer;
#[allow(unused_imports)]
use crate::simple_consumer::SimpleConsumer;
use std::time::Duration;

/// [`ClientOption`] is the configuration of internal client, which manages the connection and request with RocketMQ proxy.
#[derive(Debug, Clone)]
Expand All @@ -34,6 +35,8 @@ pub struct ClientOption {
pub(crate) enable_tls: bool,
pub(crate) timeout: Duration,
pub(crate) long_polling_timeout: Duration,
pub(crate) access_key: Option<String>,
pub(crate) secret_key: Option<String>,
}

impl Default for ClientOption {
Expand All @@ -46,6 +49,8 @@ impl Default for ClientOption {
enable_tls: true,
timeout: Duration::from_secs(3),
long_polling_timeout: Duration::from_secs(40),
access_key: None,
secret_key: None,
}
}
}
Expand Down Expand Up @@ -88,6 +93,24 @@ impl ClientOption {
pub fn set_long_polling_timeout(&mut self, long_polling_timeout: Duration) {
self.long_polling_timeout = long_polling_timeout;
}

/// Get the access key
pub fn access_key(&self) -> Option<&String> {
self.access_key.as_ref()
}
/// Set the access key
pub fn set_access_key(&mut self, access_key: impl Into<String>) {
self.access_key = Some(access_key.into());
}

/// Get the secret key
pub fn secret_key(&self) -> Option<&String> {
self.secret_key.as_ref()
}
/// Set the secret key
pub fn set_secret_key(&mut self, secret_key: impl Into<String>) {
self.secret_key = Some(secret_key.into());
}
}

/// Log format for output.
Expand Down
38 changes: 37 additions & 1 deletion rust/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ use std::collections::HashMap;

use async_trait::async_trait;
use mockall::automock;
use ring::hmac;
use slog::{debug, error, info, o, Logger};
use time::format_description::well_known::Rfc3339;
use time::OffsetDateTime;
use tokio::sync::{mpsc, Mutex};
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt;
Expand Down Expand Up @@ -217,6 +220,39 @@ impl Session {
"x-mq-protocol-version",
AsciiMetadataValue::from_static(PROTOCOL_VERSION),
);

let date_time_result = OffsetDateTime::now_local();
let date_time = if let Ok(result) = date_time_result {
result
} else {
OffsetDateTime::now_utc()
};

let date_time = date_time.format(&Rfc3339).unwrap();

metadata.insert(
"x-mq-date-time",
AsciiMetadataValue::try_from(&date_time).unwrap(),
);

if let Some((access_key, access_secret)) =
self.option.access_key().zip(self.option.secret_key())
{
let key = hmac::Key::new(
hmac::HMAC_SHA1_FOR_LEGACY_USE_ONLY,
access_secret.as_bytes(),
);
let signature = hmac::sign(&key, date_time.as_bytes());
let signature = hex::encode(signature.as_ref());
let authorization = format!(
"MQv2-HMAC-SHA1 Credential={}, SignedHeaders=x-mq-date-time, Signature={}",
access_key, signature
);
metadata.insert(
"authorization",
AsciiMetadataValue::try_from(authorization).unwrap(),
);
}
}

pub(crate) async fn start(&mut self, settings: TelemetryCommand) -> Result<(), ClientError> {
Expand Down Expand Up @@ -458,10 +494,10 @@ impl SessionManager {

#[cfg(test)]
mod tests {
use crate::conf::ProducerOption;
use slog::debug;
use wiremock_grpc::generate;

use crate::conf::ProducerOption;
use crate::log::terminal_logger;
use crate::util::build_producer_settings;

Expand Down

0 comments on commit fa8b4cb

Please sign in to comment.