From 79038e063681bffb19348c20a0cbbe427a048e91 Mon Sep 17 00:00:00 2001 From: Sergey Radionov Date: Tue, 19 Jan 2021 18:16:03 +0700 Subject: [PATCH] mqttevh: tls support implementation finished (#2517) * mqttevh: tls support implementation finished * mqttevh: MQTTASYNC_OPERATION_INCOMPLETE is not error * mqttevh: allow send messages while connecting is still in progress --- conf/janus.eventhandler.mqttevh.jcfg.sample | 6 ++- events/janus_mqttevh.c | 41 ++++++++++++++++----- 2 files changed, 36 insertions(+), 11 deletions(-) diff --git a/conf/janus.eventhandler.mqttevh.jcfg.sample b/conf/janus.eventhandler.mqttevh.jcfg.sample index 1dd93d40c7..748559bd06 100644 --- a/conf/janus.eventhandler.mqttevh.jcfg.sample +++ b/conf/janus.eventhandler.mqttevh.jcfg.sample @@ -15,7 +15,7 @@ general: { json = "indented" # Whether the JSON messages should be indented (default), # plain (no indentation) or compact (no indentation and no spaces) - url = "tcp://localhost:1883" # The URL of the MQTT server. Only tcp supported at this time. + url = "tcp://localhost:1883" # The URL of the MQTT server. "tcp://" and "ssl://" protocols are supported. #mqtt_version = "3.1.1" # Protocol version. Available values: 3.1, 3.1.1 (default), 5. client_id = "janus.example.com" # Janus client id. You have to configure a unique ID (default: guest). #keep_alive_interval = 20 # Keep connection for N seconds (default: 30) @@ -30,6 +30,8 @@ general: { #topic = "/janus/events" # Base topic (default: /janus/events) #addevent = true # Whether we should add the event type to the base topic + #tls_enable = false # Whether TLS support must be enabled + # Initial message sent to status topic #connect_status = "{\"event\": \"connected\", \"eventhandler\": \"janus.eventhandler.mqttevh\"}" # Message sent after disconnect or as LWT @@ -43,7 +45,7 @@ general: { #tls_verify_peer = true # Whether peer verification must be enabled #tls_verify_hostname = true # Whether hostname verification must be enabled - # Certificates to use when SSL support is enabled, if needed + # Certificates to use when TLS support is enabled, if needed #tls_cacert = "/path/to/cacert.pem" #tls_client_cert = "/path/to/cert.pem" #tls_client_key = "/path/to/key.pem" diff --git a/events/janus_mqttevh.c b/events/janus_mqttevh.c index 43d1805416..7be87a6eb7 100644 --- a/events/janus_mqttevh.c +++ b/events/janus_mqttevh.c @@ -379,6 +379,15 @@ static int janus_mqttevh_client_connect(janus_mqttevh_context *ctx) { options.keepAliveInterval = ctx->connect.keep_alive_interval; options.maxInflight = ctx->connect.max_inflight; + MQTTAsync_SSLOptions ssl_opts = MQTTAsync_SSLOptions_initializer; + if(ctx->tls.enable) { + ssl_opts.trustStore = ctx->tls.cacert_file; + ssl_opts.keyStore = ctx->tls.cert_file; + ssl_opts.privateKey = ctx->tls.key_file; + ssl_opts.enableServerCertAuth = ctx->tls.verify_peer; + options.ssl = &ssl_opts; + } + MQTTAsync_willOptions willOptions = MQTTAsync_willOptions_initializer; if(ctx->will.enabled) { willOptions.topicName = ctx->will.topic; @@ -535,10 +544,14 @@ static int janus_mqttevh_client_publish_message(janus_mqttevh_context *ctx, cons options.onFailure = janus_mqttevh_client_publish_message_failure; rc = MQTTAsync_sendMessage(ctx->client, topic, &msg, &options); - if(rc == MQTTASYNC_SUCCESS) { - JANUS_LOG(LOG_HUGE, "MQTT EVH message sent to topic %s on %s. Result %d\n", topic, ctx->connect.url, rc); - } else { - JANUS_LOG(LOG_WARN, "FAILURE: MQTT EVH message propably not sent to topic %s on %s. Result %d\n", topic, ctx->connect.url, rc); + switch(rc) { + case MQTTASYNC_SUCCESS: + JANUS_LOG(LOG_HUGE, "MQTT EVH message sent to topic %s on %s. Result %d\n", topic, ctx->connect.url, rc); + break; + case MQTTASYNC_OPERATION_INCOMPLETE: + break; + default: + JANUS_LOG(LOG_WARN, "FAILURE: MQTT EVH message propably not sent to topic %s on %s. Result %d\n", topic, ctx->connect.url, rc); } return rc; @@ -561,10 +574,14 @@ static int janus_mqttevh_client_publish_message5(janus_mqttevh_context *ctx, con options.onFailure5 = janus_mqttevh_client_publish_message_failure5; rc = MQTTAsync_sendMessage(ctx->client, topic, &msg, &options); - if(rc == MQTTASYNC_SUCCESS) { - JANUS_LOG(LOG_HUGE, "MQTT EVH message sent to topic %s on %s. Result %d\n", topic, ctx->connect.url, rc); - } else { - JANUS_LOG(LOG_WARN, "FAILURE: MQTT EVH message propably not sent to topic %s on %s. Result %d\n", topic, ctx->connect.url, rc); + switch(rc) { + case MQTTASYNC_SUCCESS: + JANUS_LOG(LOG_HUGE, "MQTT EVH message sent to topic %s on %s. Result %d\n", topic, ctx->connect.url, rc); + break; + case MQTTASYNC_OPERATION_INCOMPLETE: + break; + default: + JANUS_LOG(LOG_WARN, "FAILURE: MQTT EVH message propably not sent to topic %s on %s. Result %d\n", topic, ctx->connect.url, rc); } return rc; @@ -604,7 +621,12 @@ static void janus_mqttevh_client_publish_message_failure5(void *context, MQTTAsy static void janus_mqttevh_client_publish_message_failure_impl(void *context, int rc) { janus_mqttevh_context *ctx = (janus_mqttevh_context *)context; - JANUS_LOG(LOG_ERR, "MQTT EVH client has failed publishing to MQTT topic: %s, return code: %d\n", ctx->publish.topic, rc); + switch(rc) { + case MQTTASYNC_OPERATION_INCOMPLETE: + break; + default: + JANUS_LOG(LOG_ERR, "MQTT EVH client has failed publishing to MQTT topic: %s, return code: %d\n", ctx->publish.topic, rc); + } } /* Destroy Janus MQTT event handler session context */ @@ -983,6 +1005,7 @@ static int janus_mqttevh_init(const char *config_path) { create_options.maxBufferedMessages = ctx->connect.max_buffered; + create_options.sendWhileDisconnected = TRUE; res = MQTTAsync_createWithOptions( &ctx->client, ctx->connect.url,