diff --git a/conf/janus.eventhandler.rabbitmqevh.jcfg.sample b/conf/janus.eventhandler.rabbitmqevh.jcfg.sample index a29ec4e3cb..c8635e2453 100644 --- a/conf/janus.eventhandler.rabbitmqevh.jcfg.sample +++ b/conf/janus.eventhandler.rabbitmqevh.jcfg.sample @@ -20,7 +20,7 @@ general: { #exchange = "janus-exchange" route_key = "janus-events" # Routing key to use when publishing messages #exchange_type = "fanout" # Rabbitmq exchange_type can be one of the available types: direct, topic, headers and fanout (fanout by defualt). - #heartbeat = 60 # Defines the seconds without communication that should pass before considering the TCP connection has unreachable. + #heartbeat = 60 # Defines the seconds without communication that should pass before considering the TCP connection unreachable. #declare_outgoing_queue = true # By default (for backwards compatibility), we declare an outgoing queue. Set this to false to disable that behavior #ssl_enable = false # Whether ssl support must be enabled diff --git a/conf/janus.transport.rabbitmq.jcfg.sample b/conf/janus.transport.rabbitmq.jcfg.sample index 9aae80ef0a..27cfebd0a0 100644 --- a/conf/janus.transport.rabbitmq.jcfg.sample +++ b/conf/janus.transport.rabbitmq.jcfg.sample @@ -34,6 +34,7 @@ general: { #queue_durable = false # Whether or not incoming queue should remain after a RabbitMQ reboot #queue_autodelete = false # Whether or not incoming queue should autodelete after janus disconnects from RabbitMQ #queue_exclusive = false # Whether or not incoming queue should only allow one subscriber + #heartbeat = 60 # Defines the seconds without communication that should pass before considering the TCP connection unreachable. #ssl_enabled = false # Whether ssl support must be enabled #ssl_verify_peer = true # Whether peer verification must be enabled diff --git a/configure.ac b/configure.ac index 8315f2b6b8..69827ed9eb 100644 --- a/configure.ac +++ b/configure.ac @@ -605,6 +605,7 @@ AC_CHECK_LIB([rabbitmq], AC_DEFINE(HAVE_RABBITMQEVH) enable_rabbitmq_event_handler=yes ]) + AC_CHECK_HEADERS([rabbitmq-c/amqp.h]) ], [ AS_IF([test "x$enable_rabbitmq" = "xyes"], diff --git a/events/janus_rabbitmqevh.c b/events/janus_rabbitmqevh.c index 9fda6ddced..003ddf22be 100644 --- a/events/janus_rabbitmqevh.c +++ b/events/janus_rabbitmqevh.c @@ -12,10 +12,18 @@ #include +/* Latest RabbitMQ-C library changes the library paths from 0.12.0.0 onwards */ +#ifdef HAVE_RABBITMQ_C_AMQP_H +#include +#include +#include +#include +#else #include #include #include #include +#endif #include "../debug.h" #include "../config.h" @@ -177,7 +185,7 @@ int janus_rabbitmqevh_init(const char *config_path) { /* Compact, so no spaces between separators */ json_format = JSON_COMPACT | JSON_PRESERVE_ORDER; } else { - JANUS_LOG(LOG_WARN, "Unsupported JSON format option '%s', using default (indented)\n", item->value); + JANUS_LOG(LOG_WARN, "RabbitMQEventHandler: Unsupported JSON format option '%s', using default (indented)\n", item->value); json_format = JSON_INDENT(3) | JSON_PRESERVE_ORDER; } } @@ -224,7 +232,7 @@ int janus_rabbitmqevh_init(const char *config_path) { item = janus_config_get(config, config_general, janus_config_type_item, "heartbeat"); if(item && item->value && janus_string_to_uint16(item->value, &heartbeat) < 0) { - JANUS_LOG(LOG_ERR, "Invalid heartbeat timeout (%s), falling back to default\n", item->value); + JANUS_LOG(LOG_ERR, "RabbitMQEventHandler: Invalid heartbeat timeout (%s), falling back to default (0, disabling heartbeat)\n", item->value); heartbeat = 0; } @@ -279,9 +287,9 @@ int janus_rabbitmqevh_init(const char *config_path) { exchange = g_strdup(item->value); } if (exchange == NULL) { - JANUS_LOG(LOG_INFO, "RabbitMQ event handler enabled, %s:%d (%s) exchange_type:%s\n", rmqhost, rmqport, route_key,exchange_type); + JANUS_LOG(LOG_INFO, "RabbitMQEventHandler: enabled, %s:%d (%s) exchange_type:%s\n", rmqhost, rmqport, route_key,exchange_type); } else { - JANUS_LOG(LOG_INFO, "RabbitMQ event handler enabled, %s:%d (%s) exch: (%s) exchange_type:%s\n", rmqhost, rmqport, route_key, exchange,exchange_type); + JANUS_LOG(LOG_INFO, "RabbitMQEventHandler: enabled, %s:%d (%s) exch: (%s) exchange_type:%s\n", rmqhost, rmqport, route_key, exchange,exchange_type); } /* Connect */ @@ -300,7 +308,7 @@ int janus_rabbitmqevh_init(const char *config_path) { handler_thread = g_thread_try_new("janus rabbitmqevh handler", jns_rmqevh_hdlr, NULL, &error); if(error != NULL) { g_atomic_int_set(&initialized, 0); - JANUS_LOG(LOG_FATAL, "Got error %d (%s) trying to launch the RabbitMQEventHandler handler thread...\n", + JANUS_LOG(LOG_FATAL, "RabbitMQEventHandler: Got error %d (%s) trying to launch the RabbitMQEventHandler handler thread...\n", error->code, error->message ? error->message : "??"); g_error_free(error); goto error; @@ -309,7 +317,7 @@ int janus_rabbitmqevh_init(const char *config_path) { in_thread = g_thread_try_new("janus rabbitmqevh heartbeat handler", jns_rmqevh_hrtbt, NULL, &error); if(error != NULL) { g_atomic_int_set(&initialized, 0); - JANUS_LOG(LOG_FATAL, "Got error %d (%s) trying to launch the RabbitMQEventHandler heartbeat thread...\n", + JANUS_LOG(LOG_FATAL, "RabbitMQEventHandler: Got error %d (%s) trying to launch the RabbitMQEventHandler heartbeat thread...\n", error->code, error->message ? error->message : "??"); g_error_free(error); goto error; @@ -350,16 +358,10 @@ int janus_rabbitmqevh_connect(void) { JANUS_LOG(LOG_FATAL, "RabbitMQEventHandler: Can't connect to RabbitMQ server: error creating socket...\n"); return -1; } - if(ssl_verify_peer) { - amqp_ssl_socket_set_verify_peer(socket, 1); - } else { - amqp_ssl_socket_set_verify_peer(socket, 0); - } - if(ssl_verify_hostname) { - amqp_ssl_socket_set_verify_hostname(socket, 1); - } else { - amqp_ssl_socket_set_verify_hostname(socket, 0); - } + + amqp_ssl_socket_set_verify_peer(socket, ssl_verify_peer); + amqp_ssl_socket_set_verify_hostname(socket, ssl_verify_hostname); + if(ssl_cacert_file) { status = amqp_ssl_socket_set_cacert(socket, ssl_cacert_file); if(status != AMQP_STATUS_OK) { @@ -385,7 +387,7 @@ int janus_rabbitmqevh_connect(void) { JANUS_LOG(LOG_VERB, "RabbitMQEventHandler: Connecting to RabbitMQ server...\n"); status = amqp_socket_open(socket, rmqhost, rmqport); if(status != AMQP_STATUS_OK) { - JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error opening socket... (%s)\n", amqp_error_string2(status)); + JANUS_LOG(LOG_FATAL, "RabbitMQEventHandler: Can't connect to RabbitMQ server: error opening socket... (%s)\n", amqp_error_string2(status)); return -1; } JANUS_LOG(LOG_VERB, "RabbitMQEventHandler: Logging in...\n"); @@ -410,13 +412,13 @@ int janus_rabbitmqevh_connect(void) { amqp_exchange_declare(rmq_conn, rmq_channel, rmq_exchange, amqp_cstring_bytes(exchange_type), 0, 0, 0, 0, amqp_empty_table); result = amqp_get_rpc_reply(rmq_conn); if(result.reply_type != AMQP_RESPONSE_NORMAL) { - JANUS_LOG(LOG_FATAL, "RabbitMQEventHandler: Can't connect to RabbitMQ server: error diclaring exchange... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id)); + JANUS_LOG(LOG_FATAL, "RabbitMQEventHandler: Can't connect to RabbitMQ server: error declaring exchange... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id)); return -1; } } if (declare_outgoing_queue) { - JANUS_LOG(LOG_VERB, "Declaring outgoing queue... (%s)\n", route_key); + JANUS_LOG(LOG_VERB, "RabbitMQEventHandler: Declaring outgoing queue... (%s)\n", route_key); amqp_queue_declare(rmq_conn, rmq_channel, amqp_cstring_bytes(route_key), 0, 0, 0, 0, amqp_empty_table); result = amqp_get_rpc_reply(rmq_conn); if(result.reply_type != AMQP_RESPONSE_NORMAL) { @@ -425,6 +427,8 @@ int janus_rabbitmqevh_connect(void) { } } + JANUS_LOG(LOG_INFO, "RabbitMQEventHandler: Connected successfully"); + return 0; } @@ -446,9 +450,7 @@ void janus_rabbitmqevh_destroy(void) { g_async_queue_unref(events); events = NULL; - if(rmq_conn && rmq_channel) { - amqp_channel_close(rmq_conn, rmq_channel, AMQP_REPLY_SUCCESS); - amqp_connection_close(rmq_conn, AMQP_REPLY_SUCCESS); + if(rmq_conn) { amqp_destroy_connection(rmq_conn); } if(rmq_exchange.bytes) @@ -548,7 +550,7 @@ json_t *janus_rabbitmqevh_handle_request(json_t *request) { if(json_object_get(request, "grouping")) group_events = json_is_true(json_object_get(request, "grouping")); } else { - JANUS_LOG(LOG_VERB, "Unknown request '%s'\n", request_text); + JANUS_LOG(LOG_VERB, "RabbitMQEventHandler: Unknown request '%s'\n", request_text); error_code = JANUS_RABBITMQEVH_ERROR_INVALID_REQUEST; g_snprintf(error_cause, 512, "Unknown request '%s'", request_text); } @@ -570,7 +572,7 @@ json_t *janus_rabbitmqevh_handle_request(json_t *request) { /* Thread to handle incoming events */ static void *jns_rmqevh_hdlr(void *data) { - JANUS_LOG(LOG_VERB, "Joining RabbitMQEventHandler handler thread\n"); + JANUS_LOG(LOG_VERB, "RabbitMQEventHandler: joining handler thread\n"); json_t *event = NULL, *output = NULL; char *event_text = NULL; int count = 0, max = group_events ? 100 : 1; @@ -589,7 +591,7 @@ static void *jns_rmqevh_hdlr(void *data) { if(created && json_is_integer(created)) { gint64 then = json_integer_value(created); gint64 now = janus_get_monotonic_time(); - JANUS_LOG(LOG_DBG, "Handled event after %"SCNu64" us\n", now-then); + JANUS_LOG(LOG_DBG, "RabbitMQEventHandler: Handled event after %"SCNu64" us\n", now-then); } if(!group_events) { /* We're done here, we just need a single event */ @@ -613,7 +615,7 @@ static void *jns_rmqevh_hdlr(void *data) { /* Since this a simple plugin, it does the same for all events: so just convert to string... */ event_text = json_dumps(output, json_format); if(event_text == NULL) { - JANUS_LOG(LOG_WARN, "Failed to stringify event, event lost...\n"); + JANUS_LOG(LOG_WARN, "RabbitMQEventHandler: Failed to stringify event, event lost...\n"); /* Nothing we can do... get rid of the event */ json_decref(output); output = NULL; @@ -638,14 +640,14 @@ static void *jns_rmqevh_hdlr(void *data) { json_decref(output); output = NULL; } - JANUS_LOG(LOG_VERB, "Leaving RabbitMQEventHandler handler thread\n"); + JANUS_LOG(LOG_VERB, "RabbitMQEventHandler: leaving handler thread\n"); return NULL; } /* Thread to handle heartbeats */ static void *jns_rmqevh_hrtbt(void *data) { - JANUS_LOG(LOG_VERB, "Monitoring RabbitMQ HeartBeat\n"); + JANUS_LOG(LOG_VERB, "RabbitMQEventHandler: Monitoring RabbitMQ Heartbeat\n"); int waiting_usec = (heartbeat/2) * 1000000; struct timeval timeout; timeout.tv_sec = 0; @@ -666,15 +668,13 @@ static void *jns_rmqevh_hrtbt(void *data) { continue; } - JANUS_LOG(LOG_VERB, "Error on amqp_simple_wait_frame_noblock: %d (%s)\n", res, amqp_error_string2(res)); + JANUS_LOG(LOG_VERB, "RabbitMQEventHandler: Error on amqp_simple_wait_frame_noblock: %d (%s)\n", res, amqp_error_string2(res)); - if(rmq_conn && rmq_channel) { - amqp_channel_close(rmq_conn, rmq_channel, AMQP_REPLY_SUCCESS); - amqp_connection_close(rmq_conn, AMQP_REPLY_SUCCESS); + if(rmq_conn) { amqp_destroy_connection(rmq_conn); } if(!g_atomic_int_get(&stopping)) { - JANUS_LOG(LOG_VERB, "Trying to reconnect with RabbitMQ Server\n"); + JANUS_LOG(LOG_VERB, "RabbitMQEventHandler: Trying to reconnect\n"); int result = janus_rabbitmqevh_connect(); if(result < 0) { g_usleep(5000000); @@ -688,6 +688,6 @@ static void *jns_rmqevh_hrtbt(void *data) { } } - JANUS_LOG(LOG_VERB, "Leaving RabbitMQEventHandler HeartBeat thread\n"); + JANUS_LOG(LOG_VERB, "RabbitMQEventHandler: Leaving HeartBeat thread\n"); return NULL; } diff --git a/transports/janus_rabbitmq.c b/transports/janus_rabbitmq.c index 5e93f8fa04..06fc2e2c6e 100644 --- a/transports/janus_rabbitmq.c +++ b/transports/janus_rabbitmq.c @@ -33,10 +33,18 @@ #include "transport.h" +/* Latest RabbitMQ-C library changes the library paths from 0.12.0.0 onwards */ +#ifdef HAVE_RABBITMQ_C_AMQP_H +#include +#include +#include +#include +#else #include #include #include #include +#endif #include "../debug.h" #include "../apierror.h" @@ -72,6 +80,8 @@ void janus_rabbitmq_session_over(janus_transport_session *transport, guint64 ses void janus_rabbitmq_session_claimed(janus_transport_session *transport, guint64 session_id); json_t *janus_rabbitmq_query_transport(json_t *request); +/* Internal methods */ +static int janus_rabbitmq_connect(void); /* Transport setup */ static janus_transport janus_rabbitmq_transport = @@ -111,6 +121,9 @@ static janus_transport_callbacks *gateway = NULL; static gboolean rmq_janus_api_enabled = FALSE; static gboolean rmq_admin_api_enabled = FALSE; static gboolean notify_events = TRUE; +static guint rmq_reconnect_backoff_initial = 100000; /* 100ms */ +static guint rmq_reconnect_backoff_max = 5000000; /* 5s */ +static gfloat rmq_reconnect_backoff_multiplier = 1.5; #define JANUS_RABBITMQ_EXCHANGE_TYPE "fanout" @@ -146,6 +159,7 @@ typedef struct janus_rabbitmq_client { janus_mutex mutex; /* Mutex to lock/unlock this session */ gint session_timeout:1; /* Whether a Janus session timeout occurred in the core */ gint destroy:1; /* Flag to trigger a lazy session destruction */ + gint connected:1; /* Flag to specify whether or not RabbitMQ connection is deemed to be up */ } janus_rabbitmq_client; /* RabbitMQ response */ @@ -170,6 +184,12 @@ static char *rmqhost = NULL, *vhost = NULL, *username = NULL, *password = NULL, *ssl_cacert_file = NULL, *ssl_cert_file = NULL, *ssl_key_file = NULL, *to_janus = NULL, *from_janus = NULL, *to_janus_admin = NULL, *from_janus_admin = NULL, *janus_exchange = NULL, *janus_exchange_type = NULL, *queue_name = NULL, *queue_name_admin = NULL; +static uint16_t rmqport = AMQP_PROTOCOL_PORT; +static gboolean ssl_enabled = FALSE, ssl_verify_peer = FALSE, ssl_verify_hostname = FALSE; +static gboolean declare_outgoing_queue = FALSE, declare_outgoing_queue_admin = FALSE; +amqp_boolean_t queue_durable = 0, queue_exclusive = 0, queue_autodelete = 0, + queue_durable_admin = 0, queue_exclusive_admin = 0, queue_autodelete_admin = 0; +static uint16_t heartbeat = 0; /* Transport implementation */ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_path) { @@ -233,7 +253,6 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_ rmqhost = g_strdup(item->value); else rmqhost = g_strdup("localhost"); - uint16_t rmqport = AMQP_PROTOCOL_PORT; item = janus_config_get(config, config_general, janus_config_type_item, "port"); if(item && item->value && janus_string_to_uint16(item->value, &rmqport) < 0) { JANUS_LOG(LOG_ERR, "Invalid port (%s), falling back to default\n", item->value); @@ -257,10 +276,6 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_ else password = g_strdup("guest"); - /* SSL config*/ - gboolean ssl_enabled = FALSE; - gboolean ssl_verify_peer = FALSE; - gboolean ssl_verify_hostname = FALSE; item = janus_config_get(config, config_general, janus_config_type_item, "ssl_enabled"); if(item == NULL) { /* Try legacy property */ @@ -290,6 +305,13 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_ ssl_verify_hostname = TRUE; } + /* Heartbeat config */ + item = janus_config_get(config, config_general, janus_config_type_item, "heartbeat"); + if(item && item->value && janus_string_to_uint16(item->value, &heartbeat) < 0) { + JANUS_LOG(LOG_ERR, "Invalid heartbeat timeout (%s), falling back to default (0, disabling heartbeat)\n", item->value); + heartbeat = 0; + } + /* Now check if the Janus API must be supported */ item = janus_config_get(config, config_general, janus_config_type_item, "enabled"); if(item == NULL) { @@ -338,6 +360,28 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_ } from_janus = g_strdup(item->value); + /* Set queue options */ + item = janus_config_get(config, config_general, janus_config_type_item, "queue_durable"); + if(item && item->value && janus_is_true(item->value)) { + queue_durable = 1; + } + + item = janus_config_get(config, config_general, janus_config_type_item, "queue_exclusive"); + if(item && item->value && janus_is_true(item->value)) { + queue_exclusive = 1; + } + + item = janus_config_get(config, config_general, janus_config_type_item, "queue_autodelete"); + if(item && item->value && janus_is_true(item->value)) { + queue_autodelete = 1; + } + + /* By default, declare the outgoing queue */ + item = janus_config_get(config, config_general, janus_config_type_item, "declare_outgoing_queue"); + if(!item || !item->value || janus_is_true(item->value)) { + declare_outgoing_queue = TRUE; + } + if(janus_exchange == NULL) { JANUS_LOG(LOG_INFO, "RabbitMQ support for Janus API enabled, %s:%d (%s/%s) exchange_type:%s \n", rmqhost, rmqport, to_janus, from_janus, janus_exchange_type); } else { @@ -377,6 +421,29 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_ } from_janus_admin = g_strdup(item->value); + + /* Set queue options */ + item = janus_config_get(config, config_admin, janus_config_type_item, "queue_durable_admin"); + if(item && item->value && janus_is_true(item->value)) { + queue_durable_admin = 1; + } + + item = janus_config_get(config, config_admin, janus_config_type_item, "queue_exclusive_admin"); + if(item && item->value && janus_is_true(item->value)) { + queue_exclusive_admin = 1; + } + + item = janus_config_get(config, config_admin, janus_config_type_item, "queue_autodelete_admin"); + if(item && item->value && janus_is_true(item->value)) { + queue_autodelete_admin = 1; + } + + /* By default, declare the outgoing queue */ + item = janus_config_get(config, config_admin, janus_config_type_item, "declare_outgoing_queue_admin"); + if(!item || !item->value || janus_is_true(item->value)) { + declare_outgoing_queue_admin = TRUE; + } + JANUS_LOG(LOG_INFO, "RabbitMQ support for Admin API enabled, %s:%d (%s/%s)\n", rmqhost, rmqport, to_janus_admin, from_janus_admin); rmq_admin_api_enabled = TRUE; } @@ -386,235 +453,13 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_ } else { /* FIXME We currently support a single application, create a new janus_rabbitmq_client instance */ rmq_client = g_malloc0(sizeof(janus_rabbitmq_client)); + /* Connect */ - rmq_client->rmq_conn = amqp_new_connection(); - amqp_socket_t *socket = NULL; - amqp_queue_declare_ok_t *declare = NULL; - int status; - JANUS_LOG(LOG_VERB, "Creating RabbitMQ socket...\n"); - if (ssl_enabled) { - socket = amqp_ssl_socket_new(rmq_client->rmq_conn); - if(socket == NULL) { - JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error creating socket...\n"); - goto error; - } - if(ssl_verify_peer) { - amqp_ssl_socket_set_verify_peer(socket, 1); - } else { - amqp_ssl_socket_set_verify_peer(socket, 0); - } - if(ssl_verify_hostname) { - amqp_ssl_socket_set_verify_hostname(socket, 1); - } else { - amqp_ssl_socket_set_verify_hostname(socket, 0); - } - if(ssl_cacert_file) { - status = amqp_ssl_socket_set_cacert(socket, ssl_cacert_file); - if(status != AMQP_STATUS_OK) { - JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error setting CA certificate... (%s)\n", amqp_error_string2(status)); - goto error; - } - } - if(ssl_cert_file && ssl_key_file) { - status = amqp_ssl_socket_set_key(socket, ssl_cert_file, ssl_key_file); - if(status != AMQP_STATUS_OK) { - JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error setting key... (%s)\n", amqp_error_string2(status)); - goto error; - } - } - } else { - socket = amqp_tcp_socket_new(rmq_client->rmq_conn); - if(socket == NULL) { - JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error creating socket...\n"); - goto error; - } - } - JANUS_LOG(LOG_VERB, "Connecting to RabbitMQ server...\n"); - status = amqp_socket_open(socket, rmqhost, rmqport); - if(status != AMQP_STATUS_OK) { - JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error opening socket... (%s)\n", amqp_error_string2(status)); - goto error; - } - JANUS_LOG(LOG_VERB, "Logging in...\n"); - amqp_rpc_reply_t result = amqp_login(rmq_client->rmq_conn, vhost, 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, username, password); - if(result.reply_type != AMQP_RESPONSE_NORMAL) { - JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error logging in... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id)); - goto error; - } - rmq_client->rmq_channel = 1; - JANUS_LOG(LOG_VERB, "Opening channel...\n"); - amqp_channel_open(rmq_client->rmq_conn, rmq_client->rmq_channel); - result = amqp_get_rpc_reply(rmq_client->rmq_conn); - if(result.reply_type != AMQP_RESPONSE_NORMAL) { - JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error opening channel... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id)); + int result = janus_rabbitmq_connect(); + if(result < 0) { goto error; } - rmq_client->janus_exchange = amqp_empty_bytes; - if(janus_exchange != NULL) { - JANUS_LOG(LOG_VERB, "Declaring exchange...\n"); - rmq_client->janus_exchange = amqp_cstring_bytes(janus_exchange); - amqp_exchange_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->janus_exchange, amqp_cstring_bytes(janus_exchange_type), 0, 0, 0, 0, amqp_empty_table); - result = amqp_get_rpc_reply(rmq_client->rmq_conn); - if(result.reply_type != AMQP_RESPONSE_NORMAL) { - JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error diclaring exchange... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id)); - goto error; - } - } - rmq_client->janus_api_enabled = FALSE; - if(rmq_janus_api_enabled) { - rmq_client->janus_api_enabled = TRUE; - - /* Set queue options */ - amqp_boolean_t queue_durable = 0; - item = janus_config_get(config, config_general, janus_config_type_item, "queue_durable"); - if(item && item->value && janus_is_true(item->value)) { - queue_durable = 1; - } - - amqp_boolean_t queue_exclusive = 0; - item = janus_config_get(config, config_general, janus_config_type_item, "queue_exclusive"); - if(item && item->value && janus_is_true(item->value)) { - queue_exclusive = 1; - } - - amqp_boolean_t queue_autodelete = 0; - item = janus_config_get(config, config_general, janus_config_type_item, "queue_autodelete"); - if(item && item->value && janus_is_true(item->value)) { - queue_autodelete = 1; - } - - /* Case when we have a queue_name, and to_janus is the name of the topic to bind on (if exchange_type is topic) */ - if(queue_name != NULL) { - JANUS_LOG(LOG_VERB, "Declaring incoming queue (using queue_name)... (%s)\n", queue_name); - declare = amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, amqp_cstring_bytes(queue_name), 0, queue_durable, queue_exclusive, queue_autodelete, amqp_empty_table); - rmq_client->to_janus_queue = declare->queue; - JANUS_LOG(LOG_VERB, "Incoming queue declared: (%s)\n", (char *) rmq_client->to_janus_queue.bytes); - result = amqp_get_rpc_reply(rmq_client->rmq_conn); - if(result.reply_type != AMQP_RESPONSE_NORMAL) { - JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id)); - goto error; - } - - if(strcmp(janus_exchange_type, "topic") == 0 || strcmp(janus_exchange_type, "direct") == 0) { - JANUS_LOG(LOG_VERB, "Binding queue (%s) to routing key (%s)\n", queue_name, to_janus); - amqp_queue_bind(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_queue, rmq_client->janus_exchange, amqp_cstring_bytes(to_janus), amqp_empty_table); - result = amqp_get_rpc_reply(rmq_client->rmq_conn); - if(result.reply_type != AMQP_RESPONSE_NORMAL) { - JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error binding queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id)); - goto error; - } - } - - /* Case when to_janus is the name of the queue (and there's no binding) */ - } else { - JANUS_LOG(LOG_VERB, "Declaring incoming queue (using to_janus)... (%s)\n", to_janus); - declare = amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, amqp_cstring_bytes(to_janus), 0, queue_durable, queue_exclusive, queue_autodelete, amqp_empty_table); - rmq_client->to_janus_queue = declare->queue; - JANUS_LOG(LOG_VERB, "Incoming queue declared: (%s)\n", (char *)rmq_client->to_janus_queue.bytes); - result = amqp_get_rpc_reply(rmq_client->rmq_conn); - if(result.reply_type != AMQP_RESPONSE_NORMAL) { - JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id)); - goto error; - } - } - - /* By default, declare the outgoing queue */ - item = janus_config_get(config, config_general, janus_config_type_item, "declare_outgoing_queue"); - if(!item || !item->value || janus_is_true(item->value)) { - JANUS_LOG(LOG_VERB, "Declaring outgoing queue... (%s)\n", from_janus); - amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, amqp_cstring_bytes(from_janus), 0, 0, 0, 0, amqp_empty_table); - result = amqp_get_rpc_reply(rmq_client->rmq_conn); - if(result.reply_type != AMQP_RESPONSE_NORMAL) { - JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id)); - goto error; - } - } - - amqp_basic_consume(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_queue, amqp_empty_bytes, 0, 1, 0, amqp_empty_table); - result = amqp_get_rpc_reply(rmq_client->rmq_conn); - if(result.reply_type != AMQP_RESPONSE_NORMAL) { - JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error consuming... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id)); - goto error; - } - } - rmq_client->admin_api_enabled = FALSE; - if(rmq_admin_api_enabled) { - rmq_client->admin_api_enabled = TRUE; - /* Set queue options */ - amqp_boolean_t queue_durable_admin = 0; - item = janus_config_get(config, config_admin, janus_config_type_item, "queue_durable_admin"); - if(item && item->value && janus_is_true(item->value)) { - queue_durable_admin = 1; - } - - amqp_boolean_t queue_exclusive_admin = 0; - item = janus_config_get(config, config_admin, janus_config_type_item, "queue_exclusive_admin"); - if(item && item->value && janus_is_true(item->value)) { - queue_exclusive_admin = 1; - } - - amqp_boolean_t queue_autodelete_admin = 0; - item = janus_config_get(config, config_admin, janus_config_type_item, "queue_autodelete_admin"); - if(item && item->value && janus_is_true(item->value)) { - queue_autodelete_admin = 1; - } - - /* Case when we have a queue_name_admin, and to_janus_admin is the name of the routing key to bind on (if exchange_type is topic or direct) */ - if(queue_name_admin != NULL) { - JANUS_LOG(LOG_VERB, "Declaring incoming admin queue (using queue_name_admin)... (%s)\n", queue_name_admin); - declare = amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, amqp_cstring_bytes(queue_name_admin), 0, queue_durable_admin, queue_exclusive_admin, queue_autodelete_admin, amqp_empty_table); - rmq_client->to_janus_admin_queue = declare->queue; - JANUS_LOG(LOG_VERB, "Incoming admin queue declared: (%s)\n", (char *) rmq_client->to_janus_queue.bytes); - result = amqp_get_rpc_reply(rmq_client->rmq_conn); - if(result.reply_type != AMQP_RESPONSE_NORMAL) { - JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id)); - goto error; - } - - if(strcmp(janus_exchange_type, "topic") == 0 || strcmp(janus_exchange_type, "direct") == 0) { - JANUS_LOG(LOG_VERB, "Binding queue (%s) to routing key (%s)\n", queue_name_admin, to_janus_admin); - amqp_queue_bind(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_admin_queue, rmq_client->janus_exchange, amqp_cstring_bytes(to_janus_admin), amqp_empty_table); - result = amqp_get_rpc_reply(rmq_client->rmq_conn); - if(result.reply_type != AMQP_RESPONSE_NORMAL) { - JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error binding queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id)); - goto error; - } - } - - /* Case when to_janus_admin is the name of the queue (and there's no binding */ - } else { - JANUS_LOG(LOG_VERB, "Declaring incoming admin queue (using to_janus_admin)... (%s)\n", to_janus_admin); - rmq_client->to_janus_admin_queue = amqp_cstring_bytes(to_janus_admin); - declare = amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_admin_queue, 0, queue_durable_admin, queue_exclusive_admin, queue_autodelete_admin, amqp_empty_table); - rmq_client->to_janus_admin_queue = declare->queue; - JANUS_LOG(LOG_VERB, "Incoming admin queue declared: (%s)\n", (char *) rmq_client->to_janus_queue.bytes); - result = amqp_get_rpc_reply(rmq_client->rmq_conn); - if(result.reply_type != AMQP_RESPONSE_NORMAL) { - JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id)); - goto error; - } - } - - /* By default, declare the outgoing queue */ - item = janus_config_get(config, config_admin, janus_config_type_item, "declare_outgoing_queue_admin"); - if(!item || !item->value || janus_is_true(item->value)) { - JANUS_LOG(LOG_VERB, "Declaring outgoing queue... (%s)\n", from_janus_admin); - amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, amqp_cstring_bytes(from_janus_admin), 0, 0, 0, 0, amqp_empty_table); - result = amqp_get_rpc_reply(rmq_client->rmq_conn); - if(result.reply_type != AMQP_RESPONSE_NORMAL) { - JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id)); - goto error; - } - } - - amqp_basic_consume(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_admin_queue, amqp_empty_bytes, 0, 1, 0, amqp_empty_table); - result = amqp_get_rpc_reply(rmq_client->rmq_conn); - if(result.reply_type != AMQP_RESPONSE_NORMAL) { - JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error consuming... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id)); - goto error; - } - } rmq_client->messages = g_async_queue_new(); rmq_client->destroy = 0; /* Prepare the transport session (again, just one) */ @@ -643,6 +488,7 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_ janus_config_destroy(config); return -1; } + janus_mutex_init(&rmq_client->mutex); /* Done */ JANUS_LOG(LOG_INFO, "Setup of RabbitMQ integration completed\n"); @@ -684,6 +530,196 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_ return -1; } +int janus_rabbitmq_connect(void) { + rmq_client->connected = 0; + /* Connect */ + rmq_client->rmq_conn = amqp_new_connection(); + amqp_socket_t *socket = NULL; + amqp_queue_declare_ok_t *declare = NULL; + int status; + JANUS_LOG(LOG_VERB, "Creating RabbitMQ socket...\n"); + if(ssl_enabled) { + socket = amqp_ssl_socket_new(rmq_client->rmq_conn); + if(socket == NULL) { + JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error creating socket...\n"); + return -1; + } + amqp_ssl_socket_set_verify_peer(socket, ssl_verify_peer); + amqp_ssl_socket_set_verify_hostname(socket, ssl_verify_hostname); + + if(ssl_cacert_file) { + status = amqp_ssl_socket_set_cacert(socket, ssl_cacert_file); + if(status != AMQP_STATUS_OK) { + JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error setting CA certificate... (%s)\n", amqp_error_string2(status)); + return -1; + } + } + if(ssl_cert_file && ssl_key_file) { + status = amqp_ssl_socket_set_key(socket, ssl_cert_file, ssl_key_file); + if(status != AMQP_STATUS_OK) { + JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error setting key... (%s)\n", amqp_error_string2(status)); + return -1; + } + } + } else { + socket = amqp_tcp_socket_new(rmq_client->rmq_conn); + if(socket == NULL) { + JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error creating socket...\n"); + return -1; + } + } + JANUS_LOG(LOG_VERB, "Connecting to RabbitMQ server...\n"); + status = amqp_socket_open(socket, rmqhost, rmqport); + if(status != AMQP_STATUS_OK) { + JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error opening socket... (%s)\n", amqp_error_string2(status)); + return -1; + } + JANUS_LOG(LOG_VERB, "Logging in...\n"); + amqp_rpc_reply_t result = amqp_login(rmq_client->rmq_conn, vhost, 0, 131072, heartbeat, AMQP_SASL_METHOD_PLAIN, username, password); + if(result.reply_type != AMQP_RESPONSE_NORMAL) { + JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error logging in... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id)); + return -1; + } + rmq_client->rmq_channel = 1; + JANUS_LOG(LOG_VERB, "Opening channel...\n"); + amqp_channel_open(rmq_client->rmq_conn, rmq_client->rmq_channel); + result = amqp_get_rpc_reply(rmq_client->rmq_conn); + if(result.reply_type != AMQP_RESPONSE_NORMAL) { + JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error opening channel... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id)); + return -1; + } + rmq_client->janus_exchange = amqp_empty_bytes; + if(janus_exchange != NULL) { + JANUS_LOG(LOG_VERB, "Declaring exchange...\n"); + rmq_client->janus_exchange = amqp_cstring_bytes(janus_exchange); + amqp_exchange_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->janus_exchange, amqp_cstring_bytes(janus_exchange_type), 0, 0, 0, 0, amqp_empty_table); + result = amqp_get_rpc_reply(rmq_client->rmq_conn); + if(result.reply_type != AMQP_RESPONSE_NORMAL) { + JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring exchange... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id)); + return -1; + } + } + rmq_client->janus_api_enabled = FALSE; + if(rmq_janus_api_enabled) { + rmq_client->janus_api_enabled = TRUE; + + /* Case when we have a queue_name, and to_janus is the name of the topic to bind on (if exchange_type is topic) */ + if(queue_name != NULL) { + JANUS_LOG(LOG_VERB, "Declaring incoming queue (using queue_name)... (%s)\n", queue_name); + declare = amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, amqp_cstring_bytes(queue_name), 0, queue_durable, queue_exclusive, queue_autodelete, amqp_empty_table); + rmq_client->to_janus_queue = declare->queue; + JANUS_LOG(LOG_VERB, "Incoming queue declared: (%s)\n", (char *) rmq_client->to_janus_queue.bytes); + result = amqp_get_rpc_reply(rmq_client->rmq_conn); + if(result.reply_type != AMQP_RESPONSE_NORMAL) { + JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id)); + return -1; + } + + if(strcmp(janus_exchange_type, "topic") == 0 || strcmp(janus_exchange_type, "direct") == 0) { + JANUS_LOG(LOG_VERB, "Binding queue (%s) to routing key (%s)\n", queue_name, to_janus); + amqp_queue_bind(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_queue, rmq_client->janus_exchange, amqp_cstring_bytes(to_janus), amqp_empty_table); + result = amqp_get_rpc_reply(rmq_client->rmq_conn); + if(result.reply_type != AMQP_RESPONSE_NORMAL) { + JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error binding queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id)); + return -1; + } + } + + /* Case when to_janus is the name of the queue (and there's no binding) */ + } else { + JANUS_LOG(LOG_VERB, "Declaring incoming queue (using to_janus)... (%s)\n", to_janus); + declare = amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, amqp_cstring_bytes(to_janus), 0, queue_durable, queue_exclusive, queue_autodelete, amqp_empty_table); + rmq_client->to_janus_queue = declare->queue; + JANUS_LOG(LOG_VERB, "Incoming queue declared: (%s)\n", (char *)rmq_client->to_janus_queue.bytes); + result = amqp_get_rpc_reply(rmq_client->rmq_conn); + if(result.reply_type != AMQP_RESPONSE_NORMAL) { + JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id)); + return -1; + } + } + + if (declare_outgoing_queue) { + JANUS_LOG(LOG_VERB, "Declaring outgoing queue... (%s)\n", from_janus); + amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, amqp_cstring_bytes(from_janus), 0, 0, 0, 0, amqp_empty_table); + result = amqp_get_rpc_reply(rmq_client->rmq_conn); + if(result.reply_type != AMQP_RESPONSE_NORMAL) { + JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id)); + return -1; + } + } + + amqp_basic_consume(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_queue, amqp_empty_bytes, 0, 1, 0, amqp_empty_table); + result = amqp_get_rpc_reply(rmq_client->rmq_conn); + if(result.reply_type != AMQP_RESPONSE_NORMAL) { + JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error consuming... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id)); + return -1; + } + } + rmq_client->admin_api_enabled = FALSE; + if(rmq_admin_api_enabled) { + rmq_client->admin_api_enabled = TRUE; + + /* Case when we have a queue_name_admin, and to_janus_admin is the name of the routing key to bind on (if exchange_type is topic or direct) */ + if(queue_name_admin != NULL) { + JANUS_LOG(LOG_VERB, "Declaring incoming admin queue (using queue_name_admin)... (%s)\n", queue_name_admin); + declare = amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, amqp_cstring_bytes(queue_name_admin), 0, queue_durable_admin, queue_exclusive_admin, queue_autodelete_admin, amqp_empty_table); + rmq_client->to_janus_admin_queue = declare->queue; + JANUS_LOG(LOG_VERB, "Incoming admin queue declared: (%s)\n", (char *) rmq_client->to_janus_queue.bytes); + result = amqp_get_rpc_reply(rmq_client->rmq_conn); + if(result.reply_type != AMQP_RESPONSE_NORMAL) { + JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id)); + return -1; + } + + if(strcmp(janus_exchange_type, "topic") == 0 || strcmp(janus_exchange_type, "direct") == 0) { + JANUS_LOG(LOG_VERB, "Binding queue (%s) to routing key (%s)\n", queue_name_admin, to_janus_admin); + amqp_queue_bind(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_admin_queue, rmq_client->janus_exchange, amqp_cstring_bytes(to_janus_admin), amqp_empty_table); + result = amqp_get_rpc_reply(rmq_client->rmq_conn); + if(result.reply_type != AMQP_RESPONSE_NORMAL) { + JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error binding queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id)); + return -1; + } + } + + /* Case when to_janus_admin is the name of the queue (and there's no binding */ + } else { + JANUS_LOG(LOG_VERB, "Declaring incoming admin queue (using to_janus_admin)... (%s)\n", to_janus_admin); + rmq_client->to_janus_admin_queue = amqp_cstring_bytes(to_janus_admin); + declare = amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_admin_queue, 0, queue_durable_admin, queue_exclusive_admin, queue_autodelete_admin, amqp_empty_table); + rmq_client->to_janus_admin_queue = declare->queue; + JANUS_LOG(LOG_VERB, "Incoming admin queue declared: (%s)\n", (char *) rmq_client->to_janus_queue.bytes); + result = amqp_get_rpc_reply(rmq_client->rmq_conn); + if(result.reply_type != AMQP_RESPONSE_NORMAL) { + JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id)); + return -1; + } + } + + if (declare_outgoing_queue_admin){ + JANUS_LOG(LOG_VERB, "Declaring outgoing queue... (%s)\n", from_janus_admin); + amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, amqp_cstring_bytes(from_janus_admin), 0, 0, 0, 0, amqp_empty_table); + result = amqp_get_rpc_reply(rmq_client->rmq_conn); + if(result.reply_type != AMQP_RESPONSE_NORMAL) { + JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id)); + return -1; + } + } + + amqp_basic_consume(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_admin_queue, amqp_empty_bytes, 0, 1, 0, amqp_empty_table); + result = amqp_get_rpc_reply(rmq_client->rmq_conn); + if(result.reply_type != AMQP_RESPONSE_NORMAL) { + JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error consuming... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id)); + return -1; + } + } + + rmq_client->connected = 1; + + JANUS_LOG(LOG_INFO, "RabbitMQ Connected successfully"); + + return 0; +} + void janus_rabbitmq_destroy(void) { if(!g_atomic_int_get(&initialized)) return; @@ -696,9 +732,7 @@ void janus_rabbitmq_destroy(void) { g_thread_join(rmq_client->in_thread); if(rmq_client->out_thread) g_thread_join(rmq_client->out_thread); - if(rmq_client->rmq_conn && rmq_client->rmq_channel) { - amqp_channel_close(rmq_client->rmq_conn, rmq_client->rmq_channel, AMQP_REPLY_SUCCESS); - amqp_connection_close(rmq_client->rmq_conn, AMQP_REPLY_SUCCESS); + if(rmq_client->rmq_conn) { amqp_destroy_connection(rmq_client->rmq_conn); } } @@ -891,6 +925,8 @@ void *janus_rmq_in_thread(void *data) { timeout.tv_sec = 0; timeout.tv_usec = 20000; amqp_frame_t frame; + guint rmq_reconnect_backoff = rmq_reconnect_backoff_initial; + while(!rmq_client->destroy && !g_atomic_int_get(&stopping)) { amqp_maybe_release_buffers(rmq_client->rmq_conn); /* Wait for a frame */ @@ -900,8 +936,31 @@ void *janus_rmq_in_thread(void *data) { if(res == AMQP_STATUS_TIMEOUT || res == AMQP_STATUS_SSL_ERROR) continue; JANUS_LOG(LOG_VERB, "Error on amqp_simple_wait_frame_noblock: %d (%s)\n", res, amqp_error_string2(res)); - break; + + rmq_client->connected = 0; + + /* Try and reconnect */ + if(rmq_client->rmq_conn) { + amqp_destroy_connection(rmq_client->rmq_conn); + } + + if(!g_atomic_int_get(&stopping)) { + JANUS_LOG(LOG_VERB, "Trying to reconnect with RabbitMQ Server\n"); + int result = janus_rabbitmq_connect(); + if(result < 0) { + JANUS_LOG(LOG_WARN, "Failed to reconnect to RabbitMQ Server. Retrying in %fs...\n", (gfloat)rmq_reconnect_backoff/1000000); + g_usleep(rmq_reconnect_backoff); + rmq_reconnect_backoff *= rmq_reconnect_backoff_multiplier; + if(rmq_reconnect_backoff >= rmq_reconnect_backoff_max) + rmq_reconnect_backoff = rmq_reconnect_backoff_max; + } else { + rmq_reconnect_backoff = rmq_reconnect_backoff_initial; + } + + continue; + } } + /* We expect method first */ JANUS_LOG(LOG_VERB, "Frame type %d, channel %d\n", frame.frame_type, frame.channel); if(frame.frame_type != AMQP_FRAME_METHOD) @@ -973,7 +1032,20 @@ void *janus_rmq_out_thread(void *data) { return NULL; } JANUS_LOG(LOG_VERB, "Joining RabbitMQ out thread\n"); + guint rmq_reconnect_backoff = rmq_reconnect_backoff_initial; while(!rmq_client->destroy && !g_atomic_int_get(&stopping)) { + + if(!rmq_client->connected) { + g_usleep(rmq_reconnect_backoff); + rmq_reconnect_backoff *= rmq_reconnect_backoff_multiplier; + if (rmq_reconnect_backoff >= rmq_reconnect_backoff_max) + rmq_reconnect_backoff = rmq_reconnect_backoff_max; + + continue; + } + + rmq_reconnect_backoff = rmq_reconnect_backoff_initial; + /* We send messages from here as well, not only notifications */ janus_rabbitmq_response *response = g_async_queue_pop(rmq_client->messages); if(response == &exit_message)