Skip to content

Commit

Permalink
mqttevh: tls support implementation finished (meetecho#2517)
Browse files Browse the repository at this point in the history
* mqttevh: tls support implementation finished
* mqttevh: MQTTASYNC_OPERATION_INCOMPLETE is not error
* mqttevh: allow send messages while connecting is still in progress
  • Loading branch information
RSATom committed Jan 19, 2021
1 parent 97cd054 commit 79038e0
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 11 deletions.
6 changes: 4 additions & 2 deletions conf/janus.eventhandler.mqttevh.jcfg.sample
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ general: {
json = "indented" # Whether the JSON messages should be indented (default),
# plain (no indentation) or compact (no indentation and no spaces)

url = "tcp://localhost:1883" # The URL of the MQTT server. Only tcp supported at this time.
url = "tcp://localhost:1883" # The URL of the MQTT server. "tcp://" and "ssl://" protocols are supported.
#mqtt_version = "3.1.1" # Protocol version. Available values: 3.1, 3.1.1 (default), 5.
client_id = "janus.example.com" # Janus client id. You have to configure a unique ID (default: guest).
#keep_alive_interval = 20 # Keep connection for N seconds (default: 30)
Expand All @@ -30,6 +30,8 @@ general: {
#topic = "/janus/events" # Base topic (default: /janus/events)
#addevent = true # Whether we should add the event type to the base topic

#tls_enable = false # Whether TLS support must be enabled

# Initial message sent to status topic
#connect_status = "{\"event\": \"connected\", \"eventhandler\": \"janus.eventhandler.mqttevh\"}"
# Message sent after disconnect or as LWT
Expand All @@ -43,7 +45,7 @@ general: {
#tls_verify_peer = true # Whether peer verification must be enabled
#tls_verify_hostname = true # Whether hostname verification must be enabled

# Certificates to use when SSL support is enabled, if needed
# Certificates to use when TLS support is enabled, if needed
#tls_cacert = "/path/to/cacert.pem"
#tls_client_cert = "/path/to/cert.pem"
#tls_client_key = "/path/to/key.pem"
Expand Down
41 changes: 32 additions & 9 deletions events/janus_mqttevh.c
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,15 @@ static int janus_mqttevh_client_connect(janus_mqttevh_context *ctx) {
options.keepAliveInterval = ctx->connect.keep_alive_interval;
options.maxInflight = ctx->connect.max_inflight;

MQTTAsync_SSLOptions ssl_opts = MQTTAsync_SSLOptions_initializer;
if(ctx->tls.enable) {
ssl_opts.trustStore = ctx->tls.cacert_file;
ssl_opts.keyStore = ctx->tls.cert_file;
ssl_opts.privateKey = ctx->tls.key_file;
ssl_opts.enableServerCertAuth = ctx->tls.verify_peer;
options.ssl = &ssl_opts;
}

MQTTAsync_willOptions willOptions = MQTTAsync_willOptions_initializer;
if(ctx->will.enabled) {
willOptions.topicName = ctx->will.topic;
Expand Down Expand Up @@ -535,10 +544,14 @@ static int janus_mqttevh_client_publish_message(janus_mqttevh_context *ctx, cons
options.onFailure = janus_mqttevh_client_publish_message_failure;

rc = MQTTAsync_sendMessage(ctx->client, topic, &msg, &options);
if(rc == MQTTASYNC_SUCCESS) {
JANUS_LOG(LOG_HUGE, "MQTT EVH message sent to topic %s on %s. Result %d\n", topic, ctx->connect.url, rc);
} else {
JANUS_LOG(LOG_WARN, "FAILURE: MQTT EVH message propably not sent to topic %s on %s. Result %d\n", topic, ctx->connect.url, rc);
switch(rc) {
case MQTTASYNC_SUCCESS:
JANUS_LOG(LOG_HUGE, "MQTT EVH message sent to topic %s on %s. Result %d\n", topic, ctx->connect.url, rc);
break;
case MQTTASYNC_OPERATION_INCOMPLETE:
break;
default:
JANUS_LOG(LOG_WARN, "FAILURE: MQTT EVH message propably not sent to topic %s on %s. Result %d\n", topic, ctx->connect.url, rc);
}

return rc;
Expand All @@ -561,10 +574,14 @@ static int janus_mqttevh_client_publish_message5(janus_mqttevh_context *ctx, con
options.onFailure5 = janus_mqttevh_client_publish_message_failure5;

rc = MQTTAsync_sendMessage(ctx->client, topic, &msg, &options);
if(rc == MQTTASYNC_SUCCESS) {
JANUS_LOG(LOG_HUGE, "MQTT EVH message sent to topic %s on %s. Result %d\n", topic, ctx->connect.url, rc);
} else {
JANUS_LOG(LOG_WARN, "FAILURE: MQTT EVH message propably not sent to topic %s on %s. Result %d\n", topic, ctx->connect.url, rc);
switch(rc) {
case MQTTASYNC_SUCCESS:
JANUS_LOG(LOG_HUGE, "MQTT EVH message sent to topic %s on %s. Result %d\n", topic, ctx->connect.url, rc);
break;
case MQTTASYNC_OPERATION_INCOMPLETE:
break;
default:
JANUS_LOG(LOG_WARN, "FAILURE: MQTT EVH message propably not sent to topic %s on %s. Result %d\n", topic, ctx->connect.url, rc);
}

return rc;
Expand Down Expand Up @@ -604,7 +621,12 @@ static void janus_mqttevh_client_publish_message_failure5(void *context, MQTTAsy

static void janus_mqttevh_client_publish_message_failure_impl(void *context, int rc) {
janus_mqttevh_context *ctx = (janus_mqttevh_context *)context;
JANUS_LOG(LOG_ERR, "MQTT EVH client has failed publishing to MQTT topic: %s, return code: %d\n", ctx->publish.topic, rc);
switch(rc) {
case MQTTASYNC_OPERATION_INCOMPLETE:
break;
default:
JANUS_LOG(LOG_ERR, "MQTT EVH client has failed publishing to MQTT topic: %s, return code: %d\n", ctx->publish.topic, rc);
}
}

/* Destroy Janus MQTT event handler session context */
Expand Down Expand Up @@ -983,6 +1005,7 @@ static int janus_mqttevh_init(const char *config_path) {

create_options.maxBufferedMessages = ctx->connect.max_buffered;

create_options.sendWhileDisconnected = TRUE;
res = MQTTAsync_createWithOptions(
&ctx->client,
ctx->connect.url,
Expand Down

0 comments on commit 79038e0

Please sign in to comment.