From fa8b4cbbcb9d68be77863a4644bae8e888e5b1be Mon Sep 17 00:00:00 2001 From: SSpirits Date: Tue, 6 Jun 2023 13:14:13 +0800 Subject: [PATCH] [ISSUE #526] feat(rust): support ak/sk authorization (#527) * feat(rust): support ak/sk authorization Signed-off-by: SSpirits * feat(rust): change ak/sk type to option Signed-off-by: SSpirits * Make code idiomatic Signed-off-by: Li Zhanhui * feat(rust): fix license Signed-off-by: SSpirits * feat(rust): optimize code Signed-off-by: SSpirits * feat(rust): fix msrv test Signed-off-by: SSpirits * fix(rust): fix msrv test Signed-off-by: SSpirits --------- Signed-off-by: SSpirits Signed-off-by: Li Zhanhui Co-authored-by: Li Zhanhui --- .github/workflows/rust_build.yml | 2 +- rust/.cargo/Cargo.lock.min | 65 +------------------------------- rust/.cargo/config.toml | 18 +++++++++ rust/Cargo.toml | 2 +- rust/src/client.rs | 5 --- rust/src/conf.rs | 25 +++++++++++- rust/src/session.rs | 38 ++++++++++++++++++- 7 files changed, 83 insertions(+), 72 deletions(-) create mode 100644 rust/.cargo/config.toml diff --git a/.github/workflows/rust_build.yml b/.github/workflows/rust_build.yml index b7e43500b..6689dd702 100644 --- a/.github/workflows/rust_build.yml +++ b/.github/workflows/rust_build.yml @@ -65,7 +65,7 @@ jobs: toolchain: ${{ matrix.msrv }} - name: Check MSRV ${{ matrix.msrv }} working-directory: ./rust - run: cp .cargo/Cargo.lock.min Cargo.lock && cargo fetch && cargo +${{ matrix.msrv }} check --locked --frozen + run: cp .cargo/Cargo.lock.min Cargo.lock && cargo +${{ matrix.msrv }} fetch && cargo +${{ matrix.msrv }} check --locked --frozen build: name: "${{ matrix.os }}" runs-on: ${{ matrix.os }} diff --git a/rust/.cargo/Cargo.lock.min b/rust/.cargo/Cargo.lock.min index 9262a0806..3b2ccb864 100644 --- a/rust/.cargo/Cargo.lock.min +++ b/rust/.cargo/Cargo.lock.min @@ -136,15 +136,6 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" -[[package]] -name = "block-buffer" -version = "0.10.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" -dependencies = [ - "generic-array", -] - [[package]] name = "bumpalo" version = "3.12.0" @@ -210,16 +201,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "crypto-common" -version = "0.1.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" -dependencies = [ - "generic-array", - "typenum", -] - [[package]] name = "ctor" version = "0.1.26" @@ -249,17 +230,6 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6184e33543162437515c2e2b48714794e37845ec9851711914eec9d308f6ebe8" -[[package]] -name = "digest" -version = "0.10.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8168378f4e5023e7218c89c891c0fd8ecdb5e5e4f18cb78f38cf245dd021e76f" -dependencies = [ - "block-buffer", - "crypto-common", - "subtle", -] - [[package]] name = "dirs-next" version = "2.0.0" @@ -439,16 +409,6 @@ dependencies = [ "slab", ] -[[package]] -name = "generic-array" -version = "0.14.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a" -dependencies = [ - "typenum", - "version_check", -] - [[package]] name = "getrandom" version = "0.2.9" @@ -521,15 +481,6 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" -[[package]] -name = "hmac" -version = "0.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e" -dependencies = [ - "digest", -] - [[package]] name = "hostname" version = "0.3.1" @@ -1315,7 +1266,7 @@ dependencies = [ [[package]] name = "rocketmq" -version = "0.1.0" +version = "0.1.1" dependencies = [ "anyhow", "async-trait", @@ -1323,7 +1274,6 @@ dependencies = [ "byteorder", "futures", "hex", - "hmac", "hostname", "lazy_static", "mac_address", @@ -1338,6 +1288,7 @@ dependencies = [ "prost 0.11.9", "prost-types", "regex", + "ring", "siphasher", "slog", "slog-async", @@ -1578,12 +1529,6 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" -[[package]] -name = "subtle" -version = "2.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601" - [[package]] name = "syn" version = "1.0.109" @@ -1974,12 +1919,6 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed" -[[package]] -name = "typenum" -version = "1.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba" - [[package]] name = "unicode-ident" version = "1.0.8" diff --git a/rust/.cargo/config.toml b/rust/.cargo/config.toml new file mode 100644 index 000000000..311a0281e --- /dev/null +++ b/rust/.cargo/config.toml @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +[registries.crates-io] +protocol = "sparse" diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 80c06fe23..d7655d1ff 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -43,7 +43,6 @@ prost-types = "0.11.8" thiserror = "1.0" anyhow = "1.0.70" parking_lot = "0.12" -hmac = "0.12" hostname = "0.3.1" os_type = "2.6.0" @@ -67,6 +66,7 @@ mockall = "0.11.4" mockall_double= "0.3.0" siphasher = "0.3.10" +ring = "0.16.20" [build-dependencies] tonic-build = "0.9.0" diff --git a/rust/src/client.rs b/rust/src/client.rs index f13c3a68d..4b601facd 100644 --- a/rust/src/client.rs +++ b/rust/src/client.rs @@ -372,15 +372,10 @@ impl Client { mut rpc_client: T, messages: Vec, ) -> Result, ClientError> { - let message_count = messages.len(); let request = SendMessageRequest { messages }; let response = rpc_client.send_message(request).await?; Self::handle_response_status(response.status, OPERATION_SEND_MESSAGE)?; - if response.entries.len() != message_count { - error!(self.logger, "server do not return illegal send result, this may be a bug. except result count: {}, found: {}", response.entries.len(), message_count); - } - Ok(response .entries .iter() diff --git a/rust/src/conf.rs b/rust/src/conf.rs index 7ecb69179..95b7adcb0 100644 --- a/rust/src/conf.rs +++ b/rust/src/conf.rs @@ -17,12 +17,13 @@ //! Configuration of RocketMQ rust client. +use std::time::Duration; + use crate::model::common::ClientType; #[allow(unused_imports)] use crate::producer::Producer; #[allow(unused_imports)] use crate::simple_consumer::SimpleConsumer; -use std::time::Duration; /// [`ClientOption`] is the configuration of internal client, which manages the connection and request with RocketMQ proxy. #[derive(Debug, Clone)] @@ -34,6 +35,8 @@ pub struct ClientOption { pub(crate) enable_tls: bool, pub(crate) timeout: Duration, pub(crate) long_polling_timeout: Duration, + pub(crate) access_key: Option, + pub(crate) secret_key: Option, } impl Default for ClientOption { @@ -46,6 +49,8 @@ impl Default for ClientOption { enable_tls: true, timeout: Duration::from_secs(3), long_polling_timeout: Duration::from_secs(40), + access_key: None, + secret_key: None, } } } @@ -88,6 +93,24 @@ impl ClientOption { pub fn set_long_polling_timeout(&mut self, long_polling_timeout: Duration) { self.long_polling_timeout = long_polling_timeout; } + + /// Get the access key + pub fn access_key(&self) -> Option<&String> { + self.access_key.as_ref() + } + /// Set the access key + pub fn set_access_key(&mut self, access_key: impl Into) { + self.access_key = Some(access_key.into()); + } + + /// Get the secret key + pub fn secret_key(&self) -> Option<&String> { + self.secret_key.as_ref() + } + /// Set the secret key + pub fn set_secret_key(&mut self, secret_key: impl Into) { + self.secret_key = Some(secret_key.into()); + } } /// Log format for output. diff --git a/rust/src/session.rs b/rust/src/session.rs index 0660931f5..229d9e561 100644 --- a/rust/src/session.rs +++ b/rust/src/session.rs @@ -18,7 +18,10 @@ use std::collections::HashMap; use async_trait::async_trait; use mockall::automock; +use ring::hmac; use slog::{debug, error, info, o, Logger}; +use time::format_description::well_known::Rfc3339; +use time::OffsetDateTime; use tokio::sync::{mpsc, Mutex}; use tokio_stream::wrappers::ReceiverStream; use tokio_stream::StreamExt; @@ -217,6 +220,39 @@ impl Session { "x-mq-protocol-version", AsciiMetadataValue::from_static(PROTOCOL_VERSION), ); + + let date_time_result = OffsetDateTime::now_local(); + let date_time = if let Ok(result) = date_time_result { + result + } else { + OffsetDateTime::now_utc() + }; + + let date_time = date_time.format(&Rfc3339).unwrap(); + + metadata.insert( + "x-mq-date-time", + AsciiMetadataValue::try_from(&date_time).unwrap(), + ); + + if let Some((access_key, access_secret)) = + self.option.access_key().zip(self.option.secret_key()) + { + let key = hmac::Key::new( + hmac::HMAC_SHA1_FOR_LEGACY_USE_ONLY, + access_secret.as_bytes(), + ); + let signature = hmac::sign(&key, date_time.as_bytes()); + let signature = hex::encode(signature.as_ref()); + let authorization = format!( + "MQv2-HMAC-SHA1 Credential={}, SignedHeaders=x-mq-date-time, Signature={}", + access_key, signature + ); + metadata.insert( + "authorization", + AsciiMetadataValue::try_from(authorization).unwrap(), + ); + } } pub(crate) async fn start(&mut self, settings: TelemetryCommand) -> Result<(), ClientError> { @@ -458,10 +494,10 @@ impl SessionManager { #[cfg(test)] mod tests { - use crate::conf::ProducerOption; use slog::debug; use wiremock_grpc::generate; + use crate::conf::ProducerOption; use crate::log::terminal_logger; use crate::util::build_producer_settings;