From 8b25896daa22effb6504c04fb411c883b6ab6db0 Mon Sep 17 00:00:00 2001 From: yulangz <53958801+yulangz@users.noreply.github.com> Date: Fri, 14 Jul 2023 10:55:23 +0800 Subject: [PATCH] Allow user to disable TLS in cpp. (#542) * Allow user to disable TLS in cpp. * rename to withSsl * Enable ssl switch for push consumer --------- Co-authored-by: htaowang Co-authored-by: Aaron Ai --- cpp/examples/ExampleProducer.cpp | 1 + cpp/examples/ExampleProducerWithAsync.cpp | 1 + cpp/examples/ExampleProducerWithFifoMessage.cpp | 1 + cpp/examples/ExampleProducerWithTimedMessage.cpp | 1 + cpp/examples/ExampleProducerWithTransactionalMessage.cpp | 1 + cpp/examples/ExamplePushConsumer.cpp | 1 + cpp/examples/ExampleSimpleConsumer.cpp | 1 + cpp/include/rocketmq/Configuration.h | 7 +++++++ cpp/source/base/Configuration.cpp | 5 +++++ cpp/source/client/ClientManagerImpl.cpp | 7 ++++--- cpp/source/client/include/ClientConfig.h | 1 + cpp/source/client/include/ClientManagerImpl.h | 3 ++- cpp/source/rocketmq/ClientImpl.cpp | 2 +- cpp/source/rocketmq/Producer.cpp | 1 + cpp/source/rocketmq/PushConsumer.cpp | 1 + cpp/source/rocketmq/SimpleConsumer.cpp | 1 + cpp/source/rocketmq/include/ClientImpl.h | 4 ++++ cpp/source/rocketmq/include/SimpleConsumerImpl.h | 4 ++++ 18 files changed, 38 insertions(+), 5 deletions(-) diff --git a/cpp/examples/ExampleProducer.cpp b/cpp/examples/ExampleProducer.cpp index 452b4ce18..ca5fc7d70 100644 --- a/cpp/examples/ExampleProducer.cpp +++ b/cpp/examples/ExampleProducer.cpp @@ -77,6 +77,7 @@ int main(int argc, char* argv[]) { .withConfiguration(Configuration::newBuilder() .withEndpoints(FLAGS_access_point) .withCredentialsProvider(credentials_provider) + .withSsl(true) .build()) .build(); diff --git a/cpp/examples/ExampleProducerWithAsync.cpp b/cpp/examples/ExampleProducerWithAsync.cpp index 102af2f4f..d8846b495 100644 --- a/cpp/examples/ExampleProducerWithAsync.cpp +++ b/cpp/examples/ExampleProducerWithAsync.cpp @@ -116,6 +116,7 @@ int main(int argc, char* argv[]) { .withConfiguration(Configuration::newBuilder() .withEndpoints(FLAGS_access_point) .withCredentialsProvider(credentials_provider) + .withSsl(true) .build()) .build(); diff --git a/cpp/examples/ExampleProducerWithFifoMessage.cpp b/cpp/examples/ExampleProducerWithFifoMessage.cpp index caede1893..e8a6f2096 100644 --- a/cpp/examples/ExampleProducerWithFifoMessage.cpp +++ b/cpp/examples/ExampleProducerWithFifoMessage.cpp @@ -74,6 +74,7 @@ int main(int argc, char* argv[]) { .withConfiguration(Configuration::newBuilder() .withEndpoints(FLAGS_access_point) .withCredentialsProvider(credentials_provider) + .withSsl(true) .build()) .build(); diff --git a/cpp/examples/ExampleProducerWithTimedMessage.cpp b/cpp/examples/ExampleProducerWithTimedMessage.cpp index 1379cd1b3..c46238525 100644 --- a/cpp/examples/ExampleProducerWithTimedMessage.cpp +++ b/cpp/examples/ExampleProducerWithTimedMessage.cpp @@ -75,6 +75,7 @@ int main(int argc, char* argv[]) { .withConfiguration(Configuration::newBuilder() .withEndpoints(FLAGS_access_point) .withCredentialsProvider(credentials_provider) + .withSsl(true) .build()) .build(); diff --git a/cpp/examples/ExampleProducerWithTransactionalMessage.cpp b/cpp/examples/ExampleProducerWithTransactionalMessage.cpp index abff6e4f9..befb18ca7 100644 --- a/cpp/examples/ExampleProducerWithTransactionalMessage.cpp +++ b/cpp/examples/ExampleProducerWithTransactionalMessage.cpp @@ -79,6 +79,7 @@ int main(int argc, char* argv[]) { .withConfiguration(Configuration::newBuilder() .withEndpoints(FLAGS_access_point) .withCredentialsProvider(credentials_provider) + .withSsl(true) .build()) .withTopics({FLAGS_topic}) .withTransactionChecker(checker) diff --git a/cpp/examples/ExamplePushConsumer.cpp b/cpp/examples/ExamplePushConsumer.cpp index 2a3d3fe23..1e20b2eef 100644 --- a/cpp/examples/ExamplePushConsumer.cpp +++ b/cpp/examples/ExamplePushConsumer.cpp @@ -58,6 +58,7 @@ int main(int argc, char* argv[]) { .withEndpoints(FLAGS_access_point) .withRequestTimeout(std::chrono::seconds(3)) .withCredentialsProvider(credentials_provider) + .withSsl(true) .build()) .withConsumeThreads(4) .withListener(listener) diff --git a/cpp/examples/ExampleSimpleConsumer.cpp b/cpp/examples/ExampleSimpleConsumer.cpp index 696442820..4c30214fb 100644 --- a/cpp/examples/ExampleSimpleConsumer.cpp +++ b/cpp/examples/ExampleSimpleConsumer.cpp @@ -51,6 +51,7 @@ int main(int argc, char* argv[]) { .withConfiguration(Configuration::newBuilder() .withEndpoints(FLAGS_access_point) .withCredentialsProvider(credentials_provider) + .withSsl(true) .build()) .subscribe(FLAGS_topic, tag) .build(); diff --git a/cpp/include/rocketmq/Configuration.h b/cpp/include/rocketmq/Configuration.h index 90cdf7d27..0037c270d 100644 --- a/cpp/include/rocketmq/Configuration.h +++ b/cpp/include/rocketmq/Configuration.h @@ -43,6 +43,10 @@ class Configuration { return request_timeout_; } + bool withSsl() const { + return withSsl_; + } + protected: friend class ConfigurationBuilder; @@ -52,6 +56,7 @@ class Configuration { std::string endpoints_; CredentialsProviderPtr credentials_provider_; std::chrono::milliseconds request_timeout_{ConfigurationDefaults::RequestTimeout}; + bool withSsl_ = true; }; class ConfigurationBuilder { @@ -62,6 +67,8 @@ class ConfigurationBuilder { ConfigurationBuilder& withRequestTimeout(std::chrono::milliseconds request_timeout); + ConfigurationBuilder& withSsl(bool enable); + Configuration build(); private: diff --git a/cpp/source/base/Configuration.cpp b/cpp/source/base/Configuration.cpp index cf0f4bd3d..2a136d5d9 100644 --- a/cpp/source/base/Configuration.cpp +++ b/cpp/source/base/Configuration.cpp @@ -38,6 +38,11 @@ ConfigurationBuilder& ConfigurationBuilder::withRequestTimeout(std::chrono::mill return *this; } +ConfigurationBuilder& ConfigurationBuilder::withSsl(bool enable) { + configuration_.withSsl_ = enable; + return *this; +} + Configuration ConfigurationBuilder::build() { return std::move(configuration_); } diff --git a/cpp/source/client/ClientManagerImpl.cpp b/cpp/source/client/ClientManagerImpl.cpp index a39bb7386..5865dbb23 100644 --- a/cpp/source/client/ClientManagerImpl.cpp +++ b/cpp/source/client/ClientManagerImpl.cpp @@ -47,10 +47,11 @@ ROCKETMQ_NAMESPACE_BEGIN -ClientManagerImpl::ClientManagerImpl(std::string resource_namespace) +ClientManagerImpl::ClientManagerImpl(std::string resource_namespace, bool withSsl) : scheduler_(std::make_shared()), resource_namespace_(std::move(resource_namespace)), state_(State::CREATED), - callback_thread_pool_(absl::make_unique(std::thread::hardware_concurrency())) { + callback_thread_pool_(absl::make_unique(std::thread::hardware_concurrency())), + withSsl_(withSsl){ certificate_verifier_ = grpc::experimental::ExternalCertificateVerifier::Create(); tls_channel_credential_options_.set_verify_server_certs(false); tls_channel_credential_options_.set_check_call_host(false); @@ -469,7 +470,7 @@ std::shared_ptr ClientManagerImpl::createChannel(const std::strin std::vector> interceptor_factories; interceptor_factories.emplace_back(absl::make_unique()); auto channel = grpc::experimental::CreateCustomChannelWithInterceptors( - target_host, channel_credential_, channel_arguments_, std::move(interceptor_factories)); + target_host, withSsl_ ? channel_credential_ : grpc::InsecureChannelCredentials(), channel_arguments_, std::move(interceptor_factories)); return channel; } diff --git a/cpp/source/client/include/ClientConfig.h b/cpp/source/client/include/ClientConfig.h index 58cd1fe7e..e0a7fbf61 100644 --- a/cpp/source/client/include/ClientConfig.h +++ b/cpp/source/client/include/ClientConfig.h @@ -60,6 +60,7 @@ struct ClientConfig { PublisherConfig publisher; SubscriberConfig subscriber; Metric metric; + bool withSsl; std::unique_ptr sampler_; }; diff --git a/cpp/source/client/include/ClientManagerImpl.h b/cpp/source/client/include/ClientManagerImpl.h index 0769769a1..653fcad35 100644 --- a/cpp/source/client/include/ClientManagerImpl.h +++ b/cpp/source/client/include/ClientManagerImpl.h @@ -54,7 +54,7 @@ class ClientManagerImpl : virtual public ClientManager, public std::enable_share * effectively. * @param resource_namespace Abstract resource namespace, in which this client manager lives. */ - explicit ClientManagerImpl(std::string resource_namespace); + explicit ClientManagerImpl(std::string resource_namespace, bool withSsl = true); ~ClientManagerImpl() override; @@ -242,6 +242,7 @@ class ClientManagerImpl : virtual public ClientManager, public std::enable_share grpc::ChannelArguments channel_arguments_; bool trace_{false}; + bool withSsl_; }; ROCKETMQ_NAMESPACE_END \ No newline at end of file diff --git a/cpp/source/rocketmq/ClientImpl.cpp b/cpp/source/rocketmq/ClientImpl.cpp index 155186147..e2f401e3a 100644 --- a/cpp/source/rocketmq/ClientImpl.cpp +++ b/cpp/source/rocketmq/ClientImpl.cpp @@ -113,7 +113,7 @@ void ClientImpl::start() { client_config_.client_id = clientId(); if (!client_manager_) { - client_manager_ = std::make_shared(client_config_.resource_namespace); + client_manager_ = std::make_shared(client_config_.resource_namespace, client_config_.withSsl); } client_manager_->start(); diff --git a/cpp/source/rocketmq/Producer.cpp b/cpp/source/rocketmq/Producer.cpp index 49b9f71c1..8620f681d 100644 --- a/cpp/source/rocketmq/Producer.cpp +++ b/cpp/source/rocketmq/Producer.cpp @@ -83,6 +83,7 @@ ProducerBuilder& ProducerBuilder::withConfiguration(Configuration configuration) impl_->withNameServerResolver(std::move(name_server_resolver)); impl_->withCredentialsProvider(configuration.credentialsProvider()); impl_->withRequestTimeout(configuration.requestTimeout()); + impl_->withSsl(configuration.withSsl()); return *this; } diff --git a/cpp/source/rocketmq/PushConsumer.cpp b/cpp/source/rocketmq/PushConsumer.cpp index 17ea8ca81..2b1c1566c 100644 --- a/cpp/source/rocketmq/PushConsumer.cpp +++ b/cpp/source/rocketmq/PushConsumer.cpp @@ -43,6 +43,7 @@ PushConsumer PushConsumerBuilder::build() { } impl->consumeThreadPoolSize(consume_thread_); impl->withNameServerResolver(std::make_shared(configuration_.endpoints())); + impl->withSsl(configuration_.withSsl()); impl->withCredentialsProvider(configuration_.credentialsProvider()); impl->withRequestTimeout(configuration_.requestTimeout()); impl->start(); diff --git a/cpp/source/rocketmq/SimpleConsumer.cpp b/cpp/source/rocketmq/SimpleConsumer.cpp index d7e94ae91..a48a0e494 100644 --- a/cpp/source/rocketmq/SimpleConsumer.cpp +++ b/cpp/source/rocketmq/SimpleConsumer.cpp @@ -130,6 +130,7 @@ SimpleConsumer SimpleConsumerBuilder::build() { simple_consumer.impl_->withNameServerResolver(std::make_shared(configuration_.endpoints())); simple_consumer.impl_->withCredentialsProvider(configuration_.credentialsProvider()); simple_consumer.impl_->withReceiveMessageTimeout(await_duration_); + simple_consumer.impl_->withSsl(configuration_.withSsl()); for (const auto& entry : subscriptions_) { simple_consumer.impl_->subscribe(entry.first, entry.second); diff --git a/cpp/source/rocketmq/include/ClientImpl.h b/cpp/source/rocketmq/include/ClientImpl.h index e472c5909..70dc5382e 100644 --- a/cpp/source/rocketmq/include/ClientImpl.h +++ b/cpp/source/rocketmq/include/ClientImpl.h @@ -94,6 +94,10 @@ class ClientImpl : virtual public Client { client_config_.request_timeout = absl::FromChrono(request_timeout); } + void withSsl(bool enable) { + client_config_.withSsl = enable; + } + /** * Expose for test purpose only. */ diff --git a/cpp/source/rocketmq/include/SimpleConsumerImpl.h b/cpp/source/rocketmq/include/SimpleConsumerImpl.h index 7ef3d8e34..a20cce562 100644 --- a/cpp/source/rocketmq/include/SimpleConsumerImpl.h +++ b/cpp/source/rocketmq/include/SimpleConsumerImpl.h @@ -61,6 +61,10 @@ class SimpleConsumerImpl : public ClientImpl, public std::enable_shared_from_thi long_polling_duration_ = receive_timeout; } + void withSsl(bool enable) { + client_config_.withSsl = enable; + } + protected: void topicsOfInterest(std::vector topics) override;