Skip to content

Commit

Permalink
RabbitMQ Transport Reconnect Logic (meetecho#2651)
Browse files Browse the repository at this point in the history
  • Loading branch information
chriswiggins committed May 10, 2021
1 parent 280e8e4 commit f8e8c5e
Show file tree
Hide file tree
Showing 5 changed files with 343 additions and 269 deletions.
2 changes: 1 addition & 1 deletion conf/janus.eventhandler.rabbitmqevh.jcfg.sample
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ general: {
#exchange = "janus-exchange"
route_key = "janus-events" # Routing key to use when publishing messages
#exchange_type = "fanout" # Rabbitmq exchange_type can be one of the available types: direct, topic, headers and fanout (fanout by defualt).
#heartbeat = 60 # Defines the seconds without communication that should pass before considering the TCP connection has unreachable.
#heartbeat = 60 # Defines the seconds without communication that should pass before considering the TCP connection unreachable.
#declare_outgoing_queue = true # By default (for backwards compatibility), we declare an outgoing queue. Set this to false to disable that behavior

#ssl_enable = false # Whether ssl support must be enabled
Expand Down
1 change: 1 addition & 0 deletions conf/janus.transport.rabbitmq.jcfg.sample
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ general: {
#queue_durable = false # Whether or not incoming queue should remain after a RabbitMQ reboot
#queue_autodelete = false # Whether or not incoming queue should autodelete after janus disconnects from RabbitMQ
#queue_exclusive = false # Whether or not incoming queue should only allow one subscriber
#heartbeat = 60 # Defines the seconds without communication that should pass before considering the TCP connection unreachable.

#ssl_enabled = false # Whether ssl support must be enabled
#ssl_verify_peer = true # Whether peer verification must be enabled
Expand Down
1 change: 1 addition & 0 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,7 @@ AC_CHECK_LIB([rabbitmq],
AC_DEFINE(HAVE_RABBITMQEVH)
enable_rabbitmq_event_handler=yes
])
AC_CHECK_HEADERS([rabbitmq-c/amqp.h])
],
[
AS_IF([test "x$enable_rabbitmq" = "xyes"],
Expand Down
68 changes: 34 additions & 34 deletions events/janus_rabbitmqevh.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,18 @@

#include <math.h>

/* Latest RabbitMQ-C library changes the library paths from 0.12.0.0 onwards */
#ifdef HAVE_RABBITMQ_C_AMQP_H
#include <rabbitmq-c/amqp.h>
#include <rabbitmq-c/framing.h>
#include <rabbitmq-c/tcp_socket.h>
#include <rabbitmq-c/ssl_socket.h>
#else
#include <amqp.h>
#include <amqp_framing.h>
#include <amqp_tcp_socket.h>
#include <amqp_ssl_socket.h>
#endif

#include "../debug.h"
#include "../config.h"
Expand Down Expand Up @@ -177,7 +185,7 @@ int janus_rabbitmqevh_init(const char *config_path) {
/* Compact, so no spaces between separators */
json_format = JSON_COMPACT | JSON_PRESERVE_ORDER;
} else {
JANUS_LOG(LOG_WARN, "Unsupported JSON format option '%s', using default (indented)\n", item->value);
JANUS_LOG(LOG_WARN, "RabbitMQEventHandler: Unsupported JSON format option '%s', using default (indented)\n", item->value);
json_format = JSON_INDENT(3) | JSON_PRESERVE_ORDER;
}
}
Expand Down Expand Up @@ -224,7 +232,7 @@ int janus_rabbitmqevh_init(const char *config_path) {

item = janus_config_get(config, config_general, janus_config_type_item, "heartbeat");
if(item && item->value && janus_string_to_uint16(item->value, &heartbeat) < 0) {
JANUS_LOG(LOG_ERR, "Invalid heartbeat timeout (%s), falling back to default\n", item->value);
JANUS_LOG(LOG_ERR, "RabbitMQEventHandler: Invalid heartbeat timeout (%s), falling back to default (0, disabling heartbeat)\n", item->value);
heartbeat = 0;
}

Expand Down Expand Up @@ -279,9 +287,9 @@ int janus_rabbitmqevh_init(const char *config_path) {
exchange = g_strdup(item->value);
}
if (exchange == NULL) {
JANUS_LOG(LOG_INFO, "RabbitMQ event handler enabled, %s:%d (%s) exchange_type:%s\n", rmqhost, rmqport, route_key,exchange_type);
JANUS_LOG(LOG_INFO, "RabbitMQEventHandler: enabled, %s:%d (%s) exchange_type:%s\n", rmqhost, rmqport, route_key,exchange_type);
} else {
JANUS_LOG(LOG_INFO, "RabbitMQ event handler enabled, %s:%d (%s) exch: (%s) exchange_type:%s\n", rmqhost, rmqport, route_key, exchange,exchange_type);
JANUS_LOG(LOG_INFO, "RabbitMQEventHandler: enabled, %s:%d (%s) exch: (%s) exchange_type:%s\n", rmqhost, rmqport, route_key, exchange,exchange_type);
}

/* Connect */
Expand All @@ -300,7 +308,7 @@ int janus_rabbitmqevh_init(const char *config_path) {
handler_thread = g_thread_try_new("janus rabbitmqevh handler", jns_rmqevh_hdlr, NULL, &error);
if(error != NULL) {
g_atomic_int_set(&initialized, 0);
JANUS_LOG(LOG_FATAL, "Got error %d (%s) trying to launch the RabbitMQEventHandler handler thread...\n",
JANUS_LOG(LOG_FATAL, "RabbitMQEventHandler: Got error %d (%s) trying to launch the RabbitMQEventHandler handler thread...\n",
error->code, error->message ? error->message : "??");
g_error_free(error);
goto error;
Expand All @@ -309,7 +317,7 @@ int janus_rabbitmqevh_init(const char *config_path) {
in_thread = g_thread_try_new("janus rabbitmqevh heartbeat handler", jns_rmqevh_hrtbt, NULL, &error);
if(error != NULL) {
g_atomic_int_set(&initialized, 0);
JANUS_LOG(LOG_FATAL, "Got error %d (%s) trying to launch the RabbitMQEventHandler heartbeat thread...\n",
JANUS_LOG(LOG_FATAL, "RabbitMQEventHandler: Got error %d (%s) trying to launch the RabbitMQEventHandler heartbeat thread...\n",
error->code, error->message ? error->message : "??");
g_error_free(error);
goto error;
Expand Down Expand Up @@ -350,16 +358,10 @@ int janus_rabbitmqevh_connect(void) {
JANUS_LOG(LOG_FATAL, "RabbitMQEventHandler: Can't connect to RabbitMQ server: error creating socket...\n");
return -1;
}
if(ssl_verify_peer) {
amqp_ssl_socket_set_verify_peer(socket, 1);
} else {
amqp_ssl_socket_set_verify_peer(socket, 0);
}
if(ssl_verify_hostname) {
amqp_ssl_socket_set_verify_hostname(socket, 1);
} else {
amqp_ssl_socket_set_verify_hostname(socket, 0);
}

amqp_ssl_socket_set_verify_peer(socket, ssl_verify_peer);
amqp_ssl_socket_set_verify_hostname(socket, ssl_verify_hostname);

if(ssl_cacert_file) {
status = amqp_ssl_socket_set_cacert(socket, ssl_cacert_file);
if(status != AMQP_STATUS_OK) {
Expand All @@ -385,7 +387,7 @@ int janus_rabbitmqevh_connect(void) {
JANUS_LOG(LOG_VERB, "RabbitMQEventHandler: Connecting to RabbitMQ server...\n");
status = amqp_socket_open(socket, rmqhost, rmqport);
if(status != AMQP_STATUS_OK) {
JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error opening socket... (%s)\n", amqp_error_string2(status));
JANUS_LOG(LOG_FATAL, "RabbitMQEventHandler: Can't connect to RabbitMQ server: error opening socket... (%s)\n", amqp_error_string2(status));
return -1;
}
JANUS_LOG(LOG_VERB, "RabbitMQEventHandler: Logging in...\n");
Expand All @@ -410,13 +412,13 @@ int janus_rabbitmqevh_connect(void) {
amqp_exchange_declare(rmq_conn, rmq_channel, rmq_exchange, amqp_cstring_bytes(exchange_type), 0, 0, 0, 0, amqp_empty_table);
result = amqp_get_rpc_reply(rmq_conn);
if(result.reply_type != AMQP_RESPONSE_NORMAL) {
JANUS_LOG(LOG_FATAL, "RabbitMQEventHandler: Can't connect to RabbitMQ server: error diclaring exchange... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
JANUS_LOG(LOG_FATAL, "RabbitMQEventHandler: Can't connect to RabbitMQ server: error declaring exchange... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
return -1;
}
}

if (declare_outgoing_queue) {
JANUS_LOG(LOG_VERB, "Declaring outgoing queue... (%s)\n", route_key);
JANUS_LOG(LOG_VERB, "RabbitMQEventHandler: Declaring outgoing queue... (%s)\n", route_key);
amqp_queue_declare(rmq_conn, rmq_channel, amqp_cstring_bytes(route_key), 0, 0, 0, 0, amqp_empty_table);
result = amqp_get_rpc_reply(rmq_conn);
if(result.reply_type != AMQP_RESPONSE_NORMAL) {
Expand All @@ -425,6 +427,8 @@ int janus_rabbitmqevh_connect(void) {
}
}

JANUS_LOG(LOG_INFO, "RabbitMQEventHandler: Connected successfully");

return 0;
}

Expand All @@ -446,9 +450,7 @@ void janus_rabbitmqevh_destroy(void) {
g_async_queue_unref(events);
events = NULL;

if(rmq_conn && rmq_channel) {
amqp_channel_close(rmq_conn, rmq_channel, AMQP_REPLY_SUCCESS);
amqp_connection_close(rmq_conn, AMQP_REPLY_SUCCESS);
if(rmq_conn) {
amqp_destroy_connection(rmq_conn);
}
if(rmq_exchange.bytes)
Expand Down Expand Up @@ -548,7 +550,7 @@ json_t *janus_rabbitmqevh_handle_request(json_t *request) {
if(json_object_get(request, "grouping"))
group_events = json_is_true(json_object_get(request, "grouping"));
} else {
JANUS_LOG(LOG_VERB, "Unknown request '%s'\n", request_text);
JANUS_LOG(LOG_VERB, "RabbitMQEventHandler: Unknown request '%s'\n", request_text);
error_code = JANUS_RABBITMQEVH_ERROR_INVALID_REQUEST;
g_snprintf(error_cause, 512, "Unknown request '%s'", request_text);
}
Expand All @@ -570,7 +572,7 @@ json_t *janus_rabbitmqevh_handle_request(json_t *request) {

/* Thread to handle incoming events */
static void *jns_rmqevh_hdlr(void *data) {
JANUS_LOG(LOG_VERB, "Joining RabbitMQEventHandler handler thread\n");
JANUS_LOG(LOG_VERB, "RabbitMQEventHandler: joining handler thread\n");
json_t *event = NULL, *output = NULL;
char *event_text = NULL;
int count = 0, max = group_events ? 100 : 1;
Expand All @@ -589,7 +591,7 @@ static void *jns_rmqevh_hdlr(void *data) {
if(created && json_is_integer(created)) {
gint64 then = json_integer_value(created);
gint64 now = janus_get_monotonic_time();
JANUS_LOG(LOG_DBG, "Handled event after %"SCNu64" us\n", now-then);
JANUS_LOG(LOG_DBG, "RabbitMQEventHandler: Handled event after %"SCNu64" us\n", now-then);
}
if(!group_events) {
/* We're done here, we just need a single event */
Expand All @@ -613,7 +615,7 @@ static void *jns_rmqevh_hdlr(void *data) {
/* Since this a simple plugin, it does the same for all events: so just convert to string... */
event_text = json_dumps(output, json_format);
if(event_text == NULL) {
JANUS_LOG(LOG_WARN, "Failed to stringify event, event lost...\n");
JANUS_LOG(LOG_WARN, "RabbitMQEventHandler: Failed to stringify event, event lost...\n");
/* Nothing we can do... get rid of the event */
json_decref(output);
output = NULL;
Expand All @@ -638,14 +640,14 @@ static void *jns_rmqevh_hdlr(void *data) {
json_decref(output);
output = NULL;
}
JANUS_LOG(LOG_VERB, "Leaving RabbitMQEventHandler handler thread\n");
JANUS_LOG(LOG_VERB, "RabbitMQEventHandler: leaving handler thread\n");
return NULL;
}


/* Thread to handle heartbeats */
static void *jns_rmqevh_hrtbt(void *data) {
JANUS_LOG(LOG_VERB, "Monitoring RabbitMQ HeartBeat\n");
JANUS_LOG(LOG_VERB, "RabbitMQEventHandler: Monitoring RabbitMQ Heartbeat\n");
int waiting_usec = (heartbeat/2) * 1000000;
struct timeval timeout;
timeout.tv_sec = 0;
Expand All @@ -666,15 +668,13 @@ static void *jns_rmqevh_hrtbt(void *data) {
continue;
}

JANUS_LOG(LOG_VERB, "Error on amqp_simple_wait_frame_noblock: %d (%s)\n", res, amqp_error_string2(res));
JANUS_LOG(LOG_VERB, "RabbitMQEventHandler: Error on amqp_simple_wait_frame_noblock: %d (%s)\n", res, amqp_error_string2(res));

if(rmq_conn && rmq_channel) {
amqp_channel_close(rmq_conn, rmq_channel, AMQP_REPLY_SUCCESS);
amqp_connection_close(rmq_conn, AMQP_REPLY_SUCCESS);
if(rmq_conn) {
amqp_destroy_connection(rmq_conn);
}
if(!g_atomic_int_get(&stopping)) {
JANUS_LOG(LOG_VERB, "Trying to reconnect with RabbitMQ Server\n");
JANUS_LOG(LOG_VERB, "RabbitMQEventHandler: Trying to reconnect\n");
int result = janus_rabbitmqevh_connect();
if(result < 0) {
g_usleep(5000000);
Expand All @@ -688,6 +688,6 @@ static void *jns_rmqevh_hrtbt(void *data) {
}
}

JANUS_LOG(LOG_VERB, "Leaving RabbitMQEventHandler HeartBeat thread\n");
JANUS_LOG(LOG_VERB, "RabbitMQEventHandler: Leaving HeartBeat thread\n");
return NULL;
}
Loading

0 comments on commit f8e8c5e

Please sign in to comment.