From 0671908c2b0fc545c644744af616ce714959da04 Mon Sep 17 00:00:00 2001 From: Timofey Martynov Date: Wed, 25 Dec 2019 12:23:36 +0300 Subject: [PATCH 1/5] Add MQTT 5 properties support --- conf/janus.transport.mqtt.jcfg.sample | 4 + transports/janus_mqtt.c | 419 +++++++++++++++++++++++--- 2 files changed, 389 insertions(+), 34 deletions(-) diff --git a/conf/janus.transport.mqtt.jcfg.sample b/conf/janus.transport.mqtt.jcfg.sample index a072657df6..9418043d52 100644 --- a/conf/janus.transport.mqtt.jcfg.sample +++ b/conf/janus.transport.mqtt.jcfg.sample @@ -28,6 +28,10 @@ general: { #certfile = /path/to/cert.pem #keyfile = /path/to/key.pem + # These options work with MQTT 5 only. + #vacuum_interval = 60 # Interval for removing old transcation states in seconds. + #proxy_transaction_user_properties = [] # Array of user property names to copy from the incoming message. + #add_transaction_user_properties = () # List of user property ["key", "value"] pairs to add. } admin: { diff --git a/transports/janus_mqtt.c b/transports/janus_mqtt.c index 6f49b6fc85..2dd00e03a1 100644 --- a/transports/janus_mqtt.c +++ b/transports/janus_mqtt.c @@ -137,6 +137,10 @@ typedef struct janus_mqtt_context { char *topic; int qos; gboolean retain; + #ifdef MQTTVERSION_5 + GArray *proxy_transaction_user_properties; + GArray *add_transaction_user_properties; + #endif } publish; struct { struct { @@ -154,8 +158,24 @@ typedef struct janus_mqtt_context { char *cert_file; char *key_file; gboolean verify_peer; +#ifdef MQTTVERSION_5 + int vacuum_interval; +#endif } janus_mqtt_context; +#ifdef MQTTVERSION_5 +/* MQTT 5 specific types */ +typedef struct janus_mqtt_transaction_state { + MQTTProperties *properties; + time_t created_at; +} janus_mqtt_transaction_state; + +typedef struct janus_mqtt_set_add_transaction_user_property_user_data { + GArray *acc; + janus_config *config; +} janus_mqtt_set_add_transaction_user_property_user_data; +#endif + /* Transport client methods */ void janus_mqtt_client_connected(void *context, char *cause); void janus_mqtt_client_disconnected(void *context, MQTTProperties *properties, enum MQTTReasonCodes reasonCode); @@ -165,7 +185,6 @@ int janus_mqtt_client_connect(janus_mqtt_context *ctx); int janus_mqtt_client_reconnect(janus_mqtt_context *ctx); int janus_mqtt_client_disconnect(janus_mqtt_context *ctx); int janus_mqtt_client_subscribe(janus_mqtt_context *ctx, gboolean admin); -int janus_mqtt_client_publish_message(janus_mqtt_context *ctx, char *payload, gboolean admin); int janus_mqtt_client_publish_status_message(janus_mqtt_context *ctx, char *payload); void janus_mqtt_client_destroy_context(janus_mqtt_context **ctx); /* MQTT v3.x interface callbacks */ @@ -184,6 +203,7 @@ void janus_mqtt_client_publish_admin_success(void *context, MQTTAsync_successDat void janus_mqtt_client_publish_admin_failure(void *context, MQTTAsync_failureData *response); void janus_mqtt_client_publish_status_success(void *context, MQTTAsync_successData *response); void janus_mqtt_client_publish_status_failure(void *context, MQTTAsync_failureData *response); +int janus_mqtt_client_publish_message(janus_mqtt_context *ctx, char *payload, gboolean admin); int janus_mqtt_client_get_response_code(MQTTAsync_failureData *response); #ifdef MQTTVERSION_5 /* MQTT v5 interface callbacks */ @@ -202,6 +222,7 @@ void janus_mqtt_client_publish_admin_success5(void *context, MQTTAsync_successDa void janus_mqtt_client_publish_admin_failure5(void *context, MQTTAsync_failureData5 *response); void janus_mqtt_client_publish_status_success5(void *context, MQTTAsync_successData5 *response); void janus_mqtt_client_publish_status_failure5(void *context, MQTTAsync_failureData5 *response); +int janus_mqtt_client_publish_message5(janus_mqtt_context *ctx, char *payload, gboolean admin, MQTTProperties *properties, char *custom_topic); int janus_mqtt_client_get_response_code5(MQTTAsync_failureData5 *response); #endif /* MQTT version independent callback implementations */ @@ -225,6 +246,23 @@ void janus_mqtt_client_publish_status_failure_impl(void *context, int rc); static janus_mqtt_context *context_ = NULL; static janus_transport_session *mqtt_session = NULL; +#ifdef MQTTVERSION_5 +/* MQTT 5 specific statics and functions */ +static GHashTable *janus_mqtt_transaction_states; +static GRWLock janus_mqtt_transaction_states_lock; +static GMainContext *vacuum_context = NULL; +static GMainLoop *vacuum_loop = NULL; +static GThread *vacuum_thread = NULL; +static gpointer janus_mqtt_vacuum_thread(gpointer context); +static gboolean janus_mqtt_vacuum(gpointer context); +void janus_mqtt_transaction_state_free(gpointer ptr); +char *janus_mqtt_get_response_topic(janus_mqtt_transaction_state *state); +void janus_mqtt_proxy_properties(janus_mqtt_transaction_state *state, GArray *user_property_names, MQTTProperties *properties); +void janus_mqtt_add_properties(janus_mqtt_transaction_state *state, GArray *user_properties, MQTTProperties *properties); +void janus_mqtt_set_proxy_transaction_user_property(gpointer item_ptr, gpointer acc_ptr); +void janus_mqtt_set_add_transaction_user_property(gpointer item_ptr, gpointer user_data_ptr); +#endif + int janus_mqtt_init(janus_transport_callbacks *callback, const char *config_path) { if(callback == NULL || config_path == NULL) { /* Invalid arguments */ @@ -427,6 +465,50 @@ int janus_mqtt_init(janus_transport_callbacks *callback, const char *config_path JANUS_LOG(LOG_ERR, "Invalid publish-qos value: %s (falling back to default)\n", qos_item->value); ctx->publish.qos = 1; } + + #ifdef MQTTVERSION_5 + if (ctx->connect.mqtt_version == MQTTVERSION_5) { + /* MQTT 5 specific configuration */ + janus_config_array *proxy_transaction_user_properties_array = janus_config_get(config, config_general, janus_config_type_array, "proxy_transaction_user_properties"); + if(proxy_transaction_user_properties_array) { + GList *proxy_transaction_user_properties_array_items = janus_config_get_items(config, proxy_transaction_user_properties_array); + if(proxy_transaction_user_properties_array_items != NULL) { + int proxy_transaction_user_properties_array_len = g_list_length(proxy_transaction_user_properties_array_items); + if(proxy_transaction_user_properties_array_len > 0) { + ctx->publish.proxy_transaction_user_properties = g_array_sized_new(FALSE, FALSE, sizeof(char*), proxy_transaction_user_properties_array_len); + + g_list_foreach( + proxy_transaction_user_properties_array_items, + (GFunc)janus_mqtt_set_proxy_transaction_user_property, + (gpointer)ctx->publish.proxy_transaction_user_properties + ); + } + } + } + + janus_config_array *add_transaction_user_properties_array = janus_config_get(config, config_general, janus_config_type_array, "add_transaction_user_properties"); + if(add_transaction_user_properties_array) { + GList *add_transaction_user_properties_array_items = janus_config_get_arrays(config, add_transaction_user_properties_array); + if(add_transaction_user_properties_array_items != NULL) { + int add_transaction_user_properties_array_len = g_list_length(add_transaction_user_properties_array_items); + if(add_transaction_user_properties_array_len > 0) { + ctx->publish.add_transaction_user_properties = g_array_sized_new(FALSE, FALSE, sizeof(MQTTProperty), add_transaction_user_properties_array_len); + + janus_mqtt_set_add_transaction_user_property_user_data user_data = { + ctx->publish.add_transaction_user_properties, + config + }; + + g_list_foreach( + add_transaction_user_properties_array_items, + (GFunc)janus_mqtt_set_add_transaction_user_property, + (gpointer)&user_data + ); + } + } + } + } + #endif } } else { janus_mqtt_api_enabled_ = FALSE; @@ -536,6 +618,32 @@ int janus_mqtt_init(janus_transport_callbacks *callback, const char *config_path goto error; } +#ifdef MQTTVERSION_5 + if (ctx->connect.mqtt_version == MQTTVERSION_5) { + /* Initialize transaction states hash table and its lock */ + janus_mqtt_transaction_states = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, janus_mqtt_transaction_state_free); + g_rw_lock_init(&janus_mqtt_transaction_states_lock); + + /* Getting vacuum interval from config */ + janus_config_item *vacuum_interval_item = janus_config_get(config, config_general, janus_config_type_item, "vacuum_interval"); + ctx->vacuum_interval = (vacuum_interval_item && vacuum_interval_item->value) ? atoi(vacuum_interval_item->value) : 60; + if(ctx->vacuum_interval <= 0) { + JANUS_LOG(LOG_ERR, "Invalid vacuum interval value: %s (falling back to default)\n", vacuum_interval_item->value); + ctx->vacuum_interval = 60; + } + + /* Start vacuum thread */ + vacuum_context = g_main_context_new(); + vacuum_loop = g_main_loop_new(vacuum_context, FALSE); + GError *terror = NULL; + vacuum_thread = g_thread_try_new("mqtt vacuum", &janus_mqtt_vacuum_thread, ctx, &terror); + if(terror != NULL) { + JANUS_LOG(LOG_ERR, "Failed to spawn MQTT transport vacuum thread (%d): %s\n", terror->code, terror->message ? terror->message : "??"); + goto error; + } + } +#endif + /* Creating a client */ MQTTAsync_createOptions create_options = MQTTAsync_createOptions_initializer; @@ -590,6 +698,12 @@ int janus_mqtt_init(janus_transport_callbacks *callback, const char *config_path error: /* If we got here, something went wrong */ +#ifdef MQTTVERSION_5 + if(vacuum_loop != NULL) + g_main_loop_unref(vacuum_loop); + if(vacuum_context != NULL) + g_main_context_unref(vacuum_context); +#endif janus_transport_session_destroy(mqtt_session); janus_mqtt_client_destroy_context(&ctx); g_free((char *)url); @@ -604,7 +718,55 @@ void janus_mqtt_destroy(void) { janus_transport_session_destroy(mqtt_session); janus_mqtt_client_disconnect(context_); + +#ifdef MQTTVERSION_5 + if(vacuum_thread != NULL) { + if(g_main_loop_is_running(vacuum_loop)) { + g_main_loop_quit(vacuum_loop); + g_main_context_wakeup(vacuum_context); + } + g_thread_join(vacuum_thread); + vacuum_thread = NULL; + } +#endif +} + +#ifdef MQTTVERSION_5 +void janus_mqtt_set_proxy_transaction_user_property(gpointer item_ptr, gpointer acc_ptr) { + janus_config_item *item = (janus_config_item*)item_ptr; + if(item->value == NULL) { + return; + } + + gchar* name = g_strdup(item->value); + g_array_append_val((GArray *)acc_ptr, name); +} + +void janus_mqtt_set_add_transaction_user_property(gpointer item_ptr, gpointer user_data_ptr) { + janus_config_item *item = (janus_config_item*)item_ptr; + if(item->value != NULL) { + return; + } + + janus_mqtt_set_add_transaction_user_property_user_data *user_data = (janus_mqtt_set_add_transaction_user_property_user_data*)user_data_ptr; + GList *key_value = janus_config_get_items(user_data->config, item); + if(key_value == NULL || g_list_length(key_value) != 2) { + JANUS_LOG(LOG_ERR, "Expected a key-value pair\n"); + return; + } + + janus_config_item *key_item = (janus_config_item*)g_list_first(key_value)->data; + janus_config_item *value_item = (janus_config_item*)g_list_last(key_value)->data; + + MQTTProperty property; + property.identifier = MQTTPROPERTY_CODE_USER_PROPERTY; + property.value.data.data = g_strdup(key_item->value); + property.value.data.len = strlen(key_item->value); + property.value.value.data = g_strdup(value_item->value); + property.value.value.len = strlen(value_item->value); + g_array_append_val(user_data->acc, property); } +#endif int janus_mqtt_get_api_compatibility(void) { return JANUS_TRANSPORT_API_VERSION; @@ -654,18 +816,119 @@ int janus_mqtt_send_message(janus_transport_session *transport, void *request_id } char *payload = json_dumps(message, json_format_); - json_decref(message); JANUS_LOG(LOG_HUGE, "Sending %s API message via MQTT: %s\n", admin ? "admin" : "Janus", payload); + + int rc; +#ifdef MQTTVERSION_5 + if(ctx->connect.mqtt_version == MQTTVERSION_5) { + char *response_topic = NULL; + MQTTProperties properties = MQTTProperties_initializer; + char *transaction = g_strdup(json_string_value(json_object_get(message, "transaction"))); + janus_mqtt_transaction_state *state = NULL; + + if (transaction != NULL) { + g_rw_lock_reader_lock(&janus_mqtt_transaction_states_lock); + state = g_hash_table_lookup(janus_mqtt_transaction_states, transaction); + + if(state != NULL) { + response_topic = janus_mqtt_get_response_topic(state); + janus_mqtt_proxy_properties(state, ctx->publish.proxy_transaction_user_properties, &properties); + janus_mqtt_add_properties(state, ctx->publish.add_transaction_user_properties, &properties); + } + + g_rw_lock_reader_unlock(&janus_mqtt_transaction_states_lock); + } - int rc = janus_mqtt_client_publish_message(ctx, payload, admin); + rc = janus_mqtt_client_publish_message5(ctx, payload, admin, &properties, response_topic); + if (response_topic != NULL) g_free(response_topic); + MQTTProperties_free(&properties); + } else { + rc = janus_mqtt_client_publish_message(ctx, payload, admin); + } +#else + rc = janus_mqtt_client_publish_message(ctx, payload, admin); +#endif + if(rc != MQTTASYNC_SUCCESS) { JANUS_LOG(LOG_ERR, "Can't publish to MQTT topic: %s, return code: %d\n", admin ? ctx->admin.publish.topic : ctx->publish.topic, rc); } - free(payload); + json_decref(message); + free(payload); return 0; } +#ifdef MQTTVERSION_5 +char *janus_mqtt_get_response_topic(janus_mqtt_transaction_state *state) { + MQTTProperty *property = MQTTProperties_getProperty(state->properties, MQTTPROPERTY_CODE_RESPONSE_TOPIC); + if(property == NULL) return NULL; + return g_strndup(property->value.data.data, property->value.data.len); +} + +void janus_mqtt_proxy_properties(janus_mqtt_transaction_state *state, GArray *user_property_names, MQTTProperties *properties) { + /* Proxy correlation data standard property unconditionally */ + MQTTProperty *corr_data_req_prop = MQTTProperties_getProperty(state->properties, MQTTPROPERTY_CODE_CORRELATION_DATA); + if(corr_data_req_prop != NULL) { + MQTTProperty corr_data_resp_prop; + corr_data_resp_prop.identifier = MQTTPROPERTY_CODE_CORRELATION_DATA; + corr_data_resp_prop.value.data.data = g_strndup(corr_data_req_prop->value.data.data, corr_data_req_prop->value.data.len); + corr_data_resp_prop.value.data.len = corr_data_req_prop->value.data.len; + + int rc = MQTTProperties_add(properties, &corr_data_resp_prop); + if(rc != 0) { + JANUS_LOG(LOG_ERR, "Failed to add correlation_data property to MQTT response\n"); + } + } + + /* Proxy additional user properties from config */ + if(user_property_names == NULL || user_property_names->len == 0) { + return; + } + + for(int i = 0; i < state->properties->count; i++) { + MQTTProperty request_prop = state->properties->array[i]; + if(request_prop.identifier != MQTTPROPERTY_CODE_USER_PROPERTY) { + continue; + } + + for(uint j = 0; j < user_property_names->len; j++) { + char *key = (char*)g_array_index(user_property_names, char*, j); + int key_len = strlen(key); + + if(strncmp(request_prop.value.data.data, key, key_len) == 0) { + MQTTProperty response_prop; + response_prop.identifier = MQTTPROPERTY_CODE_USER_PROPERTY; + response_prop.value.data.data = key; + response_prop.value.data.len = key_len; + response_prop.value.value.data = g_strndup(request_prop.value.value.data, request_prop.value.value.len); + response_prop.value.value.len = request_prop.value.value.len; + + int rc = MQTTProperties_add(properties, &response_prop); + if(rc == -1) { + JANUS_LOG(LOG_ERR, "Failed to proxy `%s` user property to MQTT response\n", key); + } + + break; + } + } + } +} + +void janus_mqtt_add_properties(janus_mqtt_transaction_state *state, GArray *user_properties, MQTTProperties *properties) { + if(user_properties == NULL || user_properties->len == 0) { + return; + } + + for(uint i = 0; i < user_properties->len; i++) { + MQTTProperty *property = &g_array_index(user_properties, MQTTProperty, i); + int rc = MQTTProperties_add(properties, property); + if(rc != 0) { + JANUS_LOG(LOG_ERR, "Failed to user properties to MQTT response\n"); + } + } +} +#endif + void janus_mqtt_session_created(janus_transport_session *transport, guint64 session_id) { /* We don't care */ } @@ -741,6 +1004,7 @@ void janus_mqtt_client_connection_lost(void *context, char *cause) { } int janus_mqtt_client_message_arrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message) { + int ret = FALSE; janus_mqtt_context *ctx = (janus_mqtt_context *)context; gchar *topic = g_strndup(topicName, topicLen); const gboolean janus = janus_mqtt_api_enabled_ && !strcasecmp(topic, ctx->subscribe.topic); @@ -748,16 +1012,42 @@ int janus_mqtt_client_message_arrived(void *context, char *topicName, int topicL g_free(topic); if((janus || admin) && message->payloadlen) { - JANUS_LOG(LOG_HUGE, "Receiving %s API message over MQTT: %s\n", admin ? "admin" : "Janus", (char *)message->payload); + JANUS_LOG(LOG_HUGE, "Receiving %s API message over MQTT: %.*s\n", admin ? "admin" : "Janus", message->payloadlen, (char*)message->payload); json_error_t error; json_t *root = json_loadb(message->payload, message->payloadlen, 0, &error); + + #ifdef MQTTVERSION_5 + if(ctx->connect.mqtt_version == MQTTVERSION_5 && !admin) { + /* Save MQTT 5 properties copy to the state */ + const gchar *transaction = g_strdup(json_string_value(json_object_get(root, "transaction"))); + if(transaction == NULL) { + JANUS_LOG(LOG_WARN, "`transaction` is missing or not a string\n"); + goto done; + } + + MQTTProperties *properties = g_malloc(sizeof(MQTTProperties)); + *properties = MQTTProperties_copy(&message->properties); + + janus_mqtt_transaction_state *state = g_malloc(sizeof(janus_mqtt_transaction_state)); + state->properties = properties; + state->created_at = time(0); + + g_rw_lock_writer_lock(&janus_mqtt_transaction_states_lock); + g_hash_table_insert(janus_mqtt_transaction_states, (gpointer) transaction, (gpointer) state); + g_rw_lock_writer_unlock(&janus_mqtt_transaction_states_lock); + } + #endif + ctx->gateway->incoming_request(&janus_mqtt_transport_, mqtt_session, NULL, admin, root, &error); } + ret = TRUE; + +done: MQTTAsync_freeMessage(&message); MQTTAsync_free(topicName); - return TRUE; + return ret; } int janus_mqtt_client_connect(janus_mqtt_context *ctx) { @@ -956,7 +1246,7 @@ int janus_mqtt_client_subscribe(janus_mqtt_context *ctx, gboolean admin) { MQTTAsync_responseOptions options = MQTTAsync_responseOptions_initializer; options.context = ctx; if(admin) { -#ifdef MQTTVERSION_5 + #ifdef MQTTVERSION_5 if(ctx->connect.mqtt_version == MQTTVERSION_5) { options.onSuccess5 = janus_mqtt_client_admin_subscribe_success5; options.onFailure5 = janus_mqtt_client_admin_subscribe_failure5; @@ -964,13 +1254,13 @@ int janus_mqtt_client_subscribe(janus_mqtt_context *ctx, gboolean admin) { options.onSuccess = janus_mqtt_client_admin_subscribe_success; options.onFailure = janus_mqtt_client_admin_subscribe_failure; } -#else + #else options.onSuccess = janus_mqtt_client_admin_subscribe_success; options.onFailure = janus_mqtt_client_admin_subscribe_failure; -#endif + #endif return MQTTAsync_subscribe(ctx->client, ctx->admin.subscribe.topic, ctx->admin.subscribe.qos, &options); } else { -#ifdef MQTTVERSION_5 + #ifdef MQTTVERSION_5 if(ctx->connect.mqtt_version == MQTTVERSION_5) { options.onSuccess5 = janus_mqtt_client_subscribe_success5; options.onFailure5 = janus_mqtt_client_subscribe_failure5; @@ -978,10 +1268,10 @@ int janus_mqtt_client_subscribe(janus_mqtt_context *ctx, gboolean admin) { options.onSuccess = janus_mqtt_client_subscribe_success; options.onFailure = janus_mqtt_client_subscribe_failure; } -#else + #else options.onSuccess = janus_mqtt_client_subscribe_success; options.onFailure = janus_mqtt_client_subscribe_failure; -#endif + #endif return MQTTAsync_subscribe(ctx->client, ctx->subscribe.topic, ctx->subscribe.qos, &options); } } @@ -1081,39 +1371,54 @@ int janus_mqtt_client_publish_message(janus_mqtt_context *ctx, char *payload, gb msg.qos = ctx->publish.qos; msg.retained = FALSE; + char *topic = admin ? ctx->admin.publish.topic : ctx->publish.topic; + MQTTAsync_responseOptions options = MQTTAsync_responseOptions_initializer; options.context = ctx; if(admin) { -#ifdef MQTTVERSION_5 - if(ctx->connect.mqtt_version == MQTTVERSION_5) { - options.onSuccess5 = janus_mqtt_client_publish_admin_success5; - options.onFailure5 = janus_mqtt_client_publish_admin_failure5; - } else { - options.onSuccess = janus_mqtt_client_publish_admin_success; - options.onFailure = janus_mqtt_client_publish_admin_failure; - } -#else options.onSuccess = janus_mqtt_client_publish_admin_success; options.onFailure = janus_mqtt_client_publish_admin_failure; -#endif - return MQTTAsync_sendMessage(ctx->client, ctx->admin.publish.topic, &msg, &options); } else { -#ifdef MQTTVERSION_5 - if(ctx->connect.mqtt_version == MQTTVERSION_5) { - options.onSuccess5 = janus_mqtt_client_publish_janus_success5; - options.onFailure5 = janus_mqtt_client_publish_janus_failure5; - } else { - options.onSuccess = janus_mqtt_client_publish_janus_success; - options.onFailure = janus_mqtt_client_publish_janus_failure; - } -#else options.onSuccess = janus_mqtt_client_publish_janus_success; options.onFailure = janus_mqtt_client_publish_janus_failure; -#endif - return MQTTAsync_sendMessage(ctx->client, ctx->publish.topic, &msg, &options); } + + return MQTTAsync_sendMessage(ctx->client, topic, &msg, &options); +} + +#ifdef MQTTVERSION_5 +int janus_mqtt_client_publish_message5(janus_mqtt_context *ctx, char *payload, gboolean admin, MQTTProperties *properties, char *custom_topic) { + MQTTAsync_message msg = MQTTAsync_message_initializer; + msg.payload = payload; + msg.payloadlen = strlen(payload); + msg.qos = ctx->publish.qos; + msg.retained = FALSE; + msg.properties = MQTTProperties_copy(properties); + + char *topic; + if(custom_topic) { + topic = custom_topic; + } else if(admin) { + topic = ctx->admin.publish.topic; + } else { + topic = ctx->publish.topic; + } + + MQTTAsync_responseOptions options = MQTTAsync_responseOptions_initializer; + options.context = ctx; + + if(admin) { + options.onSuccess5 = janus_mqtt_client_publish_admin_success5; + options.onFailure5 = janus_mqtt_client_publish_admin_failure5; + } else { + options.onSuccess5 = janus_mqtt_client_publish_janus_success5; + options.onFailure5 = janus_mqtt_client_publish_janus_failure5; + } + + return MQTTAsync_sendMessage(ctx->client, topic, &msg, &options); } +#endif void janus_mqtt_client_publish_janus_success(void *context, MQTTAsync_successData *response) { janus_mqtt_client_publish_janus_success_impl(context); @@ -1247,6 +1552,9 @@ void janus_mqtt_client_destroy_context(janus_mqtt_context **ptr) { g_free(ctx->connect.password); g_free(ctx->admin.subscribe.topic); g_free(ctx->admin.publish.topic); + #ifdef MQTTVERSION_5 + g_rw_lock_clear(&janus_mqtt_transaction_states_lock); + #endif g_free(ctx); *ptr = NULL; } @@ -1263,3 +1571,46 @@ int janus_mqtt_client_get_response_code5(MQTTAsync_failureData5 *response) { return response ? response->code : 0; } #endif + +#ifdef MQTTVERSION_5 +static gpointer janus_mqtt_vacuum_thread(gpointer context) { + janus_mqtt_context *ctx = (janus_mqtt_context*)context; + + GSource *timeout_source; + timeout_source = g_timeout_source_new_seconds(ctx->vacuum_interval); + g_source_set_callback(timeout_source, janus_mqtt_vacuum, context, NULL); + g_source_attach(timeout_source, vacuum_context); + g_source_unref(timeout_source); + + JANUS_LOG(LOG_VERB, "Starting MQTT transport vacuum thread\n"); + g_main_loop_run(vacuum_loop); + JANUS_LOG(LOG_VERB, "MQTT transport vacuum thread finished\n"); + return NULL; +} + +static gboolean janus_mqtt_vacuum(gpointer context) { + janus_mqtt_context *ctx = (janus_mqtt_context*)context; + GHashTableIter iter; + gpointer value; + + g_rw_lock_writer_lock(&janus_mqtt_transaction_states_lock); + g_hash_table_iter_init(&iter, janus_mqtt_transaction_states); + + while (g_hash_table_iter_next(&iter, NULL, &value)) { + janus_mqtt_transaction_state* state = value; + if(time(0) - state->created_at > ctx->vacuum_interval) { + g_hash_table_iter_remove(&iter); + } + } + + g_rw_lock_writer_unlock(&janus_mqtt_transaction_states_lock); + return G_SOURCE_CONTINUE; +} + +void janus_mqtt_transaction_state_free(gpointer state_ptr) { + janus_mqtt_transaction_state *state = (janus_mqtt_transaction_state*)state_ptr; + MQTTProperties_free(state->properties); + g_free(state->properties); + g_free(state); +} +#endif From 906bd545d23e8e15a733fd4437c61dc208f562e1 Mon Sep 17 00:00:00 2001 From: Timofey Martynov Date: Wed, 25 Dec 2019 14:26:39 +0300 Subject: [PATCH 2/5] Fix Paho 1.1 compatibility --- transports/janus_mqtt.c | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/transports/janus_mqtt.c b/transports/janus_mqtt.c index 2dd00e03a1..7c64347195 100644 --- a/transports/janus_mqtt.c +++ b/transports/janus_mqtt.c @@ -178,7 +178,6 @@ typedef struct janus_mqtt_set_add_transaction_user_property_user_data { /* Transport client methods */ void janus_mqtt_client_connected(void *context, char *cause); -void janus_mqtt_client_disconnected(void *context, MQTTProperties *properties, enum MQTTReasonCodes reasonCode); void janus_mqtt_client_connection_lost(void *context, char *cause); int janus_mqtt_client_message_arrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message); int janus_mqtt_client_connect(janus_mqtt_context *ctx); @@ -207,6 +206,7 @@ int janus_mqtt_client_publish_message(janus_mqtt_context *ctx, char *payload, gb int janus_mqtt_client_get_response_code(MQTTAsync_failureData *response); #ifdef MQTTVERSION_5 /* MQTT v5 interface callbacks */ +void janus_mqtt_client_disconnected5(void *context, MQTTProperties *properties, enum MQTTReasonCodes reasonCode); void janus_mqtt_client_connect_failure5(void *context, MQTTAsync_failureData5 *response); void janus_mqtt_client_reconnect_success5(void *context, MQTTAsync_successData5 *response); void janus_mqtt_client_reconnect_failure5(void *context, MQTTAsync_failureData5 *response); @@ -669,7 +669,8 @@ int janus_mqtt_init(janus_transport_callbacks *callback, const char *config_path goto error; } - if(MQTTAsync_setDisconnected(ctx->client, ctx, janus_mqtt_client_disconnected) != MQTTASYNC_SUCCESS) { +#ifdef MQTTVERSION_5 + if(MQTTAsync_setDisconnected(ctx->client, ctx, janus_mqtt_client_disconnected5) != MQTTASYNC_SUCCESS) { JANUS_LOG(LOG_FATAL, "Can't connect to MQTT broker: error setting up disconnected callback...\n"); goto error; } @@ -683,6 +684,12 @@ int janus_mqtt_init(janus_transport_callbacks *callback, const char *config_path JANUS_LOG(LOG_FATAL, "Can't connect to MQTT broker: error setting up message arrived callback...\n"); goto error; } +#else + if(MQTTAsync_setCallbacks(ctx->client, ctx, janus_mqtt_client_connection_lost, janus_mqtt_client_message_arrived, NULL) != MQTTASYNC_SUCCESS) { + JANUS_LOG(LOG_FATAL, "Can't connect to MQTT broker: error callbacks...\n"); + goto error; + } +#endif /* Connecting to the broker */ int rc = janus_mqtt_client_connect(ctx); @@ -977,7 +984,8 @@ void janus_mqtt_client_connected(void *context, char *cause) { } } -void janus_mqtt_client_disconnected(void *context, MQTTProperties *properties, enum MQTTReasonCodes reasonCode) { +#ifdef MQTTVERSION_5 +void janus_mqtt_client_disconnected5(void *context, MQTTProperties *properties, enum MQTTReasonCodes reasonCode) { const char *reasonCodeStr = MQTTReasonCode_toString(reasonCode); JANUS_LOG(LOG_INFO, "Disconnected from MQTT broker: %s\n", reasonCodeStr); @@ -989,6 +997,7 @@ void janus_mqtt_client_disconnected(void *context, MQTTProperties *properties, e ctx->gateway->notify_event(&janus_mqtt_transport_, mqtt_session, info); } } +#endif void janus_mqtt_client_connection_lost(void *context, char *cause) { JANUS_LOG(LOG_INFO, "MQTT connection lost cause of %s. Reconnecting...\n", cause); @@ -1065,7 +1074,6 @@ int janus_mqtt_client_connect(janus_mqtt_context *ctx) { } #else options.cleansession = ctx->connect.cleansession; - options.onSuccess = janus_mqtt_client_connect_success; options.onFailure = janus_mqtt_client_connect_failure; #endif From dadbca54d11aaf1c46327329fa6d469a5aaa11f0 Mon Sep 17 00:00:00 2001 From: Timofey Martynov Date: Thu, 16 Jan 2020 16:22:32 +0300 Subject: [PATCH 3/5] Fix logging MQTT topics --- transports/janus_mqtt.c | 66 +++++++++++++++++++---------------------- 1 file changed, 30 insertions(+), 36 deletions(-) diff --git a/transports/janus_mqtt.c b/transports/janus_mqtt.c index 7c64347195..45b51f09d8 100644 --- a/transports/janus_mqtt.c +++ b/transports/janus_mqtt.c @@ -235,12 +235,12 @@ void janus_mqtt_client_subscribe_success_impl(void *context); void janus_mqtt_client_subscribe_failure_impl(void *context, int rc); void janus_mqtt_client_admin_subscribe_success_impl(void *context); void janus_mqtt_client_admin_subscribe_failure_impl(void *context, int rc); -void janus_mqtt_client_publish_janus_success_impl(void *context); -void janus_mqtt_client_publish_janus_failure_impl(void *context, int rc); -void janus_mqtt_client_publish_admin_success_impl(void *context); -void janus_mqtt_client_publish_admin_failure_impl(void *context, int rc); -void janus_mqtt_client_publish_status_success_impl(void *context); -void janus_mqtt_client_publish_status_failure_impl(void *context, int rc); +void janus_mqtt_client_publish_janus_success_impl(char *topic); +void janus_mqtt_client_publish_janus_failure_impl(int rc); +void janus_mqtt_client_publish_admin_success_impl(char *topic); +void janus_mqtt_client_publish_admin_failure_impl(int rc); +void janus_mqtt_client_publish_status_success_impl(char *topic); +void janus_mqtt_client_publish_status_failure_impl(int rc); /* We only handle a single client */ static janus_mqtt_context *context_ = NULL; @@ -1429,67 +1429,63 @@ int janus_mqtt_client_publish_message5(janus_mqtt_context *ctx, char *payload, g #endif void janus_mqtt_client_publish_janus_success(void *context, MQTTAsync_successData *response) { - janus_mqtt_client_publish_janus_success_impl(context); + janus_mqtt_client_publish_janus_success_impl(response->alt.pub.destinationName); } #ifdef MQTTVERSION_5 void janus_mqtt_client_publish_janus_success5(void *context, MQTTAsync_successData5 *response) { - janus_mqtt_client_publish_janus_success_impl(context); + janus_mqtt_client_publish_janus_success_impl(response->alt.pub.destinationName); } #endif -void janus_mqtt_client_publish_janus_success_impl(void *context) { - janus_mqtt_context *ctx = (janus_mqtt_context *)context; - JANUS_LOG(LOG_HUGE, "MQTT client has been successfully published to MQTT topic: %s\n", ctx->publish.topic); +void janus_mqtt_client_publish_janus_success_impl(char *topic) { + JANUS_LOG(LOG_HUGE, "MQTT client has been successfully published to MQTT topic: %s\n", topic); } void janus_mqtt_client_publish_janus_failure(void *context, MQTTAsync_failureData *response) { int rc = janus_mqtt_client_get_response_code(response); - janus_mqtt_client_publish_janus_failure_impl(context, rc); + janus_mqtt_client_publish_janus_failure_impl(rc); } #ifdef MQTTVERSION_5 void janus_mqtt_client_publish_janus_failure5(void *context, MQTTAsync_failureData5 *response) { int rc = janus_mqtt_client_get_response_code5(response); - janus_mqtt_client_publish_janus_failure_impl(context, rc); + janus_mqtt_client_publish_janus_failure_impl(rc); } #endif -void janus_mqtt_client_publish_janus_failure_impl(void *context, int rc) { - janus_mqtt_context *ctx = (janus_mqtt_context *)context; - JANUS_LOG(LOG_ERR, "MQTT client has failed publishing to MQTT topic: %s, return code: %d\n", ctx->publish.topic, rc); +void janus_mqtt_client_publish_janus_failure_impl(int rc) { + JANUS_LOG(LOG_ERR, "MQTT client has failed publishing, return code: %d\n", rc); } void janus_mqtt_client_publish_admin_success(void *context, MQTTAsync_successData *response) { - janus_mqtt_client_publish_admin_success_impl(context); + janus_mqtt_client_publish_admin_success_impl(response->alt.pub.destinationName); } #ifdef MQTTVERSION_5 void janus_mqtt_client_publish_admin_success5(void *context, MQTTAsync_successData5 *response) { - janus_mqtt_client_publish_admin_success_impl(context); + janus_mqtt_client_publish_admin_success_impl(response->alt.pub.destinationName); } #endif -void janus_mqtt_client_publish_admin_success_impl(void *context) { - janus_mqtt_context *ctx = (janus_mqtt_context *)context; - JANUS_LOG(LOG_HUGE, "MQTT client has been successfully published to MQTT topic: %s\n", ctx->admin.publish.topic); +void janus_mqtt_client_publish_admin_success_impl(char *topic) { + JANUS_LOG(LOG_HUGE, "MQTT client has been successfully published to MQTT topic: %s\n", topic); } void janus_mqtt_client_publish_admin_failure(void *context, MQTTAsync_failureData *response) { int rc = janus_mqtt_client_get_response_code(response); - janus_mqtt_client_publish_admin_failure_impl(context, rc); + janus_mqtt_client_publish_admin_failure_impl(rc); } #ifdef MQTTVERSION_5 void janus_mqtt_client_publish_admin_failure5(void *context, MQTTAsync_failureData5 *response) { int rc = janus_mqtt_client_get_response_code5(response); - janus_mqtt_client_publish_admin_failure_impl(context, rc); + janus_mqtt_client_publish_admin_failure_impl(rc); } #endif -void janus_mqtt_client_publish_admin_failure_impl(void *context, int rc) { - janus_mqtt_context *ctx = (janus_mqtt_context *)context; - JANUS_LOG(LOG_ERR, "MQTT client has failed publishing to MQTT topic: %s, return code: %d\n", ctx->admin.publish.topic, rc); +void janus_mqtt_client_publish_admin_failure_impl(int rc) { + JANUS_LOG(LOG_ERR, "MQTT client has failed publishing to admin topic, return code: %d\n", rc); } int janus_mqtt_client_publish_status_message(janus_mqtt_context *ctx, char *payload) { @@ -1519,35 +1515,33 @@ int janus_mqtt_client_publish_status_message(janus_mqtt_context *ctx, char *payl } void janus_mqtt_client_publish_status_success(void *context, MQTTAsync_successData *response) { - janus_mqtt_client_publish_status_success_impl(context); + janus_mqtt_client_publish_status_success_impl(response->alt.pub.destinationName); } #ifdef MQTTVERSION_5 void janus_mqtt_client_publish_status_success5(void *context, MQTTAsync_successData5 *response) { - janus_mqtt_client_publish_status_success_impl(context); + janus_mqtt_client_publish_status_success_impl(response->alt.pub.destinationName); } #endif -void janus_mqtt_client_publish_status_success_impl(void *context) { - janus_mqtt_context *ctx = (janus_mqtt_context *)context; - JANUS_LOG(LOG_HUGE, "MQTT client has been successfully published to status MQTT topic: %s\n", ctx->status.topic); +void janus_mqtt_client_publish_status_success_impl(char *topic) { + JANUS_LOG(LOG_HUGE, "MQTT client has been successfully published to status MQTT topic: %s\n", topic); } void janus_mqtt_client_publish_status_failure(void *context, MQTTAsync_failureData *response) { int rc = janus_mqtt_client_get_response_code(response); - janus_mqtt_client_publish_status_failure_impl(context, rc); + janus_mqtt_client_publish_status_failure_impl(rc); } #ifdef MQTTVERSION_5 void janus_mqtt_client_publish_status_failure5(void *context, MQTTAsync_failureData5 *response) { int rc = janus_mqtt_client_get_response_code5(response); - janus_mqtt_client_publish_status_failure_impl(context, rc); + janus_mqtt_client_publish_status_failure_impl(rc); } #endif -void janus_mqtt_client_publish_status_failure_impl(void *context, int rc) { - janus_mqtt_context *ctx = (janus_mqtt_context *)context; - JANUS_LOG(LOG_ERR, "MQTT client has failed publishing to status MQTT topic: %s, return code: %d\n", ctx->status.topic, rc); +void janus_mqtt_client_publish_status_failure_impl(int rc) { + JANUS_LOG(LOG_ERR, "MQTT client has failed publishing to status topic, return code: %d\n", rc); } void janus_mqtt_client_destroy_context(janus_mqtt_context **ptr) { From b8e8f6ea4b5bd750a5643bf894a8b085232fcf42 Mon Sep 17 00:00:00 2001 From: Timofey Martynov Date: Fri, 17 Apr 2020 07:19:59 +0300 Subject: [PATCH 4/5] Add MQTT v5 support to MQTT event handler --- conf/janus.eventhandler.mqttevh.jcfg.sample | 4 + events/janus_mqttevh.c | 485 ++++++++++++++------ 2 files changed, 359 insertions(+), 130 deletions(-) diff --git a/conf/janus.eventhandler.mqttevh.jcfg.sample b/conf/janus.eventhandler.mqttevh.jcfg.sample index 5d273dbe41..85d77e0559 100644 --- a/conf/janus.eventhandler.mqttevh.jcfg.sample +++ b/conf/janus.eventhandler.mqttevh.jcfg.sample @@ -16,6 +16,7 @@ general: { # plain (no indentation) or compact (no indentation and no spaces) url = "tcp://localhost:1883" # The URL of the MQTT server. Only tcp supported at this time. + #mqtt_version = "3.1.1" # Protocol version. Available values: 3.1, 3.1.1 (default), 5. client_id = "janus.example.com" # Janus client id. You have to configure a unique ID (default: guest). #keep_alive_interval = 20 # Keep connection for N seconds (default: 30) #cleansession = 0 # Clean session flag (default: off) @@ -46,4 +47,7 @@ general: { #tls_client_key = "/path/to/key.pem" #tls_ciphers #tls_version + + # These options work with MQTT 5 only. + #add_user_properties = () # List of user property ["key", "value"] pairs to add. } diff --git a/events/janus_mqttevh.c b/events/janus_mqttevh.c index 29bf519531..7b994de9f1 100644 --- a/events/janus_mqttevh.c +++ b/events/janus_mqttevh.c @@ -46,7 +46,6 @@ json_t *janus_mqttevh_handle_request(json_t *request); static int janus_mqttevh_send_message(void *context, const char *topic, json_t *message); static void *janus_mqttevh_handler(void *data); - /* Event handler setup */ static janus_eventhandler janus_mqttevh = JANUS_EVENTHANDLER_INIT ( @@ -92,27 +91,31 @@ static GThread *handler_thread; static volatile gint initialized = 0, stopping = 0; /* JSON serialization options */ -#define DEFAULT_ADDPLUGIN 1 -#define DEFAULT_ADDEVENT 1 -#define DEFAULT_KEEPALIVE 30 -#define DEFAULT_CLEANSESSION 0 /* Off */ -#define DEFAULT_TIMEOUT 30 -#define DEFAULT_DISCONNECT_TIMEOUT 100 -#define DEFAULT_QOS 0 -#define DEFAULT_RETAIN 0 -#define DEFAULT_CONNECT_STATUS "{\"event\": \"connected\", \"eventhandler\": \""JANUS_MQTTEVH_PACKAGE"\"}" -#define DEFAULT_DISCONNECT_STATUS "{\"event\": \"disconnected\"}" -#define DEFAULT_WILL_RETAIN 1 -#define DEFAULT_WILL_QOS 0 -#define DEFAULT_BASETOPIC "/janus/events" -#define DEFAULT_MQTTURL "tcp://localhost:1883" -#define DEFAULT_JSON_FORMAT JSON_INDENT(3) | JSON_PRESERVE_ORDER - -#define DEFAULT_TLS_ENABLE FALSE -#define DEFAULT_TLS_VERIFY_PEER FALSE -#define DEFAULT_TLS_VERIFY_HOST FALSE - -static size_t json_format = DEFAULT_JSON_FORMAT; +#define JANUS_MQTTEVH_DEFAULT_ADDPLUGIN 1 +#define JANUS_MQTTEVH_DEFAULT_ADDEVENT 1 +#define JANUS_MQTTEVH_DEFAULT_KEEPALIVE 30 +#define JANUS_MQTTEVH_DEFAULT_CLEANSESSION 0 /* Off */ +#define JANUS_MQTTEVH_DEFAULT_TIMEOUT 30 +#define JANUS_MQTTEVH_DEFAULT_DISCONNECT_TIMEOUT 100 +#define JANUS_MQTTEVH_DEFAULT_QOS 0 +#define JANUS_MQTTEVH_DEFAULT_RETAIN 0 +#define JANUS_MQTTEVH_DEFAULT_CONNECT_STATUS "{\"event\": \"connected\", \"eventhandler\": \""JANUS_MQTTEVH_PACKAGE"\"}" +#define JANUS_MQTTEVH_DEFAULT_DISCONNECT_STATUS "{\"event\": \"disconnected\"}" +#define JANUS_MQTTEVH_DEFAULT_WILL_RETAIN 1 +#define JANUS_MQTTEVH_DEFAULT_WILL_QOS 0 +#define JANUS_MQTTEVH_DEFAULT_BASETOPIC "/janus/events" +#define JANUS_MQTTEVH_DEFAULT_MQTTURL "tcp://localhost:1883" +#define JANUS_MQTTEVH_DEFAULT_JSON_FORMAT JSON_INDENT(3) | JSON_PRESERVE_ORDER +#define JANUS_MQTTEVH_DEFAULT_TLS_ENABLE FALSE +#define JANUS_MQTTEVH_DEFAULT_TLS_VERIFY_PEER FALSE +#define JANUS_MQTTEVH_DEFAULT_TLS_VERIFY_HOST FALSE + +#define JANUS_MQTTEVH_VERSION_3_1 "3.1" +#define JANUS_MQTTEVH_VERSION_3_1_1 "3.1.1" +#define JANUS_MQTTEVH_VERSION_5 "5" +#define JANUS_MQTTEVH_VERSION_DEFAULT JANUS_MQTTEVH_VERSION_3_1_1 + +static size_t json_format = JANUS_MQTTEVH_DEFAULT_JSON_FORMAT; /* Parameter validation (for tweaking via Admin API) */ @@ -142,6 +145,7 @@ typedef struct janus_mqttevh_context { /* Connection data - authentication and url */ struct { + int mqtt_version; int keep_alive_interval; int cleansession; char *client_id; @@ -161,6 +165,9 @@ typedef struct janus_mqttevh_context { char *disconnect_status; int qos; int retain; + #ifdef MQTTVERSION_5 + GArray *add_user_properties; + #endif } publish; /* If we loose connection, the will is our last publish */ @@ -182,21 +189,53 @@ typedef struct janus_mqttevh_context { } tls; } janus_mqttevh_context; +#ifdef MQTTVERSION_5 +/* MQTT 5 specific types */ +typedef struct janus_mqttevh_set_add_user_property_user_data { + GArray *acc; + janus_config *config; +} janus_mqttevh_set_add_user_property_user_data; +#endif + /* Event handler methods */ static void janus_mqttevh_client_connection_lost(void *context, char *cause); static int janus_mqttevh_client_connect(janus_mqttevh_context *ctx); +static int janus_mqttevh_client_disconnect(janus_mqttevh_context *ctx); +static void janus_mqttevh_client_destroy_context(janus_mqttevh_context **ctx); +/* MQTT v3.x interface callbacks */ static void janus_mqttevh_client_connect_success(void *context, MQTTAsync_successData *response); static void janus_mqttevh_client_connect_failure(void *context, MQTTAsync_failureData *response); -static int janus_mqttevh_client_reconnect(janus_mqttevh_context *ctx); -static void janus_mqttevh_client_reconnect_success(void *context, MQTTAsync_successData *response); -static void janus_mqttevh_client_reconnect_failure(void *context, MQTTAsync_failureData *response); -static int janus_mqttevh_client_disconnect(janus_mqttevh_context *ctx); static void janus_mqttevh_client_disconnect_success(void *context, MQTTAsync_successData *response); static void janus_mqttevh_client_disconnect_failure(void *context, MQTTAsync_failureData *response); +static void janus_mqttevh_client_publish_message_success(void *context, MQTTAsync_successData *response); +static void janus_mqttevh_client_publish_message_failure(void *context, MQTTAsync_failureData *response); static int janus_mqttevh_client_publish_message(janus_mqttevh_context *ctx, const char *topic, int retain, char *payload); -static void janus_mqttevh_client_publish_janus_success(void *context, MQTTAsync_successData *response); -static void janus_mqttevh_client_publish_janus_failure(void *context, MQTTAsync_failureData *response); -static void janus_mqttevh_client_destroy_context(janus_mqttevh_context **ctx); +int janus_mqttevh_client_get_response_code(MQTTAsync_failureData *response); +#ifdef MQTTVERSION_5 +/* MQTT v5 interface callbacks */ +static void janus_mqttevh_client_connect_success5(void *context, MQTTAsync_successData5 *response); +static void janus_mqttevh_client_connect_failure5(void *context, MQTTAsync_failureData5 *response); +static void janus_mqttevh_client_disconnect_success5(void *context, MQTTAsync_successData5 *response); +static void janus_mqttevh_client_disconnect_failure5(void *context, MQTTAsync_failureData5 *response); +static void janus_mqttevh_client_publish_message_success5(void *context, MQTTAsync_successData5 *response); +static void janus_mqttevh_client_publish_message_failure5(void *context, MQTTAsync_failureData5 *response); +static int janus_mqttevh_client_publish_message5(janus_mqttevh_context *ctx, const char *topic, int retain, char *payload, MQTTProperties *properties); +int janus_mqttevh_client_get_response_code5(MQTTAsync_failureData5 *response); +#endif +/* MQTT version independent callback implementations */ +static void janus_mqttevh_client_connect_success_impl(void *context); +static void janus_mqttevh_client_connect_failure_impl(void *context, int rc); +static void janus_mqttevh_client_disconnect_success_impl(void *context); +static void janus_mqttevh_client_disconnect_failure_impl(void *context, int rc); +static void janus_mqttevh_client_publish_message_success_impl(void *context); +static void janus_mqttevh_client_publish_message_failure_impl(void *context, int rc); +int janus_mqttevh_client_publish_message_wrap(void *context, const char *topic, int retain, char *payload); + +#ifdef MQTTVERSION_5 +/* MQTT 5 specific functions */ +void janus_mqttevh_add_properties(GArray *user_properties, MQTTProperties *properties); +void janus_mqttevh_set_add_user_property(gpointer item_ptr, gpointer user_data_ptr); +#endif /* We only handle a single connection */ static janus_mqttevh_context *context = NULL; @@ -267,8 +306,7 @@ static int janus_mqttevh_send_message(void *context, const char *topic, json_t * /* Ok, lets' get rid of the message */ json_decref(message); - rc = janus_mqttevh_client_publish_message(ctx, topic, ctx->publish.retain, payload); - + rc = janus_mqttevh_client_publish_message_wrap(context, topic, ctx->publish.retain, payload); if(rc != MQTTASYNC_SUCCESS) { JANUS_LOG(LOG_WARN, "Can't publish to MQTT topic: %s, return code: %d\n", ctx->publish.topic, rc); } @@ -280,6 +318,26 @@ static int janus_mqttevh_send_message(void *context, const char *topic, json_t * return 0; } +int janus_mqttevh_client_publish_message_wrap(void *context, const char *topic, int retain, char *payload) { + int rc = 0; + janus_mqttevh_context *ctx = (janus_mqttevh_context *)context; + +#ifdef MQTTVERSION_5 + if(ctx->connect.mqtt_version == MQTTVERSION_5) { + MQTTProperties properties = MQTTProperties_initializer; + janus_mqttevh_add_properties(ctx->publish.add_user_properties, &properties); + rc = janus_mqttevh_client_publish_message5(ctx, topic, retain, payload, &properties); + MQTTProperties_free(&properties); + } else { + rc = janus_mqttevh_client_publish_message(ctx, topic, retain, payload); + } +#else + rc = janus_mqttevh_client_publish_message(ctx, topic, retain, payload); +#endif + + return rc; +} + static void janus_mqttevh_client_connection_lost(void *context, char *cause) { /* Notify handlers about this transport being gone */ @@ -290,17 +348,31 @@ static void janus_mqttevh_client_connection_lost(void *context, char *cause) { /* Set up connection to MQTT broker */ static int janus_mqttevh_client_connect(janus_mqttevh_context *ctx) { - int rc; - MQTTAsync_connectOptions options = MQTTAsync_connectOptions_initializer; - options.keepAliveInterval = ctx->connect.keep_alive_interval; + +#ifdef MQTTVERSION_5 + if(ctx->connect.mqtt_version == MQTTVERSION_5) { + MQTTAsync_connectOptions options5 = MQTTAsync_connectOptions_initializer5; + options = options5; + options.cleanstart = ctx->connect.cleansession; + options.onSuccess5 = janus_mqttevh_client_connect_success5; + options.onFailure5 = janus_mqttevh_client_connect_failure5; + } else { + options.cleansession = ctx->connect.cleansession; + options.onSuccess = janus_mqttevh_client_connect_success; + options.onFailure = janus_mqttevh_client_connect_failure; + } +#else options.cleansession = ctx->connect.cleansession; + options.onSuccess = janus_mqttevh_client_connect_success; + options.onFailure = janus_mqttevh_client_connect_failure; +#endif + + options.MQTTVersion = ctx->connect.mqtt_version; options.username = ctx->connect.username; options.password = ctx->connect.password; options.automaticReconnect = TRUE; - options.onSuccess = janus_mqttevh_client_connect_success; - options.onFailure = janus_mqttevh_client_connect_failure; - options.context = ctx; + options.keepAliveInterval = ctx->connect.keep_alive_interval; MQTTAsync_willOptions willOptions = MQTTAsync_willOptions_initializer; if(ctx->will.enabled) { @@ -308,16 +380,25 @@ static int janus_mqttevh_client_connect(janus_mqttevh_context *ctx) { willOptions.message = ctx->publish.disconnect_status; willOptions.retained = ctx->will.retain; willOptions.qos = ctx->will.qos; - options.will = &willOptions; } - rc = MQTTAsync_connect(ctx->client, &options); - return rc; + options.context = ctx; + return MQTTAsync_connect(ctx->client, &options); } -/* Callback for succesful connection to MQTT broker */ +/* Callback for successful connection to MQTT broker */ static void janus_mqttevh_client_connect_success(void *context, MQTTAsync_successData *response) { + janus_mqttevh_client_connect_success_impl(context); +} + +#ifdef MQTTVERSION_5 +static void janus_mqttevh_client_connect_success5(void *context, MQTTAsync_successData5 *response) { + janus_mqttevh_client_connect_success_impl(context); +} +#endif + +static void janus_mqttevh_client_connect_success_impl(void *context) { JANUS_LOG(LOG_INFO, "MQTT EVH client has been successfully connected to the broker\n"); janus_mqttevh_context *ctx = (janus_mqttevh_context *)context; @@ -328,7 +409,7 @@ static void janus_mqttevh_client_connect_success(void *context, MQTTAsync_succes /* Using LWT's retain for initial status message because * we need to ensure we overwrite LWT if it's retained. */ - int rc = janus_mqttevh_client_publish_message(ctx, topicbuf, ctx->will.retain, ctx->publish.connect_status); + int rc = janus_mqttevh_client_publish_message_wrap(context, topicbuf, ctx->will.retain, ctx->publish.connect_status); if(rc != MQTTASYNC_SUCCESS) { JANUS_LOG(LOG_WARN, "Can't publish to MQTT topic: %s, return code: %d\n", topicbuf, rc); @@ -337,58 +418,20 @@ static void janus_mqttevh_client_connect_success(void *context, MQTTAsync_succes /* Callback for MQTT broker connection failure */ static void janus_mqttevh_client_connect_failure(void *context, MQTTAsync_failureData *response) { - int rc = response ? response->code : 0; - - /* Notify handlers about this transport failure */ - JANUS_LOG(LOG_ERR, "MQTT EVH client has failed connecting to the broker, return code: %d. Reconnecting...\n", rc); - + int rc = janus_mqttevh_client_get_response_code(response); + janus_mqttevh_client_connect_failure_impl(context, rc); } -/* MQTT broker Reconnect function */ -static int janus_mqttevh_client_reconnect(janus_mqttevh_context *ctx) { - JANUS_LOG(LOG_INFO, "MQTT EVH client reconnecting to %s. Reconnecting...\n", ctx->connect.url); - - MQTTAsync_disconnectOptions options = MQTTAsync_disconnectOptions_initializer; - options.onSuccess = janus_mqttevh_client_reconnect_success; - options.onFailure = janus_mqttevh_client_reconnect_failure; - options.context = ctx; - options.timeout = ctx->disconnect.timeout; - - return MQTTAsync_disconnect(ctx->client, &options); -} - -/* Callback for successful reconnection to MQTT broker */ -static void janus_mqttevh_client_reconnect_success(void *context, MQTTAsync_successData *response) { - janus_mqttevh_context *ctx = (janus_mqttevh_context *)context; - - JANUS_LOG(LOG_WARN, "MQTT EVH client has been disconnected from %s. Reconnecting...\n", ctx->connect.url); - - int rc = janus_mqttevh_client_connect(context); - if(rc != MQTTASYNC_SUCCESS) { - const char *error; - switch(rc) { - case 1: error = "Connection refused - protocol version"; - break; - case 2: error = "Connection refused - identifier rejected"; - break; - case 3: error = "Connection refused - server unavailable"; - break; - case 4: error = "Connection refused - bad credentials"; - break; - case 5: error = "Connection refused - not authroized"; - break; - default: error = "Connection refused - unknown error"; - break; - } - JANUS_LOG(LOG_FATAL, "Can't connect to MQTT broker, return code: %d (%s)\n", rc, error); - return; - } +#ifdef MQTTVERSION_5 +static void janus_mqttevh_client_connect_failure5(void *context, MQTTAsync_failureData5 *response) { + int rc = janus_mqttevh_client_get_response_code5(response); + janus_mqttevh_client_connect_failure_impl(context, rc); } +#endif -/* Callback for MQTT broker reconnect failure */ -static void janus_mqttevh_client_reconnect_failure(void *context, MQTTAsync_failureData *response) { - int rc = response ? response->code : 0; - JANUS_LOG(LOG_ERR, "MQTT EVH client failed reconnecting to MQTT broker, return code: %d\n", rc); +static void janus_mqttevh_client_connect_failure_impl(void *contexts, int rc) { + /* Notify handlers about this transport failure */ + JANUS_LOG(LOG_ERR, "MQTT EVH client has failed connecting to the broker, return code: %d. Reconnecting...\n", rc); } /* Disconnect from MQTT broker */ @@ -399,23 +442,44 @@ static int janus_mqttevh_client_disconnect(janus_mqttevh_context *ctx) { /* Using LWT's retain for disconnect status message because * we need to ensure we overwrite LWT if it's retained. */ - int rc = janus_mqttevh_client_publish_message(ctx, topicbuf, 1, ctx->publish.disconnect_status); + int rc = janus_mqttevh_client_publish_message_wrap(context, topicbuf, 1, ctx->publish.disconnect_status); if(rc != MQTTASYNC_SUCCESS) { JANUS_LOG(LOG_WARN, "Can't publish to MQTT topic: %s, return code: %d\n", topicbuf, rc); } MQTTAsync_disconnectOptions options = MQTTAsync_disconnectOptions_initializer; - options.onSuccess = janus_mqttevh_client_disconnect_success; - options.onFailure = janus_mqttevh_client_disconnect_failure; options.context = ctx; options.timeout = ctx->disconnect.timeout; + +#ifdef MQTTVERSION_5 + if(ctx->connect.mqtt_version == MQTTVERSION_5) { + options.onSuccess5 = janus_mqttevh_client_disconnect_success5; + options.onFailure5 = janus_mqttevh_client_disconnect_failure5; + } else { + options.onSuccess = janus_mqttevh_client_disconnect_success; + options.onFailure = janus_mqttevh_client_disconnect_failure; + } +#else + options.onSuccess = janus_mqttevh_client_disconnect_success; + options.onFailure = janus_mqttevh_client_disconnect_failure; +#endif + return MQTTAsync_disconnect(ctx->client, &options); } /* Callback for succesful MQTT disconnect */ static void janus_mqttevh_client_disconnect_success(void *context, MQTTAsync_successData *response) { + janus_mqttevh_client_disconnect_success_impl(context); +} +#ifdef MQTTVERSION_5 +static void janus_mqttevh_client_disconnect_success5(void *context, MQTTAsync_successData5 *response) { + janus_mqttevh_client_disconnect_success_impl(context); +} +#endif + +static void janus_mqttevh_client_disconnect_success_impl(void *context) { /* Notify handlers about this transport being gone */ janus_mqttevh_context *ctx = (janus_mqttevh_context *)context; @@ -425,11 +489,20 @@ static void janus_mqttevh_client_disconnect_success(void *context, MQTTAsync_suc /* Callback for MQTT disconnect failure */ void janus_mqttevh_client_disconnect_failure(void *context, MQTTAsync_failureData *response) { - int rc = response ? response->code : 0; - janus_mqttevh_context *ctx = (janus_mqttevh_context *)context; + int rc = janus_mqttevh_client_get_response_code(response); + janus_mqttevh_client_disconnect_failure_impl(context, rc); +} - JANUS_LOG(LOG_ERR, "Can't disconnect from MQTT EVH broker %s, return code: %d\n", ctx->connect.url, rc); +#ifdef MQTTVERSION_5 +void janus_mqttevh_client_disconnect_failure5(void *context, MQTTAsync_failureData5 *response) { + int rc = janus_mqttevh_client_get_response_code5(response); + janus_mqttevh_client_disconnect_failure_impl(context, rc); +} +#endif +void janus_mqttevh_client_disconnect_failure_impl(void *context, int rc) { + janus_mqttevh_context *ctx = (janus_mqttevh_context *)context; + JANUS_LOG(LOG_ERR, "Can't disconnect from MQTT EVH broker %s, return code: %d\n", ctx->connect.url, rc); janus_mqttevh_client_destroy_context(&ctx); } @@ -437,14 +510,10 @@ void janus_mqttevh_client_disconnect_failure(void *context, MQTTAsync_failureDat /* Publish mqtt message using paho * Payload is a string. JSON objects should be stringified before calling this function. */ -static int janus_mqttevh_client_publish_message(janus_mqttevh_context *ctx, const char *topic, int retain, char *payload) -{ +static int janus_mqttevh_client_publish_message(janus_mqttevh_context *ctx, const char *topic, int retain, char *payload) { int rc; - MQTTAsync_responseOptions options; - memset(&options, 0, sizeof(MQTTAsync_responseOptions)); MQTTAsync_message msg = MQTTAsync_message_initializer; - msg.payload = payload; msg.payloadlen = strlen(payload); msg.qos = ctx->publish.qos; @@ -455,9 +524,37 @@ static int janus_mqttevh_client_publish_message(janus_mqttevh_context *ctx, cons payload = (char *)NULL; */ + MQTTAsync_responseOptions options = MQTTAsync_responseOptions_initializer; + options.context = ctx; + options.onSuccess = janus_mqttevh_client_publish_message_success; + options.onFailure = janus_mqttevh_client_publish_message_failure; + + rc = MQTTAsync_sendMessage(ctx->client, topic, &msg, &options); + if(rc == MQTTASYNC_SUCCESS) { + JANUS_LOG(LOG_HUGE, "MQTT EVH message sent to topic %s on %s. Result %d\n", topic, ctx->connect.url, rc); + } else { + JANUS_LOG(LOG_WARN, "FAILURE: MQTT EVH message propably not sent to topic %s on %s. Result %d\n", topic, ctx->connect.url, rc); + } + + return rc; +} + +#ifdef MQTTVERSION_5 +static int janus_mqttevh_client_publish_message5(janus_mqttevh_context *ctx, const char *topic, int retain, char *payload, MQTTProperties *properties) { + int rc; + + MQTTAsync_message msg = MQTTAsync_message_initializer; + msg.payload = payload; + msg.payloadlen = strlen(payload); + msg.qos = ctx->publish.qos; + msg.retained = retain; + msg.properties = MQTTProperties_copy(properties); + + MQTTAsync_responseOptions options = MQTTAsync_responseOptions_initializer; options.context = ctx; - options.onSuccess = janus_mqttevh_client_publish_janus_success; - options.onFailure = janus_mqttevh_client_publish_janus_failure; + options.onSuccess5 = janus_mqttevh_client_publish_message_success5; + options.onFailure5 = janus_mqttevh_client_publish_message_failure5; + rc = MQTTAsync_sendMessage(ctx->client, topic, &msg, &options); if(rc == MQTTASYNC_SUCCESS) { JANUS_LOG(LOG_HUGE, "MQTT EVH message sent to topic %s on %s. Result %d\n", topic, ctx->connect.url, rc); @@ -467,9 +564,20 @@ static int janus_mqttevh_client_publish_message(janus_mqttevh_context *ctx, cons return rc; } +#endif /* Callback for successful MQTT publish */ -static void janus_mqttevh_client_publish_janus_success(void *context, MQTTAsync_successData *response) { +static void janus_mqttevh_client_publish_message_success(void *context, MQTTAsync_successData *response) { + janus_mqttevh_client_publish_message_success_impl(context); +} + +#ifdef MQTTVERSION_5 +static void janus_mqttevh_client_publish_message_success5(void *context, MQTTAsync_successData5 *response) { + janus_mqttevh_client_publish_message_success_impl(context); +} +#endif + +static void janus_mqttevh_client_publish_message_success_impl(void *context) { janus_mqttevh_context *ctx = (janus_mqttevh_context *)context; JANUS_LOG(LOG_HUGE, "MQTT EVH client has successfully published to MQTT base topic: %s\n", ctx->publish.topic); } @@ -477,13 +585,23 @@ static void janus_mqttevh_client_publish_janus_success(void *context, MQTTAsync_ /* Callback for MQTT publish failure * Should we bring message into queue? Right now, we just drop it. */ -static void janus_mqttevh_client_publish_janus_failure(void *context, MQTTAsync_failureData *response) { +static void janus_mqttevh_client_publish_message_failure(void *context, MQTTAsync_failureData *response) { + int rc = janus_mqttevh_client_get_response_code(response); + janus_mqttevh_client_publish_message_failure_impl(context, rc); +} + +#ifdef MQTTVERSION_5 +static void janus_mqttevh_client_publish_message_failure5(void *context, MQTTAsync_failureData5 *response) { + int rc = janus_mqttevh_client_get_response_code5(response); + janus_mqttevh_client_publish_message_failure_impl(context, rc); +} +#endif + +static void janus_mqttevh_client_publish_message_failure_impl(void *context, int rc) { janus_mqttevh_context *ctx = (janus_mqttevh_context *)context; - int rc = response ? response->code : 0; JANUS_LOG(LOG_ERR, "MQTT EVH client has failed publishing to MQTT topic: %s, return code: %d\n", ctx->publish.topic, rc); } - /* Destroy Janus MQTT event handler session context */ static void janus_mqttevh_client_destroy_context(janus_mqttevh_context **ptr) { JANUS_LOG(LOG_INFO, "About to destroy MQTT EVH context...\n"); @@ -572,21 +690,21 @@ static int janus_mqttevh_init(const char *config_path) { /* Set default values */ /* Strings are set to default values later */ - ctx->addplugin = DEFAULT_ADDPLUGIN; - ctx->addevent = DEFAULT_ADDEVENT; - ctx->publish.qos = DEFAULT_QOS; - ctx->publish.retain = DEFAULT_RETAIN; + ctx->addplugin = JANUS_MQTTEVH_DEFAULT_ADDPLUGIN; + ctx->addevent = JANUS_MQTTEVH_DEFAULT_ADDEVENT; + ctx->publish.qos = JANUS_MQTTEVH_DEFAULT_QOS; + ctx->publish.retain = JANUS_MQTTEVH_DEFAULT_RETAIN; ctx->connect.username = NULL; ctx->connect.password = NULL; - ctx->disconnect.timeout = DEFAULT_TIMEOUT; + ctx->disconnect.timeout = JANUS_MQTTEVH_DEFAULT_TIMEOUT; ctx->will.enabled = FALSE; - ctx->will.qos = DEFAULT_WILL_QOS; - ctx->will.retain = DEFAULT_WILL_RETAIN; + ctx->will.qos = JANUS_MQTTEVH_DEFAULT_WILL_QOS; + ctx->will.retain = JANUS_MQTTEVH_DEFAULT_WILL_RETAIN; - ctx->tls.enable = DEFAULT_TLS_ENABLE; - ctx->tls.verify_peer = DEFAULT_TLS_VERIFY_PEER; - ctx->tls.verify_host = DEFAULT_TLS_VERIFY_HOST; + ctx->tls.enable = JANUS_MQTTEVH_DEFAULT_TLS_ENABLE; + ctx->tls.verify_peer = JANUS_MQTTEVH_DEFAULT_TLS_VERIFY_PEER; + ctx->tls.verify_host = JANUS_MQTTEVH_DEFAULT_TLS_VERIFY_HOST; /* Setup the event handler, if required */ janus_config_item *item = janus_config_get(config, config_general, janus_config_type_item, "enabled"); @@ -599,7 +717,26 @@ static int janus_mqttevh_init(const char *config_path) { /* MQTT URL */ url_item = janus_config_get(config, config_general, janus_config_type_item, "url"); - ctx->connect.url= g_strdup((url_item && url_item->value) ? url_item->value : DEFAULT_MQTTURL); + ctx->connect.url= g_strdup((url_item && url_item->value) ? url_item->value : JANUS_MQTTEVH_DEFAULT_MQTTURL); + + janus_config_item *mqtt_version = janus_config_get(config, config_general, janus_config_type_item, "mqtt_version"); + const char *mqtt_version_str = (mqtt_version && mqtt_version->value) ? mqtt_version->value : JANUS_MQTTEVH_VERSION_DEFAULT; + + if(strcmp(mqtt_version_str, JANUS_MQTTEVH_VERSION_3_1) == 0) { + ctx->connect.mqtt_version = MQTTVERSION_3_1; + } else if(strcmp(mqtt_version_str, JANUS_MQTTEVH_VERSION_3_1_1) == 0) { + ctx->connect.mqtt_version = MQTTVERSION_3_1_1; + } else if(strcmp(mqtt_version_str, JANUS_MQTTEVH_VERSION_5) == 0) { +#ifdef MQTTVERSION_5 + ctx->connect.mqtt_version = MQTTVERSION_5; +#else + JANUS_LOG(LOG_FATAL, "Using MQTT v5 requires compilation with Paho >= 1.3.0\n"); + goto error; +#endif + } else { + JANUS_LOG(LOG_FATAL, "Unknown MQTT version\n"); + goto error; + } janus_config_item *client_id_item = janus_config_get(config, config_general, janus_config_type_item, "client_id"); @@ -633,7 +770,7 @@ static int janus_mqttevh_init(const char *config_path) { json_format = JSON_COMPACT | JSON_PRESERVE_ORDER; } else { JANUS_LOG(LOG_WARN, "Unsupported JSON format option '%s', using default (indented)\n", json_item->value); - json_format = DEFAULT_JSON_FORMAT; + json_format = JANUS_MQTTEVH_DEFAULT_JSON_FORMAT; } } @@ -645,32 +782,32 @@ static int janus_mqttevh_init(const char *config_path) { /* Connect configuration */ keep_alive_interval_item = janus_config_get(config, config_general, janus_config_type_item, "keep_alive_interval"); ctx->connect.keep_alive_interval = (keep_alive_interval_item && keep_alive_interval_item->value) ? - atoi(keep_alive_interval_item->value) : DEFAULT_KEEPALIVE; + atoi(keep_alive_interval_item->value) : JANUS_MQTTEVH_DEFAULT_KEEPALIVE; if(ctx->connect.keep_alive_interval < 0) { JANUS_LOG(LOG_ERR, "Invalid keep-alive value: %s (falling back to default)\n", keep_alive_interval_item->value); - ctx->connect.keep_alive_interval = DEFAULT_KEEPALIVE; + ctx->connect.keep_alive_interval = JANUS_MQTTEVH_DEFAULT_KEEPALIVE; } cleansession_item = janus_config_get(config, config_general, janus_config_type_item, "cleansession"); ctx->connect.cleansession = (cleansession_item && cleansession_item->value) ? - atoi(cleansession_item->value) : DEFAULT_CLEANSESSION; + atoi(cleansession_item->value) : JANUS_MQTTEVH_DEFAULT_CLEANSESSION; if(ctx->connect.cleansession < 0) { JANUS_LOG(LOG_ERR, "Invalid clean-session value: %s (falling back to default)\n", cleansession_item->value); - ctx->connect.cleansession = DEFAULT_CLEANSESSION; + ctx->connect.cleansession = JANUS_MQTTEVH_DEFAULT_CLEANSESSION; } /* Disconnect configuration */ disconnect_timeout_item = janus_config_get(config, config_general, janus_config_type_item, "disconnect_timeout"); ctx->disconnect.timeout = (disconnect_timeout_item && disconnect_timeout_item->value) ? - atoi(disconnect_timeout_item->value) : DEFAULT_DISCONNECT_TIMEOUT; + atoi(disconnect_timeout_item->value) : JANUS_MQTTEVH_DEFAULT_DISCONNECT_TIMEOUT; if(ctx->disconnect.timeout < 0) { JANUS_LOG(LOG_ERR, "Invalid disconnect-timeout value: %s (falling back to default)\n", disconnect_timeout_item->value); - ctx->disconnect.timeout = DEFAULT_DISCONNECT_TIMEOUT; + ctx->disconnect.timeout = JANUS_MQTTEVH_DEFAULT_DISCONNECT_TIMEOUT; } topic_item = janus_config_get(config, config_general, janus_config_type_item, "topic"); if(!topic_item || !topic_item->value) { - ctx->publish.topic = g_strdup(DEFAULT_BASETOPIC); + ctx->publish.topic = g_strdup(JANUS_MQTTEVH_DEFAULT_BASETOPIC); } else { ctx->publish.topic = g_strdup(topic_item->value); } @@ -698,14 +835,14 @@ static int janus_mqttevh_init(const char *config_path) { if(connect_status_item && connect_status_item->value) { ctx->publish.connect_status = g_strdup(connect_status_item->value); } else { - ctx->publish.connect_status = g_strdup(DEFAULT_CONNECT_STATUS); + ctx->publish.connect_status = g_strdup(JANUS_MQTTEVH_DEFAULT_CONNECT_STATUS); } disconnect_status_item = janus_config_get(config, config_general, janus_config_type_item, "disconnect_status"); if(disconnect_status_item && disconnect_status_item->value) { ctx->publish.disconnect_status = g_strdup(disconnect_status_item->value); } else { - ctx->publish.disconnect_status = g_strdup(DEFAULT_DISCONNECT_STATUS); + ctx->publish.disconnect_status = g_strdup(JANUS_MQTTEVH_DEFAULT_DISCONNECT_STATUS); } /* LWT config */ @@ -782,24 +919,62 @@ static int janus_mqttevh_init(const char *config_path) { } } +#ifdef MQTTVERSION_5 + if (ctx->connect.mqtt_version == MQTTVERSION_5) { + /* MQTT 5 specific configuration */ + janus_config_array *add_user_properties_array = janus_config_get(config, config_general, janus_config_type_array, "add_user_properties"); + if(add_user_properties_array) { + GList *add_user_properties_array_items = janus_config_get_arrays(config, add_user_properties_array); + if(add_user_properties_array_items != NULL) { + int add_user_properties_array_len = g_list_length(add_user_properties_array_items); + if(add_user_properties_array_len > 0) { + ctx->publish.add_user_properties = g_array_sized_new(FALSE, FALSE, sizeof(MQTTProperty), add_user_properties_array_len); + + janus_mqttevh_set_add_user_property_user_data user_data = { + ctx->publish.add_user_properties, + config + }; + + g_list_foreach( + add_user_properties_array_items, + (GFunc)janus_mqttevh_set_add_user_property, + (gpointer)&user_data + ); + } + } + } + } +#endif + if(!janus_mqtt_evh_enabled) { JANUS_LOG(LOG_WARN, "MQTT event handler support disabled, giving up\n"); goto error; } /* Create a MQTT client */ - res = MQTTAsync_create( + MQTTAsync_createOptions create_options = MQTTAsync_createOptions_initializer; + +#ifdef MQTTVERSION_5 + if (ctx->connect.mqtt_version == MQTTVERSION_5) { + create_options.MQTTVersion = MQTTVERSION_5; + } +#endif + + res = MQTTAsync_createWithOptions( &ctx->client, ctx->connect.url, ctx->connect.client_id, MQTTCLIENT_PERSISTENCE_NONE, - NULL); + NULL, + &create_options + ); if(res != MQTTASYNC_SUCCESS) { JANUS_LOG(LOG_FATAL, "Can't setup library for connection to MQTT broker %s: error %d creating client...\n", ctx->connect.url, res); goto error; } + /* Set callbacks. We should not really subscribe to anything but nevertheless */ res = MQTTAsync_setCallbacks(ctx->client, ctx, @@ -812,6 +987,7 @@ static int janus_mqttevh_init(const char *config_path) { ctx->connect.url, res); goto error; } + JANUS_LOG(LOG_INFO, "Event handler: About to connect to MQTT broker %s: ...\n", ctx->connect.url); @@ -1005,3 +1181,52 @@ static void *janus_mqttevh_handler(void *data) { JANUS_LOG(LOG_VERB, "Leaving MQTTEventHandler handler thread\n"); return NULL; } + +int janus_mqttevh_client_get_response_code(MQTTAsync_failureData *response) { + return response ? response->code : 0; +} + +#ifdef MQTTVERSION_5 +int janus_mqttevh_client_get_response_code5(MQTTAsync_failureData5 *response) { + return response ? response->code : 0; +} + +void janus_mqttevh_add_properties(GArray *user_properties, MQTTProperties *properties) { + if(user_properties == NULL || user_properties->len == 0) { + return; + } + + for(uint i = 0; i < user_properties->len; i++) { + MQTTProperty *property = &g_array_index(user_properties, MQTTProperty, i); + int rc = MQTTProperties_add(properties, property); + if(rc != 0) { + JANUS_LOG(LOG_ERR, "Failed to user properties to MQTT response\n"); + } + } +} + +void janus_mqttevh_set_add_user_property(gpointer item_ptr, gpointer user_data_ptr) { + janus_config_item *item = (janus_config_item*)item_ptr; + if(item->value != NULL) { + return; + } + + janus_mqttevh_set_add_user_property_user_data *user_data = (janus_mqttevh_set_add_user_property_user_data*)user_data_ptr; + GList *key_value = janus_config_get_items(user_data->config, item); + if(key_value == NULL || g_list_length(key_value) != 2) { + JANUS_LOG(LOG_ERR, "Expected a key-value pair\n"); + return; + } + + janus_config_item *key_item = (janus_config_item*)g_list_first(key_value)->data; + janus_config_item *value_item = (janus_config_item*)g_list_last(key_value)->data; + + MQTTProperty property; + property.identifier = MQTTPROPERTY_CODE_USER_PROPERTY; + property.value.data.data = g_strdup(key_item->value); + property.value.data.len = strlen(key_item->value); + property.value.value.data = g_strdup(value_item->value); + property.value.value.len = strlen(value_item->value); + g_array_append_val(user_data->acc, property); +} +#endif From dc02133bab781d6f159e5655002d321f5481341e Mon Sep 17 00:00:00 2001 From: Timofey Martynov Date: Tue, 14 Jul 2020 16:23:54 +0300 Subject: [PATCH 5/5] Code style fixes --- conf/janus.transport.mqtt.jcfg.sample | 2 +- events/janus_mqttevh.c | 136 +++++++++++++------------- transports/janus_mqtt.c | 84 +++++++--------- 3 files changed, 104 insertions(+), 118 deletions(-) diff --git a/conf/janus.transport.mqtt.jcfg.sample b/conf/janus.transport.mqtt.jcfg.sample index 9418043d52..f5a706cfcc 100644 --- a/conf/janus.transport.mqtt.jcfg.sample +++ b/conf/janus.transport.mqtt.jcfg.sample @@ -29,7 +29,7 @@ general: { #keyfile = /path/to/key.pem # These options work with MQTT 5 only. - #vacuum_interval = 60 # Interval for removing old transcation states in seconds. + #vacuum_interval = 60 # Interval for removing old transaction states in seconds. #proxy_transaction_user_properties = [] # Array of user property names to copy from the incoming message. #add_transaction_user_properties = () # List of user property ["key", "value"] pairs to add. } diff --git a/events/janus_mqttevh.c b/events/janus_mqttevh.c index 7b994de9f1..2f85825d31 100644 --- a/events/janus_mqttevh.c +++ b/events/janus_mqttevh.c @@ -91,28 +91,28 @@ static GThread *handler_thread; static volatile gint initialized = 0, stopping = 0; /* JSON serialization options */ -#define JANUS_MQTTEVH_DEFAULT_ADDPLUGIN 1 -#define JANUS_MQTTEVH_DEFAULT_ADDEVENT 1 -#define JANUS_MQTTEVH_DEFAULT_KEEPALIVE 30 -#define JANUS_MQTTEVH_DEFAULT_CLEANSESSION 0 /* Off */ -#define JANUS_MQTTEVH_DEFAULT_TIMEOUT 30 +#define JANUS_MQTTEVH_DEFAULT_ADDPLUGIN 1 +#define JANUS_MQTTEVH_DEFAULT_ADDEVENT 1 +#define JANUS_MQTTEVH_DEFAULT_KEEPALIVE 30 +#define JANUS_MQTTEVH_DEFAULT_CLEANSESSION 0 /* Off */ +#define JANUS_MQTTEVH_DEFAULT_TIMEOUT 30 #define JANUS_MQTTEVH_DEFAULT_DISCONNECT_TIMEOUT 100 -#define JANUS_MQTTEVH_DEFAULT_QOS 0 -#define JANUS_MQTTEVH_DEFAULT_RETAIN 0 +#define JANUS_MQTTEVH_DEFAULT_QOS 0 +#define JANUS_MQTTEVH_DEFAULT_RETAIN 0 #define JANUS_MQTTEVH_DEFAULT_CONNECT_STATUS "{\"event\": \"connected\", \"eventhandler\": \""JANUS_MQTTEVH_PACKAGE"\"}" #define JANUS_MQTTEVH_DEFAULT_DISCONNECT_STATUS "{\"event\": \"disconnected\"}" -#define JANUS_MQTTEVH_DEFAULT_WILL_RETAIN 1 -#define JANUS_MQTTEVH_DEFAULT_WILL_QOS 0 -#define JANUS_MQTTEVH_DEFAULT_BASETOPIC "/janus/events" -#define JANUS_MQTTEVH_DEFAULT_MQTTURL "tcp://localhost:1883" -#define JANUS_MQTTEVH_DEFAULT_JSON_FORMAT JSON_INDENT(3) | JSON_PRESERVE_ORDER -#define JANUS_MQTTEVH_DEFAULT_TLS_ENABLE FALSE +#define JANUS_MQTTEVH_DEFAULT_WILL_RETAIN 1 +#define JANUS_MQTTEVH_DEFAULT_WILL_QOS 0 +#define JANUS_MQTTEVH_DEFAULT_BASETOPIC "/janus/events" +#define JANUS_MQTTEVH_DEFAULT_MQTTURL "tcp://localhost:1883" +#define JANUS_MQTTEVH_DEFAULT_JSON_FORMAT JSON_INDENT(3) | JSON_PRESERVE_ORDER +#define JANUS_MQTTEVH_DEFAULT_TLS_ENABLE FALSE #define JANUS_MQTTEVH_DEFAULT_TLS_VERIFY_PEER FALSE #define JANUS_MQTTEVH_DEFAULT_TLS_VERIFY_HOST FALSE -#define JANUS_MQTTEVH_VERSION_3_1 "3.1" -#define JANUS_MQTTEVH_VERSION_3_1_1 "3.1.1" -#define JANUS_MQTTEVH_VERSION_5 "5" +#define JANUS_MQTTEVH_VERSION_3_1 "3.1" +#define JANUS_MQTTEVH_VERSION_3_1_1 "3.1.1" +#define JANUS_MQTTEVH_VERSION_5 "5" #define JANUS_MQTTEVH_VERSION_DEFAULT JANUS_MQTTEVH_VERSION_3_1_1 static size_t json_format = JANUS_MQTTEVH_DEFAULT_JSON_FORMAT; @@ -145,7 +145,7 @@ typedef struct janus_mqttevh_context { /* Connection data - authentication and url */ struct { - int mqtt_version; + int mqtt_version; int keep_alive_interval; int cleansession; char *client_id; @@ -165,9 +165,9 @@ typedef struct janus_mqttevh_context { char *disconnect_status; int qos; int retain; - #ifdef MQTTVERSION_5 - GArray *add_user_properties; - #endif +#ifdef MQTTVERSION_5 + GArray *add_user_properties; +#endif } publish; /* If we loose connection, the will is our last publish */ @@ -541,14 +541,14 @@ static int janus_mqttevh_client_publish_message(janus_mqttevh_context *ctx, cons #ifdef MQTTVERSION_5 static int janus_mqttevh_client_publish_message5(janus_mqttevh_context *ctx, const char *topic, int retain, char *payload, MQTTProperties *properties) { - int rc; + int rc; MQTTAsync_message msg = MQTTAsync_message_initializer; msg.payload = payload; msg.payloadlen = strlen(payload); msg.qos = ctx->publish.qos; msg.retained = retain; - msg.properties = MQTTProperties_copy(properties); + msg.properties = MQTTProperties_copy(properties); MQTTAsync_responseOptions options = MQTTAsync_responseOptions_initializer; options.context = ctx; @@ -568,12 +568,12 @@ static int janus_mqttevh_client_publish_message5(janus_mqttevh_context *ctx, con /* Callback for successful MQTT publish */ static void janus_mqttevh_client_publish_message_success(void *context, MQTTAsync_successData *response) { - janus_mqttevh_client_publish_message_success_impl(context); + janus_mqttevh_client_publish_message_success_impl(context); } #ifdef MQTTVERSION_5 static void janus_mqttevh_client_publish_message_success5(void *context, MQTTAsync_successData5 *response) { - janus_mqttevh_client_publish_message_success_impl(context); + janus_mqttevh_client_publish_message_success_impl(context); } #endif @@ -586,7 +586,7 @@ static void janus_mqttevh_client_publish_message_success_impl(void *context) { * Should we bring message into queue? Right now, we just drop it. */ static void janus_mqttevh_client_publish_message_failure(void *context, MQTTAsync_failureData *response) { - int rc = janus_mqttevh_client_get_response_code(response); + int rc = janus_mqttevh_client_get_response_code(response); janus_mqttevh_client_publish_message_failure_impl(context, rc); } @@ -920,30 +920,30 @@ static int janus_mqttevh_init(const char *config_path) { } #ifdef MQTTVERSION_5 - if (ctx->connect.mqtt_version == MQTTVERSION_5) { - /* MQTT 5 specific configuration */ - janus_config_array *add_user_properties_array = janus_config_get(config, config_general, janus_config_type_array, "add_user_properties"); - if(add_user_properties_array) { - GList *add_user_properties_array_items = janus_config_get_arrays(config, add_user_properties_array); - if(add_user_properties_array_items != NULL) { - int add_user_properties_array_len = g_list_length(add_user_properties_array_items); - if(add_user_properties_array_len > 0) { - ctx->publish.add_user_properties = g_array_sized_new(FALSE, FALSE, sizeof(MQTTProperty), add_user_properties_array_len); - - janus_mqttevh_set_add_user_property_user_data user_data = { - ctx->publish.add_user_properties, - config - }; - - g_list_foreach( - add_user_properties_array_items, - (GFunc)janus_mqttevh_set_add_user_property, - (gpointer)&user_data - ); - } - } - } - } + if (ctx->connect.mqtt_version == MQTTVERSION_5) { + /* MQTT 5 specific configuration */ + janus_config_array *add_user_properties_array = janus_config_get(config, config_general, janus_config_type_array, "add_user_properties"); + if(add_user_properties_array) { + GList *add_user_properties_array_items = janus_config_get_arrays(config, add_user_properties_array); + if(add_user_properties_array_items != NULL) { + int add_user_properties_array_len = g_list_length(add_user_properties_array_items); + if(add_user_properties_array_len > 0) { + ctx->publish.add_user_properties = g_array_sized_new(FALSE, FALSE, sizeof(MQTTProperty), add_user_properties_array_len); + + janus_mqttevh_set_add_user_property_user_data user_data = { + ctx->publish.add_user_properties, + config + }; + + g_list_foreach( + add_user_properties_array_items, + (GFunc)janus_mqttevh_set_add_user_property, + (gpointer)&user_data + ); + } + } + } + } #endif if(!janus_mqtt_evh_enabled) { @@ -1083,9 +1083,8 @@ static void janus_mqttevh_incoming_event(json_t *event) { } json_t *janus_mqttevh_handle_request(json_t *request) { - if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized)) { - return NULL; - } + if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized)) return NULL; + /* We can use this requests to apply tweaks to the logic */ int error_code = 0; char error_cause[512]; @@ -1142,9 +1141,8 @@ static void *janus_mqttevh_handler(void *data) { while(g_atomic_int_get(&initialized) && !g_atomic_int_get(&stopping)) { /* Get event from queue */ event = g_async_queue_pop(events); - if(event == &exit_event) { - break; - } + if(event == &exit_event) break; + /* Handle event: just for fun, let's see how long it took for us to take care of this */ json_t *created = json_object_get(event, "timestamp"); if(created && json_is_integer(created)) { @@ -1192,9 +1190,7 @@ int janus_mqttevh_client_get_response_code5(MQTTAsync_failureData5 *response) { } void janus_mqttevh_add_properties(GArray *user_properties, MQTTProperties *properties) { - if(user_properties == NULL || user_properties->len == 0) { - return; - } + if(user_properties == NULL || user_properties->len == 0) return; for(uint i = 0; i < user_properties->len; i++) { MQTTProperty *property = &g_array_index(user_properties, MQTTProperty, i); @@ -1207,9 +1203,7 @@ void janus_mqttevh_add_properties(GArray *user_properties, MQTTProperties *prope void janus_mqttevh_set_add_user_property(gpointer item_ptr, gpointer user_data_ptr) { janus_config_item *item = (janus_config_item*)item_ptr; - if(item->value != NULL) { - return; - } + if(item->value != NULL) return; janus_mqttevh_set_add_user_property_user_data *user_data = (janus_mqttevh_set_add_user_property_user_data*)user_data_ptr; GList *key_value = janus_config_get_items(user_data->config, item); @@ -1221,12 +1215,18 @@ void janus_mqttevh_set_add_user_property(gpointer item_ptr, gpointer user_data_p janus_config_item *key_item = (janus_config_item*)g_list_first(key_value)->data; janus_config_item *value_item = (janus_config_item*)g_list_last(key_value)->data; - MQTTProperty property; - property.identifier = MQTTPROPERTY_CODE_USER_PROPERTY; - property.value.data.data = g_strdup(key_item->value); - property.value.data.len = strlen(key_item->value); - property.value.value.data = g_strdup(value_item->value); - property.value.value.len = strlen(value_item->value); - g_array_append_val(user_data->acc, property); + if(key_item->value == NULL) { + JANUS_LOG(LOG_ERR, "Expected key item to have a value\n"); + } else if(value_item->value == NULL) { + JANUS_LOG(LOG_ERR, "Expected value item to have a value\n"); + } else { + MQTTProperty property; + property.identifier = MQTTPROPERTY_CODE_USER_PROPERTY; + property.value.data.data = g_strdup(key_item->value); + property.value.data.len = strlen(key_item->value); + property.value.value.data = g_strdup(value_item->value); + property.value.value.len = strlen(value_item->value); + g_array_append_val(user_data->acc, property); + } } #endif diff --git a/transports/janus_mqtt.c b/transports/janus_mqtt.c index 45b51f09d8..392206b6a2 100644 --- a/transports/janus_mqtt.c +++ b/transports/janus_mqtt.c @@ -35,12 +35,12 @@ #include "../utils.h" /* Transport plugin information */ -#define JANUS_MQTT_VERSION 1 +#define JANUS_MQTT_VERSION 1 #define JANUS_MQTT_VERSION_STRING "0.0.1" -#define JANUS_MQTT_DESCRIPTION "This transport plugin adds MQTT support to the Janus API via Paho client library." -#define JANUS_MQTT_NAME "JANUS MQTT transport plugin" -#define JANUS_MQTT_AUTHOR "Andrei Nesterov " -#define JANUS_MQTT_PACKAGE "janus.transport.mqtt" +#define JANUS_MQTT_DESCRIPTION "This transport plugin adds MQTT support to the Janus API via Paho client library." +#define JANUS_MQTT_NAME "JANUS MQTT transport plugin" +#define JANUS_MQTT_AUTHOR "Andrei Nesterov " +#define JANUS_MQTT_PACKAGE "janus.transport.mqtt" /* Transport methods */ janus_transport *create(void); @@ -60,10 +60,10 @@ void janus_mqtt_session_created(janus_transport_session *transport, guint64 sess void janus_mqtt_session_over(janus_transport_session *transport, guint64 session_id, gboolean timeout, gboolean claimed); void janus_mqtt_session_claimed(janus_transport_session *transport, guint64 session_id); -#define JANUS_MQTT_VERSION_3_1 "3.1" -#define JANUS_MQTT_VERSION_3_1_1 "3.1.1" -#define JANUS_MQTT_VERSION_5 "5" -#define JANUS_MQTT_VERSION_DEFAULT JANUS_MQTT_VERSION_3_1_1 +#define JANUS_MQTT_VERSION_3_1 "3.1" +#define JANUS_MQTT_VERSION_3_1_1 "3.1.1" +#define JANUS_MQTT_VERSION_5 "5" +#define JANUS_MQTT_VERSION_DEFAULT JANUS_MQTT_VERSION_3_1_1 #define JANUS_MQTT_DEFAULT_STATUS_TOPIC "status" #define JANUS_MQTT_DEFAULT_STATUS_QOS 1 @@ -137,10 +137,10 @@ typedef struct janus_mqtt_context { char *topic; int qos; gboolean retain; - #ifdef MQTTVERSION_5 +#ifdef MQTTVERSION_5 GArray *proxy_transaction_user_properties; GArray *add_transaction_user_properties; - #endif +#endif } publish; struct { struct { @@ -159,7 +159,7 @@ typedef struct janus_mqtt_context { char *key_file; gboolean verify_peer; #ifdef MQTTVERSION_5 - int vacuum_interval; + gint64 vacuum_interval; #endif } janus_mqtt_context; @@ -167,7 +167,7 @@ typedef struct janus_mqtt_context { /* MQTT 5 specific types */ typedef struct janus_mqtt_transaction_state { MQTTProperties *properties; - time_t created_at; + gint64 created_at; } janus_mqtt_transaction_state; typedef struct janus_mqtt_set_add_transaction_user_property_user_data { @@ -466,7 +466,7 @@ int janus_mqtt_init(janus_transport_callbacks *callback, const char *config_path ctx->publish.qos = 1; } - #ifdef MQTTVERSION_5 +#ifdef MQTTVERSION_5 if (ctx->connect.mqtt_version == MQTTVERSION_5) { /* MQTT 5 specific configuration */ janus_config_array *proxy_transaction_user_properties_array = janus_config_get(config, config_general, janus_config_type_array, "proxy_transaction_user_properties"); @@ -476,7 +476,6 @@ int janus_mqtt_init(janus_transport_callbacks *callback, const char *config_path int proxy_transaction_user_properties_array_len = g_list_length(proxy_transaction_user_properties_array_items); if(proxy_transaction_user_properties_array_len > 0) { ctx->publish.proxy_transaction_user_properties = g_array_sized_new(FALSE, FALSE, sizeof(char*), proxy_transaction_user_properties_array_len); - g_list_foreach( proxy_transaction_user_properties_array_items, (GFunc)janus_mqtt_set_proxy_transaction_user_property, @@ -493,12 +492,10 @@ int janus_mqtt_init(janus_transport_callbacks *callback, const char *config_path int add_transaction_user_properties_array_len = g_list_length(add_transaction_user_properties_array_items); if(add_transaction_user_properties_array_len > 0) { ctx->publish.add_transaction_user_properties = g_array_sized_new(FALSE, FALSE, sizeof(MQTTProperty), add_transaction_user_properties_array_len); - janus_mqtt_set_add_transaction_user_property_user_data user_data = { ctx->publish.add_transaction_user_properties, config }; - g_list_foreach( add_transaction_user_properties_array_items, (GFunc)janus_mqtt_set_add_transaction_user_property, @@ -508,7 +505,7 @@ int janus_mqtt_init(janus_transport_callbacks *callback, const char *config_path } } } - #endif +#endif } } else { janus_mqtt_api_enabled_ = FALSE; @@ -626,7 +623,7 @@ int janus_mqtt_init(janus_transport_callbacks *callback, const char *config_path /* Getting vacuum interval from config */ janus_config_item *vacuum_interval_item = janus_config_get(config, config_general, janus_config_type_item, "vacuum_interval"); - ctx->vacuum_interval = (vacuum_interval_item && vacuum_interval_item->value) ? atoi(vacuum_interval_item->value) : 60; + ctx->vacuum_interval = (vacuum_interval_item && vacuum_interval_item->value) ? (gint64)atoi(vacuum_interval_item->value) : 60; if(ctx->vacuum_interval <= 0) { JANUS_LOG(LOG_ERR, "Invalid vacuum interval value: %s (falling back to default)\n", vacuum_interval_item->value); ctx->vacuum_interval = 60; @@ -741,9 +738,7 @@ void janus_mqtt_destroy(void) { #ifdef MQTTVERSION_5 void janus_mqtt_set_proxy_transaction_user_property(gpointer item_ptr, gpointer acc_ptr) { janus_config_item *item = (janus_config_item*)item_ptr; - if(item->value == NULL) { - return; - } + if(item->value == NULL) return; gchar* name = g_strdup(item->value); g_array_append_val((GArray *)acc_ptr, name); @@ -751,9 +746,7 @@ void janus_mqtt_set_proxy_transaction_user_property(gpointer item_ptr, gpointer void janus_mqtt_set_add_transaction_user_property(gpointer item_ptr, gpointer user_data_ptr) { janus_config_item *item = (janus_config_item*)item_ptr; - if(item->value != NULL) { - return; - } + if(item->value != NULL) return; janus_mqtt_set_add_transaction_user_property_user_data *user_data = (janus_mqtt_set_add_transaction_user_property_user_data*)user_data_ptr; GList *key_value = janus_config_get_items(user_data->config, item); @@ -812,9 +805,8 @@ gboolean janus_mqtt_is_admin_api_enabled(void) { } int janus_mqtt_send_message(janus_transport_session *transport, void *request_id, gboolean admin, json_t *message) { - if(message == NULL || transport == NULL) { - return -1; - } + if(message == NULL || transport == NULL) return -1; + /* Not really needed as we always only have a single context, but that's fine */ janus_mqtt_context *ctx = (janus_mqtt_context *)transport->transport_p; if(ctx == NULL) { @@ -833,7 +825,7 @@ int janus_mqtt_send_message(janus_transport_session *transport, void *request_id char *transaction = g_strdup(json_string_value(json_object_get(message, "transaction"))); janus_mqtt_transaction_state *state = NULL; - if (transaction != NULL) { + if(transaction != NULL) { g_rw_lock_reader_lock(&janus_mqtt_transaction_states_lock); state = g_hash_table_lookup(janus_mqtt_transaction_states, transaction); @@ -847,7 +839,7 @@ int janus_mqtt_send_message(janus_transport_session *transport, void *request_id } rc = janus_mqtt_client_publish_message5(ctx, payload, admin, &properties, response_topic); - if (response_topic != NULL) g_free(response_topic); + if(response_topic != NULL) g_free(response_topic); MQTTProperties_free(&properties); } else { rc = janus_mqtt_client_publish_message(ctx, payload, admin); @@ -888,15 +880,11 @@ void janus_mqtt_proxy_properties(janus_mqtt_transaction_state *state, GArray *us } /* Proxy additional user properties from config */ - if(user_property_names == NULL || user_property_names->len == 0) { - return; - } + if(user_property_names == NULL || user_property_names->len == 0) return; for(int i = 0; i < state->properties->count; i++) { MQTTProperty request_prop = state->properties->array[i]; - if(request_prop.identifier != MQTTPROPERTY_CODE_USER_PROPERTY) { - continue; - } + if(request_prop.identifier != MQTTPROPERTY_CODE_USER_PROPERTY) continue; for(uint j = 0; j < user_property_names->len; j++) { char *key = (char*)g_array_index(user_property_names, char*, j); @@ -922,9 +910,7 @@ void janus_mqtt_proxy_properties(janus_mqtt_transaction_state *state, GArray *us } void janus_mqtt_add_properties(janus_mqtt_transaction_state *state, GArray *user_properties, MQTTProperties *properties) { - if(user_properties == NULL || user_properties->len == 0) { - return; - } + if(user_properties == NULL || user_properties->len == 0) return; for(uint i = 0; i < user_properties->len; i++) { MQTTProperty *property = &g_array_index(user_properties, MQTTProperty, i); @@ -1026,7 +1012,7 @@ int janus_mqtt_client_message_arrived(void *context, char *topicName, int topicL json_error_t error; json_t *root = json_loadb(message->payload, message->payloadlen, 0, &error); - #ifdef MQTTVERSION_5 +#ifdef MQTTVERSION_5 if(ctx->connect.mqtt_version == MQTTVERSION_5 && !admin) { /* Save MQTT 5 properties copy to the state */ const gchar *transaction = g_strdup(json_string_value(json_object_get(root, "transaction"))); @@ -1040,13 +1026,13 @@ int janus_mqtt_client_message_arrived(void *context, char *topicName, int topicL janus_mqtt_transaction_state *state = g_malloc(sizeof(janus_mqtt_transaction_state)); state->properties = properties; - state->created_at = time(0); + state->created_at = janus_get_monotonic_time(); g_rw_lock_writer_lock(&janus_mqtt_transaction_states_lock); g_hash_table_insert(janus_mqtt_transaction_states, (gpointer) transaction, (gpointer) state); g_rw_lock_writer_unlock(&janus_mqtt_transaction_states_lock); } - #endif +#endif ctx->gateway->incoming_request(&janus_mqtt_transport_, mqtt_session, NULL, admin, root, &error); } @@ -1254,7 +1240,7 @@ int janus_mqtt_client_subscribe(janus_mqtt_context *ctx, gboolean admin) { MQTTAsync_responseOptions options = MQTTAsync_responseOptions_initializer; options.context = ctx; if(admin) { - #ifdef MQTTVERSION_5 +#ifdef MQTTVERSION_5 if(ctx->connect.mqtt_version == MQTTVERSION_5) { options.onSuccess5 = janus_mqtt_client_admin_subscribe_success5; options.onFailure5 = janus_mqtt_client_admin_subscribe_failure5; @@ -1262,13 +1248,13 @@ int janus_mqtt_client_subscribe(janus_mqtt_context *ctx, gboolean admin) { options.onSuccess = janus_mqtt_client_admin_subscribe_success; options.onFailure = janus_mqtt_client_admin_subscribe_failure; } - #else +#else options.onSuccess = janus_mqtt_client_admin_subscribe_success; options.onFailure = janus_mqtt_client_admin_subscribe_failure; - #endif +#endif return MQTTAsync_subscribe(ctx->client, ctx->admin.subscribe.topic, ctx->admin.subscribe.qos, &options); } else { - #ifdef MQTTVERSION_5 +#ifdef MQTTVERSION_5 if(ctx->connect.mqtt_version == MQTTVERSION_5) { options.onSuccess5 = janus_mqtt_client_subscribe_success5; options.onFailure5 = janus_mqtt_client_subscribe_failure5; @@ -1276,10 +1262,10 @@ int janus_mqtt_client_subscribe(janus_mqtt_context *ctx, gboolean admin) { options.onSuccess = janus_mqtt_client_subscribe_success; options.onFailure = janus_mqtt_client_subscribe_failure; } - #else +#else options.onSuccess = janus_mqtt_client_subscribe_success; options.onFailure = janus_mqtt_client_subscribe_failure; - #endif +#endif return MQTTAsync_subscribe(ctx->client, ctx->subscribe.topic, ctx->subscribe.qos, &options); } } @@ -1600,7 +1586,7 @@ static gboolean janus_mqtt_vacuum(gpointer context) { while (g_hash_table_iter_next(&iter, NULL, &value)) { janus_mqtt_transaction_state* state = value; - if(time(0) - state->created_at > ctx->vacuum_interval) { + if(janus_get_monotonic_time() - state->created_at > ctx->vacuum_interval) { g_hash_table_iter_remove(&iter); } }