diff --git a/conf/janus.transport.mqtt.jcfg.sample b/conf/janus.transport.mqtt.jcfg.sample index 9418043d52f..f5a706cfcc9 100644 --- a/conf/janus.transport.mqtt.jcfg.sample +++ b/conf/janus.transport.mqtt.jcfg.sample @@ -29,7 +29,7 @@ general: { #keyfile = /path/to/key.pem # These options work with MQTT 5 only. - #vacuum_interval = 60 # Interval for removing old transcation states in seconds. + #vacuum_interval = 60 # Interval for removing old transaction states in seconds. #proxy_transaction_user_properties = [] # Array of user property names to copy from the incoming message. #add_transaction_user_properties = () # List of user property ["key", "value"] pairs to add. } diff --git a/events/janus_mqttevh.c b/events/janus_mqttevh.c index 7b994de9f19..04cac83a479 100644 --- a/events/janus_mqttevh.c +++ b/events/janus_mqttevh.c @@ -91,28 +91,28 @@ static GThread *handler_thread; static volatile gint initialized = 0, stopping = 0; /* JSON serialization options */ -#define JANUS_MQTTEVH_DEFAULT_ADDPLUGIN 1 -#define JANUS_MQTTEVH_DEFAULT_ADDEVENT 1 -#define JANUS_MQTTEVH_DEFAULT_KEEPALIVE 30 -#define JANUS_MQTTEVH_DEFAULT_CLEANSESSION 0 /* Off */ -#define JANUS_MQTTEVH_DEFAULT_TIMEOUT 30 +#define JANUS_MQTTEVH_DEFAULT_ADDPLUGIN 1 +#define JANUS_MQTTEVH_DEFAULT_ADDEVENT 1 +#define JANUS_MQTTEVH_DEFAULT_KEEPALIVE 30 +#define JANUS_MQTTEVH_DEFAULT_CLEANSESSION 0 /* Off */ +#define JANUS_MQTTEVH_DEFAULT_TIMEOUT 30 #define JANUS_MQTTEVH_DEFAULT_DISCONNECT_TIMEOUT 100 -#define JANUS_MQTTEVH_DEFAULT_QOS 0 -#define JANUS_MQTTEVH_DEFAULT_RETAIN 0 +#define JANUS_MQTTEVH_DEFAULT_QOS 0 +#define JANUS_MQTTEVH_DEFAULT_RETAIN 0 #define JANUS_MQTTEVH_DEFAULT_CONNECT_STATUS "{\"event\": \"connected\", \"eventhandler\": \""JANUS_MQTTEVH_PACKAGE"\"}" #define JANUS_MQTTEVH_DEFAULT_DISCONNECT_STATUS "{\"event\": \"disconnected\"}" -#define JANUS_MQTTEVH_DEFAULT_WILL_RETAIN 1 -#define JANUS_MQTTEVH_DEFAULT_WILL_QOS 0 -#define JANUS_MQTTEVH_DEFAULT_BASETOPIC "/janus/events" -#define JANUS_MQTTEVH_DEFAULT_MQTTURL "tcp://localhost:1883" -#define JANUS_MQTTEVH_DEFAULT_JSON_FORMAT JSON_INDENT(3) | JSON_PRESERVE_ORDER -#define JANUS_MQTTEVH_DEFAULT_TLS_ENABLE FALSE +#define JANUS_MQTTEVH_DEFAULT_WILL_RETAIN 1 +#define JANUS_MQTTEVH_DEFAULT_WILL_QOS 0 +#define JANUS_MQTTEVH_DEFAULT_BASETOPIC "/janus/events" +#define JANUS_MQTTEVH_DEFAULT_MQTTURL "tcp://localhost:1883" +#define JANUS_MQTTEVH_DEFAULT_JSON_FORMAT JSON_INDENT(3) | JSON_PRESERVE_ORDER +#define JANUS_MQTTEVH_DEFAULT_TLS_ENABLE FALSE #define JANUS_MQTTEVH_DEFAULT_TLS_VERIFY_PEER FALSE #define JANUS_MQTTEVH_DEFAULT_TLS_VERIFY_HOST FALSE -#define JANUS_MQTTEVH_VERSION_3_1 "3.1" -#define JANUS_MQTTEVH_VERSION_3_1_1 "3.1.1" -#define JANUS_MQTTEVH_VERSION_5 "5" +#define JANUS_MQTTEVH_VERSION_3_1 "3.1" +#define JANUS_MQTTEVH_VERSION_3_1_1 "3.1.1" +#define JANUS_MQTTEVH_VERSION_5 "5" #define JANUS_MQTTEVH_VERSION_DEFAULT JANUS_MQTTEVH_VERSION_3_1_1 static size_t json_format = JANUS_MQTTEVH_DEFAULT_JSON_FORMAT; @@ -145,7 +145,7 @@ typedef struct janus_mqttevh_context { /* Connection data - authentication and url */ struct { - int mqtt_version; + int mqtt_version; int keep_alive_interval; int cleansession; char *client_id; @@ -165,9 +165,9 @@ typedef struct janus_mqttevh_context { char *disconnect_status; int qos; int retain; - #ifdef MQTTVERSION_5 - GArray *add_user_properties; - #endif +#ifdef MQTTVERSION_5 + GArray *add_user_properties; +#endif } publish; /* If we loose connection, the will is our last publish */ @@ -541,14 +541,14 @@ static int janus_mqttevh_client_publish_message(janus_mqttevh_context *ctx, cons #ifdef MQTTVERSION_5 static int janus_mqttevh_client_publish_message5(janus_mqttevh_context *ctx, const char *topic, int retain, char *payload, MQTTProperties *properties) { - int rc; + int rc; MQTTAsync_message msg = MQTTAsync_message_initializer; msg.payload = payload; msg.payloadlen = strlen(payload); msg.qos = ctx->publish.qos; msg.retained = retain; - msg.properties = MQTTProperties_copy(properties); + msg.properties = MQTTProperties_copy(properties); MQTTAsync_responseOptions options = MQTTAsync_responseOptions_initializer; options.context = ctx; @@ -568,12 +568,12 @@ static int janus_mqttevh_client_publish_message5(janus_mqttevh_context *ctx, con /* Callback for successful MQTT publish */ static void janus_mqttevh_client_publish_message_success(void *context, MQTTAsync_successData *response) { - janus_mqttevh_client_publish_message_success_impl(context); + janus_mqttevh_client_publish_message_success_impl(context); } #ifdef MQTTVERSION_5 static void janus_mqttevh_client_publish_message_success5(void *context, MQTTAsync_successData5 *response) { - janus_mqttevh_client_publish_message_success_impl(context); + janus_mqttevh_client_publish_message_success_impl(context); } #endif @@ -586,7 +586,7 @@ static void janus_mqttevh_client_publish_message_success_impl(void *context) { * Should we bring message into queue? Right now, we just drop it. */ static void janus_mqttevh_client_publish_message_failure(void *context, MQTTAsync_failureData *response) { - int rc = janus_mqttevh_client_get_response_code(response); + int rc = janus_mqttevh_client_get_response_code(response); janus_mqttevh_client_publish_message_failure_impl(context, rc); } @@ -920,30 +920,30 @@ static int janus_mqttevh_init(const char *config_path) { } #ifdef MQTTVERSION_5 - if (ctx->connect.mqtt_version == MQTTVERSION_5) { - /* MQTT 5 specific configuration */ - janus_config_array *add_user_properties_array = janus_config_get(config, config_general, janus_config_type_array, "add_user_properties"); - if(add_user_properties_array) { - GList *add_user_properties_array_items = janus_config_get_arrays(config, add_user_properties_array); - if(add_user_properties_array_items != NULL) { - int add_user_properties_array_len = g_list_length(add_user_properties_array_items); - if(add_user_properties_array_len > 0) { - ctx->publish.add_user_properties = g_array_sized_new(FALSE, FALSE, sizeof(MQTTProperty), add_user_properties_array_len); - - janus_mqttevh_set_add_user_property_user_data user_data = { - ctx->publish.add_user_properties, - config - }; - - g_list_foreach( - add_user_properties_array_items, - (GFunc)janus_mqttevh_set_add_user_property, - (gpointer)&user_data - ); - } - } - } - } + if (ctx->connect.mqtt_version == MQTTVERSION_5) { + /* MQTT 5 specific configuration */ + janus_config_array *add_user_properties_array = janus_config_get(config, config_general, janus_config_type_array, "add_user_properties"); + if(add_user_properties_array) { + GList *add_user_properties_array_items = janus_config_get_arrays(config, add_user_properties_array); + if(add_user_properties_array_items != NULL) { + int add_user_properties_array_len = g_list_length(add_user_properties_array_items); + if(add_user_properties_array_len > 0) { + ctx->publish.add_user_properties = g_array_sized_new(FALSE, FALSE, sizeof(MQTTProperty), add_user_properties_array_len); + + janus_mqttevh_set_add_user_property_user_data user_data = { + ctx->publish.add_user_properties, + config + }; + + g_list_foreach( + add_user_properties_array_items, + (GFunc)janus_mqttevh_set_add_user_property, + (gpointer)&user_data + ); + } + } + } + } #endif if(!janus_mqtt_evh_enabled) { @@ -1083,9 +1083,8 @@ static void janus_mqttevh_incoming_event(json_t *event) { } json_t *janus_mqttevh_handle_request(json_t *request) { - if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized)) { - return NULL; - } + if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized)) return NULL; + /* We can use this requests to apply tweaks to the logic */ int error_code = 0; char error_cause[512]; @@ -1142,9 +1141,8 @@ static void *janus_mqttevh_handler(void *data) { while(g_atomic_int_get(&initialized) && !g_atomic_int_get(&stopping)) { /* Get event from queue */ event = g_async_queue_pop(events); - if(event == &exit_event) { - break; - } + if(event == &exit_event) break; + /* Handle event: just for fun, let's see how long it took for us to take care of this */ json_t *created = json_object_get(event, "timestamp"); if(created && json_is_integer(created)) { @@ -1192,9 +1190,7 @@ int janus_mqttevh_client_get_response_code5(MQTTAsync_failureData5 *response) { } void janus_mqttevh_add_properties(GArray *user_properties, MQTTProperties *properties) { - if(user_properties == NULL || user_properties->len == 0) { - return; - } + if(user_properties == NULL || user_properties->len == 0) return; for(uint i = 0; i < user_properties->len; i++) { MQTTProperty *property = &g_array_index(user_properties, MQTTProperty, i); @@ -1207,9 +1203,7 @@ void janus_mqttevh_add_properties(GArray *user_properties, MQTTProperties *prope void janus_mqttevh_set_add_user_property(gpointer item_ptr, gpointer user_data_ptr) { janus_config_item *item = (janus_config_item*)item_ptr; - if(item->value != NULL) { - return; - } + if(item->value != NULL) return; janus_mqttevh_set_add_user_property_user_data *user_data = (janus_mqttevh_set_add_user_property_user_data*)user_data_ptr; GList *key_value = janus_config_get_items(user_data->config, item); @@ -1221,12 +1215,18 @@ void janus_mqttevh_set_add_user_property(gpointer item_ptr, gpointer user_data_p janus_config_item *key_item = (janus_config_item*)g_list_first(key_value)->data; janus_config_item *value_item = (janus_config_item*)g_list_last(key_value)->data; - MQTTProperty property; - property.identifier = MQTTPROPERTY_CODE_USER_PROPERTY; - property.value.data.data = g_strdup(key_item->value); - property.value.data.len = strlen(key_item->value); - property.value.value.data = g_strdup(value_item->value); - property.value.value.len = strlen(value_item->value); - g_array_append_val(user_data->acc, property); + if(key_item->value == NULL) { + JANUS_LOG(LOG_ERR, "Expected key item to have a value\n"); + } else if(value_item->value == NULL) { + JANUS_LOG(LOG_ERR, "Expected value item to have a value\n"); + } else { + MQTTProperty property; + property.identifier = MQTTPROPERTY_CODE_USER_PROPERTY; + property.value.data.data = g_strdup(key_item->value); + property.value.data.len = strlen(key_item->value); + property.value.value.data = g_strdup(value_item->value); + property.value.value.len = strlen(value_item->value); + g_array_append_val(user_data->acc, property); + } } #endif diff --git a/transports/janus_mqtt.c b/transports/janus_mqtt.c index 45b51f09d87..392206b6a22 100644 --- a/transports/janus_mqtt.c +++ b/transports/janus_mqtt.c @@ -35,12 +35,12 @@ #include "../utils.h" /* Transport plugin information */ -#define JANUS_MQTT_VERSION 1 +#define JANUS_MQTT_VERSION 1 #define JANUS_MQTT_VERSION_STRING "0.0.1" -#define JANUS_MQTT_DESCRIPTION "This transport plugin adds MQTT support to the Janus API via Paho client library." -#define JANUS_MQTT_NAME "JANUS MQTT transport plugin" -#define JANUS_MQTT_AUTHOR "Andrei Nesterov " -#define JANUS_MQTT_PACKAGE "janus.transport.mqtt" +#define JANUS_MQTT_DESCRIPTION "This transport plugin adds MQTT support to the Janus API via Paho client library." +#define JANUS_MQTT_NAME "JANUS MQTT transport plugin" +#define JANUS_MQTT_AUTHOR "Andrei Nesterov " +#define JANUS_MQTT_PACKAGE "janus.transport.mqtt" /* Transport methods */ janus_transport *create(void); @@ -60,10 +60,10 @@ void janus_mqtt_session_created(janus_transport_session *transport, guint64 sess void janus_mqtt_session_over(janus_transport_session *transport, guint64 session_id, gboolean timeout, gboolean claimed); void janus_mqtt_session_claimed(janus_transport_session *transport, guint64 session_id); -#define JANUS_MQTT_VERSION_3_1 "3.1" -#define JANUS_MQTT_VERSION_3_1_1 "3.1.1" -#define JANUS_MQTT_VERSION_5 "5" -#define JANUS_MQTT_VERSION_DEFAULT JANUS_MQTT_VERSION_3_1_1 +#define JANUS_MQTT_VERSION_3_1 "3.1" +#define JANUS_MQTT_VERSION_3_1_1 "3.1.1" +#define JANUS_MQTT_VERSION_5 "5" +#define JANUS_MQTT_VERSION_DEFAULT JANUS_MQTT_VERSION_3_1_1 #define JANUS_MQTT_DEFAULT_STATUS_TOPIC "status" #define JANUS_MQTT_DEFAULT_STATUS_QOS 1 @@ -137,10 +137,10 @@ typedef struct janus_mqtt_context { char *topic; int qos; gboolean retain; - #ifdef MQTTVERSION_5 +#ifdef MQTTVERSION_5 GArray *proxy_transaction_user_properties; GArray *add_transaction_user_properties; - #endif +#endif } publish; struct { struct { @@ -159,7 +159,7 @@ typedef struct janus_mqtt_context { char *key_file; gboolean verify_peer; #ifdef MQTTVERSION_5 - int vacuum_interval; + gint64 vacuum_interval; #endif } janus_mqtt_context; @@ -167,7 +167,7 @@ typedef struct janus_mqtt_context { /* MQTT 5 specific types */ typedef struct janus_mqtt_transaction_state { MQTTProperties *properties; - time_t created_at; + gint64 created_at; } janus_mqtt_transaction_state; typedef struct janus_mqtt_set_add_transaction_user_property_user_data { @@ -466,7 +466,7 @@ int janus_mqtt_init(janus_transport_callbacks *callback, const char *config_path ctx->publish.qos = 1; } - #ifdef MQTTVERSION_5 +#ifdef MQTTVERSION_5 if (ctx->connect.mqtt_version == MQTTVERSION_5) { /* MQTT 5 specific configuration */ janus_config_array *proxy_transaction_user_properties_array = janus_config_get(config, config_general, janus_config_type_array, "proxy_transaction_user_properties"); @@ -476,7 +476,6 @@ int janus_mqtt_init(janus_transport_callbacks *callback, const char *config_path int proxy_transaction_user_properties_array_len = g_list_length(proxy_transaction_user_properties_array_items); if(proxy_transaction_user_properties_array_len > 0) { ctx->publish.proxy_transaction_user_properties = g_array_sized_new(FALSE, FALSE, sizeof(char*), proxy_transaction_user_properties_array_len); - g_list_foreach( proxy_transaction_user_properties_array_items, (GFunc)janus_mqtt_set_proxy_transaction_user_property, @@ -493,12 +492,10 @@ int janus_mqtt_init(janus_transport_callbacks *callback, const char *config_path int add_transaction_user_properties_array_len = g_list_length(add_transaction_user_properties_array_items); if(add_transaction_user_properties_array_len > 0) { ctx->publish.add_transaction_user_properties = g_array_sized_new(FALSE, FALSE, sizeof(MQTTProperty), add_transaction_user_properties_array_len); - janus_mqtt_set_add_transaction_user_property_user_data user_data = { ctx->publish.add_transaction_user_properties, config }; - g_list_foreach( add_transaction_user_properties_array_items, (GFunc)janus_mqtt_set_add_transaction_user_property, @@ -508,7 +505,7 @@ int janus_mqtt_init(janus_transport_callbacks *callback, const char *config_path } } } - #endif +#endif } } else { janus_mqtt_api_enabled_ = FALSE; @@ -626,7 +623,7 @@ int janus_mqtt_init(janus_transport_callbacks *callback, const char *config_path /* Getting vacuum interval from config */ janus_config_item *vacuum_interval_item = janus_config_get(config, config_general, janus_config_type_item, "vacuum_interval"); - ctx->vacuum_interval = (vacuum_interval_item && vacuum_interval_item->value) ? atoi(vacuum_interval_item->value) : 60; + ctx->vacuum_interval = (vacuum_interval_item && vacuum_interval_item->value) ? (gint64)atoi(vacuum_interval_item->value) : 60; if(ctx->vacuum_interval <= 0) { JANUS_LOG(LOG_ERR, "Invalid vacuum interval value: %s (falling back to default)\n", vacuum_interval_item->value); ctx->vacuum_interval = 60; @@ -741,9 +738,7 @@ void janus_mqtt_destroy(void) { #ifdef MQTTVERSION_5 void janus_mqtt_set_proxy_transaction_user_property(gpointer item_ptr, gpointer acc_ptr) { janus_config_item *item = (janus_config_item*)item_ptr; - if(item->value == NULL) { - return; - } + if(item->value == NULL) return; gchar* name = g_strdup(item->value); g_array_append_val((GArray *)acc_ptr, name); @@ -751,9 +746,7 @@ void janus_mqtt_set_proxy_transaction_user_property(gpointer item_ptr, gpointer void janus_mqtt_set_add_transaction_user_property(gpointer item_ptr, gpointer user_data_ptr) { janus_config_item *item = (janus_config_item*)item_ptr; - if(item->value != NULL) { - return; - } + if(item->value != NULL) return; janus_mqtt_set_add_transaction_user_property_user_data *user_data = (janus_mqtt_set_add_transaction_user_property_user_data*)user_data_ptr; GList *key_value = janus_config_get_items(user_data->config, item); @@ -812,9 +805,8 @@ gboolean janus_mqtt_is_admin_api_enabled(void) { } int janus_mqtt_send_message(janus_transport_session *transport, void *request_id, gboolean admin, json_t *message) { - if(message == NULL || transport == NULL) { - return -1; - } + if(message == NULL || transport == NULL) return -1; + /* Not really needed as we always only have a single context, but that's fine */ janus_mqtt_context *ctx = (janus_mqtt_context *)transport->transport_p; if(ctx == NULL) { @@ -833,7 +825,7 @@ int janus_mqtt_send_message(janus_transport_session *transport, void *request_id char *transaction = g_strdup(json_string_value(json_object_get(message, "transaction"))); janus_mqtt_transaction_state *state = NULL; - if (transaction != NULL) { + if(transaction != NULL) { g_rw_lock_reader_lock(&janus_mqtt_transaction_states_lock); state = g_hash_table_lookup(janus_mqtt_transaction_states, transaction); @@ -847,7 +839,7 @@ int janus_mqtt_send_message(janus_transport_session *transport, void *request_id } rc = janus_mqtt_client_publish_message5(ctx, payload, admin, &properties, response_topic); - if (response_topic != NULL) g_free(response_topic); + if(response_topic != NULL) g_free(response_topic); MQTTProperties_free(&properties); } else { rc = janus_mqtt_client_publish_message(ctx, payload, admin); @@ -888,15 +880,11 @@ void janus_mqtt_proxy_properties(janus_mqtt_transaction_state *state, GArray *us } /* Proxy additional user properties from config */ - if(user_property_names == NULL || user_property_names->len == 0) { - return; - } + if(user_property_names == NULL || user_property_names->len == 0) return; for(int i = 0; i < state->properties->count; i++) { MQTTProperty request_prop = state->properties->array[i]; - if(request_prop.identifier != MQTTPROPERTY_CODE_USER_PROPERTY) { - continue; - } + if(request_prop.identifier != MQTTPROPERTY_CODE_USER_PROPERTY) continue; for(uint j = 0; j < user_property_names->len; j++) { char *key = (char*)g_array_index(user_property_names, char*, j); @@ -922,9 +910,7 @@ void janus_mqtt_proxy_properties(janus_mqtt_transaction_state *state, GArray *us } void janus_mqtt_add_properties(janus_mqtt_transaction_state *state, GArray *user_properties, MQTTProperties *properties) { - if(user_properties == NULL || user_properties->len == 0) { - return; - } + if(user_properties == NULL || user_properties->len == 0) return; for(uint i = 0; i < user_properties->len; i++) { MQTTProperty *property = &g_array_index(user_properties, MQTTProperty, i); @@ -1026,7 +1012,7 @@ int janus_mqtt_client_message_arrived(void *context, char *topicName, int topicL json_error_t error; json_t *root = json_loadb(message->payload, message->payloadlen, 0, &error); - #ifdef MQTTVERSION_5 +#ifdef MQTTVERSION_5 if(ctx->connect.mqtt_version == MQTTVERSION_5 && !admin) { /* Save MQTT 5 properties copy to the state */ const gchar *transaction = g_strdup(json_string_value(json_object_get(root, "transaction"))); @@ -1040,13 +1026,13 @@ int janus_mqtt_client_message_arrived(void *context, char *topicName, int topicL janus_mqtt_transaction_state *state = g_malloc(sizeof(janus_mqtt_transaction_state)); state->properties = properties; - state->created_at = time(0); + state->created_at = janus_get_monotonic_time(); g_rw_lock_writer_lock(&janus_mqtt_transaction_states_lock); g_hash_table_insert(janus_mqtt_transaction_states, (gpointer) transaction, (gpointer) state); g_rw_lock_writer_unlock(&janus_mqtt_transaction_states_lock); } - #endif +#endif ctx->gateway->incoming_request(&janus_mqtt_transport_, mqtt_session, NULL, admin, root, &error); } @@ -1254,7 +1240,7 @@ int janus_mqtt_client_subscribe(janus_mqtt_context *ctx, gboolean admin) { MQTTAsync_responseOptions options = MQTTAsync_responseOptions_initializer; options.context = ctx; if(admin) { - #ifdef MQTTVERSION_5 +#ifdef MQTTVERSION_5 if(ctx->connect.mqtt_version == MQTTVERSION_5) { options.onSuccess5 = janus_mqtt_client_admin_subscribe_success5; options.onFailure5 = janus_mqtt_client_admin_subscribe_failure5; @@ -1262,13 +1248,13 @@ int janus_mqtt_client_subscribe(janus_mqtt_context *ctx, gboolean admin) { options.onSuccess = janus_mqtt_client_admin_subscribe_success; options.onFailure = janus_mqtt_client_admin_subscribe_failure; } - #else +#else options.onSuccess = janus_mqtt_client_admin_subscribe_success; options.onFailure = janus_mqtt_client_admin_subscribe_failure; - #endif +#endif return MQTTAsync_subscribe(ctx->client, ctx->admin.subscribe.topic, ctx->admin.subscribe.qos, &options); } else { - #ifdef MQTTVERSION_5 +#ifdef MQTTVERSION_5 if(ctx->connect.mqtt_version == MQTTVERSION_5) { options.onSuccess5 = janus_mqtt_client_subscribe_success5; options.onFailure5 = janus_mqtt_client_subscribe_failure5; @@ -1276,10 +1262,10 @@ int janus_mqtt_client_subscribe(janus_mqtt_context *ctx, gboolean admin) { options.onSuccess = janus_mqtt_client_subscribe_success; options.onFailure = janus_mqtt_client_subscribe_failure; } - #else +#else options.onSuccess = janus_mqtt_client_subscribe_success; options.onFailure = janus_mqtt_client_subscribe_failure; - #endif +#endif return MQTTAsync_subscribe(ctx->client, ctx->subscribe.topic, ctx->subscribe.qos, &options); } } @@ -1600,7 +1586,7 @@ static gboolean janus_mqtt_vacuum(gpointer context) { while (g_hash_table_iter_next(&iter, NULL, &value)) { janus_mqtt_transaction_state* state = value; - if(time(0) - state->created_at > ctx->vacuum_interval) { + if(janus_get_monotonic_time() - state->created_at > ctx->vacuum_interval) { g_hash_table_iter_remove(&iter); } }