From f604aeb76c71f7d93e131cb9db974d6e804af537 Mon Sep 17 00:00:00 2001 From: Chris Wiggins Date: Tue, 10 Nov 2020 17:23:11 +1300 Subject: [PATCH 1/8] Updates RabbitMQ logic - Publishing to a topic does not require an outgoing queue, just the topic, so the outgoing queues are no longer declared - When the janus_exchange_type is topic, we want to be able to name the queue, and then bind an incoming topic from the exchange to that queue, so that functionality has been added - This is all backwards compatible with original logic, and won't break existing logic --- ...janus.eventhandler.rabbitmqevh.jcfg.sample | 2 +- conf/janus.transport.rabbitmq.jcfg.sample | 15 +- events/janus_rabbitmqevh.c | 14 +- transports/janus_rabbitmq.c | 157 ++++++++++++------ 4 files changed, 122 insertions(+), 66 deletions(-) diff --git a/conf/janus.eventhandler.rabbitmqevh.jcfg.sample b/conf/janus.eventhandler.rabbitmqevh.jcfg.sample index 486eaa0976..c7802fcd14 100644 --- a/conf/janus.eventhandler.rabbitmqevh.jcfg.sample +++ b/conf/janus.eventhandler.rabbitmqevh.jcfg.sample @@ -18,7 +18,7 @@ general: { #password = "guest" # Password to use to authenticate, if needed #vhost = "/" # Virtual host to specify when logging in, if needed #exchange = "janus-exchange" - route_key = "janus-events" # Name of the queue for event messages + route_key = "janus.events" # Routing key to use when publishing messages #exchange_type = "fanout" # Rabbitmq exchange_type can be one of the available types: direct, topic, headers and fanout (fanout by defualt). #heartbeat = 60 # Defines the seconds without communication that should pass before considering the TCP connection has unreachable. diff --git a/conf/janus.transport.rabbitmq.jcfg.sample b/conf/janus.transport.rabbitmq.jcfg.sample index c39020731d..57dcd666c1 100644 --- a/conf/janus.transport.rabbitmq.jcfg.sample +++ b/conf/janus.transport.rabbitmq.jcfg.sample @@ -24,10 +24,15 @@ general: { #username = "guest" # Username to use to authenticate, if needed #password = "guest" # Password to use to authenticate, if needed #vhost = "/" # Virtual host to specify when logging in, if needed - to_janus = "to-janus" # Name of the queue for incoming messages - from_janus = "from-janus" # Name of the queue for outgoing messages + #janus_exchange = "janus-exchange" # Exchange for outgoing messages, using default if not provided #janus_exchange_type = "fanout" # Rabbitmq exchange_type can be one of the available types: direct, topic, headers and fanout (fanout by defualt). + + queue_name = "janus-gateway" # Queue name for incoming messages (if this is set and janus_exchange_type is topic, to_janus will be the topic the queue is bound to the exchange on) + + to_janus = "to-janus" # Name of the queue for incoming messages if queue_name isn't set, otherwise, the topic that queue_name is bound to + from_janus = "from-janus" # Topic of the message sent from janus + #ssl_enabled = false # Whether ssl support must be enabled #ssl_verify_peer = true # Whether peer verification must be enabled #ssl_verify_hostname = true # Whether hostname verification must be enabled @@ -44,6 +49,10 @@ general: { # Notice that by default the Admin API support via RabbitMQ is disabled. admin: { #admin_enabled = false # Whether the support must be enabled + + queue_name_admin = "janus-gateway-admin" # Queue name for incoming admin messages (if this is set and janus_exchange_type is topic, to_janus_admin will be the topic the queue is bound to the exchange on) + + # Deprecated config values, ignored if admin_queue_name is set above #to_janus_admin = "to-janus-admin" # Name of the queue for incoming messages - #from_janus_admin = "from-janus-admin" # Name of the queue for outgoing messages + #from_janus_admin = "from-janus-admin" # Topic of the message sent from janus } diff --git a/events/janus_rabbitmqevh.c b/events/janus_rabbitmqevh.c index 0cf269d89f..57cff8ac17 100644 --- a/events/janus_rabbitmqevh.c +++ b/events/janus_rabbitmqevh.c @@ -100,7 +100,6 @@ static size_t json_format = JSON_INDENT(3) | JSON_PRESERVE_ORDER; static amqp_connection_state_t rmq_conn; static amqp_channel_t rmq_channel = 0; static amqp_bytes_t rmq_exchange; -static amqp_bytes_t rmq_route_key; static janus_mutex mutex; @@ -408,14 +407,7 @@ int janus_rabbitmqevh_connect(void) { return -1; } } - JANUS_LOG(LOG_VERB, "Declaring outgoing queue... (%s)\n", route_key); - rmq_route_key = amqp_cstring_bytes(route_key); - amqp_queue_declare(rmq_conn, rmq_channel, rmq_route_key, 0, 0, 0, 0, amqp_empty_table); - result = amqp_get_rpc_reply(rmq_conn); - if(result.reply_type != AMQP_RESPONSE_NORMAL) { - JANUS_LOG(LOG_FATAL, "RabbitMQEventHandler: Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id)); - return -1; - } + return 0; } @@ -444,8 +436,6 @@ void janus_rabbitmqevh_destroy(void) { } if(rmq_exchange.bytes) g_free((char *)rmq_exchange.bytes); - if(rmq_route_key.bytes) - g_free((char *)rmq_route_key.bytes); if(rmqhost) g_free((char *)rmqhost); if(vhost) @@ -611,7 +601,7 @@ static void *jns_rmqevh_hdlr(void *data) { props.content_type = amqp_cstring_bytes("application/json"); amqp_bytes_t message = amqp_cstring_bytes(event_text); janus_mutex_lock(&mutex); - int status = amqp_basic_publish(rmq_conn, rmq_channel, rmq_exchange, rmq_route_key, 0, 0, &props, message); + int status = amqp_basic_publish(rmq_conn, rmq_channel, rmq_exchange, amqp_cstring_bytes(route_key), 0, 0, &props, message); if(status != AMQP_STATUS_OK) { JANUS_LOG(LOG_ERR, "RabbitMQEventHandler: Error publishing... %d, %s\n", status, amqp_error_string2(status)); } diff --git a/transports/janus_rabbitmq.c b/transports/janus_rabbitmq.c index d8babf9d81..14f70b098e 100644 --- a/transports/janus_rabbitmq.c +++ b/transports/janus_rabbitmq.c @@ -139,10 +139,8 @@ typedef struct janus_rabbitmq_client { gboolean janus_api_enabled; /* Whether the Janus API via RabbitMQ is enabled */ amqp_bytes_t janus_exchange; /* AMQP exchange for outgoing messages */ amqp_bytes_t to_janus_queue; /* AMQP outgoing messages queue (Janus API) */ - amqp_bytes_t from_janus_queue; /* AMQP incoming messages queue (Janus API) */ gboolean admin_api_enabled; /* Whether the Janus API via RabbitMQ is enabled */ amqp_bytes_t to_janus_admin_queue; /* AMQP outgoing messages queue (Admin API) */ - amqp_bytes_t from_janus_admin_queue; /* AMQP incoming messages queue (Admin API) */ GThread *in_thread, *out_thread; /* Threads to handle incoming and outgoing queues */ GAsyncQueue *messages; /* Queue of outgoing messages to push */ janus_mutex mutex; /* Mutex to lock/unlock this session */ @@ -170,7 +168,8 @@ static janus_transport_session *rmq_session = NULL; /* Global properties */ static char *rmqhost = NULL, *vhost = NULL, *username = NULL, *password = NULL, *ssl_cacert_file = NULL, *ssl_cert_file = NULL, *ssl_key_file = NULL, - *to_janus = NULL, *from_janus = NULL, *to_janus_admin = NULL, *from_janus_admin = NULL, *janus_exchange = NULL, *janus_exchange_type = NULL; + *to_janus = NULL, *from_janus = NULL, *to_janus_admin = NULL, *from_janus_admin = NULL, *janus_exchange = NULL, *janus_exchange_type = NULL, + *queue_name = NULL, *queue_name_admin = NULL; /* Transport implementation */ @@ -304,31 +303,42 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_ if(!item || !item->value || !janus_is_true(item->value)) { JANUS_LOG(LOG_WARN, "RabbitMQ support disabled (Janus API)\n"); } else { - /* Parse configuration */ - item = janus_config_get(config, config_general, janus_config_type_item, "to_janus"); - if(!item || !item->value) { - JANUS_LOG(LOG_FATAL, "Missing name of incoming queue for RabbitMQ integration...\n"); - goto error; - } - to_janus = g_strdup(item->value); - item = janus_config_get(config, config_general, janus_config_type_item, "from_janus"); + + // Get exchange name config, or set to default exchange + item = janus_config_get(config, config_general, janus_config_type_item, "janus_exchange"); if(!item || !item->value) { - JANUS_LOG(LOG_FATAL, "Missing name of outgoing queue for RabbitMQ integration...\n"); - goto error; + JANUS_LOG(LOG_INFO, "Missing name of outgoing exchange for RabbitMQ integration, using default\n"); + } else { + janus_exchange = g_strdup(item->value); } - from_janus = g_strdup(item->value); + + // Get exchange type config, or set to default item = janus_config_get(config, config_general, janus_config_type_item, "janus_exchange_type"); if(!item || !item->value) { janus_exchange_type = (char *)JANUS_RABBITMQ_EXCHANGE_TYPE; } else { janus_exchange_type = g_strdup(item->value); } - item = janus_config_get(config, config_general, janus_config_type_item, "janus_exchange"); + + item = janus_config_get(config, config_general, janus_config_type_item, "queue_name"); + if(item && item->value) { + queue_name = g_strdup(item->value); + } + + item = janus_config_get(config, config_general, janus_config_type_item, "to_janus"); if(!item || !item->value) { - JANUS_LOG(LOG_INFO, "Missing name of outgoing exchange for RabbitMQ integration, using default\n"); - } else { - janus_exchange = g_strdup(item->value); + JANUS_LOG(LOG_FATAL, "Missing name of incoming queue/topic for RabbitMQ integration...\n"); + goto error; + } + to_janus = g_strdup(item->value); + + item = janus_config_get(config, config_general, janus_config_type_item, "from_janus"); + if(!item || !item->value) { + JANUS_LOG(LOG_FATAL, "Missing name of outgoing topic for RabbitMQ integration...\n"); + goto error; } + from_janus = g_strdup(item->value); + if (janus_exchange == NULL) { JANUS_LOG(LOG_INFO, "RabbitMQ support for Janus API enabled, %s:%d (%s/%s) exchange_type:%s \n", rmqhost, rmqport, to_janus, from_janus, janus_exchange_type); } else { @@ -349,17 +359,24 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_ JANUS_LOG(LOG_WARN, "RabbitMQ support disabled (Admin API)\n"); } else { /* Parse configuration */ + item = janus_config_get(config, config_general, janus_config_type_item, "queue_name_admin"); + if(item && item->value) { + queue_name_admin = g_strdup(item->value); + } + item = janus_config_get(config, config_admin, janus_config_type_item, "to_janus_admin"); if(!item || !item->value) { JANUS_LOG(LOG_FATAL, "Missing name of incoming queue for RabbitMQ integration...\n"); goto error; } to_janus_admin = g_strdup(item->value); + item = janus_config_get(config, config_admin, janus_config_type_item, "from_janus_admin"); if(!item || !item->value) { - JANUS_LOG(LOG_FATAL, "Missing name of outgoing queue for RabbitMQ integration...\n"); + JANUS_LOG(LOG_FATAL, "Missing name of outgoing topic for RabbitMQ integration...\n"); goto error; } + from_janus_admin = g_strdup(item->value); JANUS_LOG(LOG_INFO, "RabbitMQ support for Admin API enabled, %s:%d (%s/%s)\n", rmqhost, rmqport, to_janus_admin, from_janus_admin); rmq_admin_api_enabled = TRUE; @@ -446,22 +463,39 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_ rmq_client->janus_api_enabled = FALSE; if(rmq_janus_api_enabled) { rmq_client->janus_api_enabled = TRUE; - JANUS_LOG(LOG_VERB, "Declaring incoming queue... (%s)\n", to_janus); - rmq_client->to_janus_queue = amqp_cstring_bytes(to_janus); - amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_queue, 0, 0, 0, 0, amqp_empty_table); - result = amqp_get_rpc_reply(rmq_client->rmq_conn); - if(result.reply_type != AMQP_RESPONSE_NORMAL) { - JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id)); - goto error; - } - JANUS_LOG(LOG_VERB, "Declaring outgoing queue... (%s)\n", from_janus); - rmq_client->from_janus_queue = amqp_cstring_bytes(from_janus); - amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->from_janus_queue, 0, 0, 0, 0, amqp_empty_table); - result = amqp_get_rpc_reply(rmq_client->rmq_conn); - if(result.reply_type != AMQP_RESPONSE_NORMAL) { - JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id)); - goto error; + // Case when we have a queue_name, and to_janus is the name of the topic to bind on (if exchange_type is topic) + if (queue_name != NULL) { + JANUS_LOG(LOG_VERB, "Declaring incoming queue... (%s)\n", queue_name); + rmq_client->to_janus_queue = amqp_cstring_bytes(queue_name); + amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_queue, 0, 0, 0, 0, amqp_empty_table); + result = amqp_get_rpc_reply(rmq_client->rmq_conn); + if(result.reply_type != AMQP_RESPONSE_NORMAL) { + JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id)); + goto error; + } + + if (strcmp(janus_exchange_type, "topic") == 0) { + JANUS_LOG(LOG_VERB, "Binding queue (%s) to topic (%s)\n", queue_name, to_janus); + amqp_queue_bind(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_queue, rmq_client->janus_exchange, amqp_cstring_bytes(to_janus), amqp_empty_table); + result = amqp_get_rpc_reply(rmq_client->rmq_conn); + if(result.reply_type != AMQP_RESPONSE_NORMAL) { + JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error binding queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id)); + goto error; + } + } + + // Case when to_janus is the name of the queue (and there's no binding) + } else { + JANUS_LOG(LOG_VERB, "Declaring incoming queue... (%s)\n", to_janus); + rmq_client->to_janus_queue = amqp_cstring_bytes(to_janus); + amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_queue, 0, 0, 0, 0, amqp_empty_table); + result = amqp_get_rpc_reply(rmq_client->rmq_conn); + if(result.reply_type != AMQP_RESPONSE_NORMAL) { + JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id)); + goto error; + } } + amqp_basic_consume(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_queue, amqp_empty_bytes, 0, 1, 0, amqp_empty_table); result = amqp_get_rpc_reply(rmq_client->rmq_conn); if(result.reply_type != AMQP_RESPONSE_NORMAL) { @@ -472,22 +506,39 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_ rmq_client->admin_api_enabled = FALSE; if(rmq_admin_api_enabled) { rmq_client->admin_api_enabled = TRUE; - JANUS_LOG(LOG_VERB, "Declaring incoming queue... (%s)\n", to_janus_admin); - rmq_client->to_janus_admin_queue = amqp_cstring_bytes(to_janus_admin); - amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_admin_queue, 0, 0, 0, 0, amqp_empty_table); - result = amqp_get_rpc_reply(rmq_client->rmq_conn); - if(result.reply_type != AMQP_RESPONSE_NORMAL) { - JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id)); - goto error; - } - JANUS_LOG(LOG_VERB, "Declaring outgoing queue... (%s)\n", from_janus_admin); - rmq_client->from_janus_admin_queue = amqp_cstring_bytes(from_janus_admin); - amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->from_janus_admin_queue, 0, 0, 0, 0, amqp_empty_table); - result = amqp_get_rpc_reply(rmq_client->rmq_conn); - if(result.reply_type != AMQP_RESPONSE_NORMAL) { - JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id)); - goto error; + // Case when we have a queue_name_admin, and to_janus_admin is the name of the topic to bind on (if exchange_type is topic) + if (queue_name_admin != NULL) { + JANUS_LOG(LOG_VERB, "Declaring incoming queue... (%s)\n", queue_name_admin); + rmq_client->to_janus_admin_queue = amqp_cstring_bytes(queue_name_admin); + amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_admin_queue, 0, 0, 0, 0, amqp_empty_table); + result = amqp_get_rpc_reply(rmq_client->rmq_conn); + if(result.reply_type != AMQP_RESPONSE_NORMAL) { + JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id)); + goto error; + } + + if (strcmp(janus_exchange_type, "topic") == 0) { + JANUS_LOG(LOG_VERB, "Binding queue (%s) to topic (%s)\n", queue_name_admin, to_janus_admin); + amqp_queue_bind(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_admin_queue, rmq_client->janus_exchange, amqp_cstring_bytes(to_janus_admin), amqp_empty_table); + result = amqp_get_rpc_reply(rmq_client->rmq_conn); + if(result.reply_type != AMQP_RESPONSE_NORMAL) { + JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error binding queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id)); + goto error; + } + } + + // Case when to_janus_admin is the name of the queue (and there's no binding + } else { + JANUS_LOG(LOG_VERB, "Declaring incoming queue... (%s)\n", to_janus_admin); + rmq_client->to_janus_admin_queue = amqp_cstring_bytes(to_janus_admin); + amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_admin_queue, 0, 0, 0, 0, amqp_empty_table); + result = amqp_get_rpc_reply(rmq_client->rmq_conn); + if(result.reply_type != AMQP_RESPONSE_NORMAL) { + JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id)); + goto error; + } } + amqp_basic_consume(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_admin_queue, amqp_empty_bytes, 0, 1, 0, amqp_empty_table); result = amqp_get_rpc_reply(rmq_client->rmq_conn); if(result.reply_type != AMQP_RESPONSE_NORMAL) { @@ -549,6 +600,9 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_ g_free(username); g_free(password); g_free(janus_exchange); + g_free(janus_exchange_type); + g_free(queue_name); + g_free(queue_name_admin); g_free(to_janus); g_free(from_janus); g_free(to_janus_admin); @@ -587,6 +641,9 @@ void janus_rabbitmq_destroy(void) { g_free(username); g_free(password); g_free(janus_exchange); + g_free(janus_exchange_type); + g_free(queue_name); + g_free(queue_name_admin); g_free(to_janus); g_free(from_janus); g_free(to_janus_admin); @@ -871,7 +928,7 @@ void *janus_rmq_out_thread(void *data) { props.content_type = amqp_cstring_bytes("application/json"); amqp_bytes_t message = amqp_cstring_bytes(payload_text); int status = amqp_basic_publish(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->janus_exchange, - response->admin ? rmq_client->from_janus_admin_queue : rmq_client->from_janus_queue, + response->admin ? amqp_cstring_bytes(from_janus_admin) : amqp_cstring_bytes(from_janus), 0, 0, &props, message); if(status != AMQP_STATUS_OK) { JANUS_LOG(LOG_ERR, "Error publishing... %d, %s\n", status, amqp_error_string2(status)); From b3f7ad956a98a5790498b107c57af4fda2f29a2f Mon Sep 17 00:00:00 2001 From: Chris Wiggins Date: Tue, 10 Nov 2020 18:19:07 +1300 Subject: [PATCH 2/8] Update rabbitmq logging information --- transports/janus_rabbitmq.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/transports/janus_rabbitmq.c b/transports/janus_rabbitmq.c index 14f70b098e..8b45579e2b 100644 --- a/transports/janus_rabbitmq.c +++ b/transports/janus_rabbitmq.c @@ -465,7 +465,7 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_ rmq_client->janus_api_enabled = TRUE; // Case when we have a queue_name, and to_janus is the name of the topic to bind on (if exchange_type is topic) if (queue_name != NULL) { - JANUS_LOG(LOG_VERB, "Declaring incoming queue... (%s)\n", queue_name); + JANUS_LOG(LOG_VERB, "Declaring incoming queue (using queue_name)... (%s)\n", queue_name); rmq_client->to_janus_queue = amqp_cstring_bytes(queue_name); amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_queue, 0, 0, 0, 0, amqp_empty_table); result = amqp_get_rpc_reply(rmq_client->rmq_conn); @@ -486,7 +486,7 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_ // Case when to_janus is the name of the queue (and there's no binding) } else { - JANUS_LOG(LOG_VERB, "Declaring incoming queue... (%s)\n", to_janus); + JANUS_LOG(LOG_VERB, "Declaring incoming queue (using to_janus)... (%s)\n", to_janus); rmq_client->to_janus_queue = amqp_cstring_bytes(to_janus); amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_queue, 0, 0, 0, 0, amqp_empty_table); result = amqp_get_rpc_reply(rmq_client->rmq_conn); @@ -508,7 +508,7 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_ rmq_client->admin_api_enabled = TRUE; // Case when we have a queue_name_admin, and to_janus_admin is the name of the topic to bind on (if exchange_type is topic) if (queue_name_admin != NULL) { - JANUS_LOG(LOG_VERB, "Declaring incoming queue... (%s)\n", queue_name_admin); + JANUS_LOG(LOG_VERB, "Declaring incoming admin queue (using queue_name_admin)... (%s)\n", queue_name_admin); rmq_client->to_janus_admin_queue = amqp_cstring_bytes(queue_name_admin); amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_admin_queue, 0, 0, 0, 0, amqp_empty_table); result = amqp_get_rpc_reply(rmq_client->rmq_conn); @@ -529,7 +529,7 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_ // Case when to_janus_admin is the name of the queue (and there's no binding } else { - JANUS_LOG(LOG_VERB, "Declaring incoming queue... (%s)\n", to_janus_admin); + JANUS_LOG(LOG_VERB, "Declaring incoming admin queue (using to_janus_admin)... (%s)\n", to_janus_admin); rmq_client->to_janus_admin_queue = amqp_cstring_bytes(to_janus_admin); amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_admin_queue, 0, 0, 0, 0, amqp_empty_table); result = amqp_get_rpc_reply(rmq_client->rmq_conn); From 505eeef9d9de4e23f62a7db88cbd08a7863639cf Mon Sep 17 00:00:00 2001 From: Chris Wiggins Date: Tue, 10 Nov 2020 18:29:59 +1300 Subject: [PATCH 3/8] Fix queue_name_admin in rabbitmq transport --- transports/janus_rabbitmq.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/transports/janus_rabbitmq.c b/transports/janus_rabbitmq.c index 8b45579e2b..d8fa787e3c 100644 --- a/transports/janus_rabbitmq.c +++ b/transports/janus_rabbitmq.c @@ -359,7 +359,7 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_ JANUS_LOG(LOG_WARN, "RabbitMQ support disabled (Admin API)\n"); } else { /* Parse configuration */ - item = janus_config_get(config, config_general, janus_config_type_item, "queue_name_admin"); + item = janus_config_get(config, config_admin, janus_config_type_item, "queue_name_admin"); if(item && item->value) { queue_name_admin = g_strdup(item->value); } From 319c6fc265c8ed4310f554a4cd0edc0661553de9 Mon Sep 17 00:00:00 2001 From: Chris Wiggins Date: Wed, 11 Nov 2020 16:09:26 +1300 Subject: [PATCH 4/8] Increase RabbitMQ logging on publish --- transports/janus_rabbitmq.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/transports/janus_rabbitmq.c b/transports/janus_rabbitmq.c index d8fa787e3c..417a28baf3 100644 --- a/transports/janus_rabbitmq.c +++ b/transports/janus_rabbitmq.c @@ -914,7 +914,7 @@ void *janus_rmq_out_thread(void *data) { janus_mutex_lock(&rmq_client->mutex); /* Gotcha! Convert json_t to string */ char *payload_text = response->payload; - JANUS_LOG(LOG_VERB, "Sending %s API message to RabbitMQ (%zu bytes)...\n", response->admin ? "Admin" : "Janus", strlen(payload_text)); + JANUS_LOG(LOG_VERB, "Sending %s API message to RabbitMQ (%zu bytes) on exchange %s with topic %s...\n", response->admin ? "Admin" : "Janus", strlen(payload_text), janus_exchange, response->admin ? from_janus_admin : from_janus); JANUS_LOG(LOG_VERB, "%s\n", payload_text); amqp_basic_properties_t props; props._flags = 0; From 24594f74630567f72d3ccb0e6b80de09afb3242c Mon Sep 17 00:00:00 2001 From: Chris Wiggins Date: Wed, 11 Nov 2020 18:05:24 +1300 Subject: [PATCH 5/8] Check RabbitMQ admin topic in a better way --- transports/janus_rabbitmq.c | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/transports/janus_rabbitmq.c b/transports/janus_rabbitmq.c index 417a28baf3..ce98768fd2 100644 --- a/transports/janus_rabbitmq.c +++ b/transports/janus_rabbitmq.c @@ -839,17 +839,10 @@ void *janus_rmq_in_thread(void *data) { JANUS_LOG(LOG_VERB, "Delivery #%u, %.*s\n", (unsigned) d->delivery_tag, (int) d->routing_key.len, (char *) d->routing_key.bytes); /* Check if this is a Janus or Admin API request */ if(rmq_client->admin_api_enabled) { - if(d->routing_key.len == rmq_client->to_janus_admin_queue.len) { - size_t i=0; + char incoming_topic[d->routing_key.len + 2]; + strlcpy(incoming_topic, (char *)d->routing_key.bytes, d->routing_key.len + 1); // Convert the amqp_bytes_t back to char* + if (strcmp(incoming_topic, to_janus_admin) == 0) { admin = TRUE; - char *inq = (char *)d->routing_key.bytes; - char *expq = (char *)rmq_client->to_janus_admin_queue.bytes; - for(i=0; i< d->routing_key.len; i++) { - if(inq[i] != expq[i]) { - admin = FALSE; - break; - } - } } } JANUS_LOG(LOG_VERB, " -- This is %s API request\n", admin ? "an admin" : "a Janus"); From ed1b5c6fcce36a50acf45f00e69aa2879fe64535 Mon Sep 17 00:00:00 2001 From: Chris Wiggins Date: Thu, 12 Nov 2020 13:21:08 +1300 Subject: [PATCH 6/8] Adds RabbitMQ options for queues, durable, exclusive and autodelete --- conf/janus.transport.rabbitmq.jcfg.sample | 12 +++++- transports/janus_rabbitmq.c | 48 +++++++++++++++++++++-- 2 files changed, 54 insertions(+), 6 deletions(-) diff --git a/conf/janus.transport.rabbitmq.jcfg.sample b/conf/janus.transport.rabbitmq.jcfg.sample index 57dcd666c1..c219ce313c 100644 --- a/conf/janus.transport.rabbitmq.jcfg.sample +++ b/conf/janus.transport.rabbitmq.jcfg.sample @@ -33,6 +33,10 @@ general: { to_janus = "to-janus" # Name of the queue for incoming messages if queue_name isn't set, otherwise, the topic that queue_name is bound to from_janus = "from-janus" # Topic of the message sent from janus + #queue_durable = false # Whether or not queue should remain after a RabbitMQ reboot + #queue_autodelete = false # Whether or not queue should autodelete after janus disconnects from RabbitMQ + #queue_exclusive = false # Whether or not queue should only allow one subscriber + #ssl_enabled = false # Whether ssl support must be enabled #ssl_verify_peer = true # Whether peer verification must be enabled #ssl_verify_hostname = true # Whether hostname verification must be enabled @@ -52,7 +56,11 @@ admin: { queue_name_admin = "janus-gateway-admin" # Queue name for incoming admin messages (if this is set and janus_exchange_type is topic, to_janus_admin will be the topic the queue is bound to the exchange on) - # Deprecated config values, ignored if admin_queue_name is set above - #to_janus_admin = "to-janus-admin" # Name of the queue for incoming messages + #to_janus_admin = "to-janus-admin" # Name of the queue for incoming messages if queue_name_admin isn't set, otherwise, the topic that queue_name_admin is bound to #from_janus_admin = "from-janus-admin" # Topic of the message sent from janus + + #queue_durable_admin = false # Whether or not queue should remain after a RabbitMQ reboot + #queue_autodelete_admin = false # Whether or not queue should autodelete after janus disconnects from RabbitMQ + #queue_exclusive_admin = false # Whether or not queue should only allow one subscriber + } diff --git a/transports/janus_rabbitmq.c b/transports/janus_rabbitmq.c index ce98768fd2..958bc49663 100644 --- a/transports/janus_rabbitmq.c +++ b/transports/janus_rabbitmq.c @@ -463,11 +463,31 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_ rmq_client->janus_api_enabled = FALSE; if(rmq_janus_api_enabled) { rmq_client->janus_api_enabled = TRUE; + + // Set queue options + amqp_boolean_t queue_durable = 0; + item = janus_config_get(config, config_general, janus_config_type_item, "queue_durable"); + if(item && item->value && janus_is_true(item->value)) { + queue_durable = 1; + } + + amqp_boolean_t queue_exclusive = 0; + item = janus_config_get(config, config_general, janus_config_type_item, "queue_exclusive"); + if(item && item->value && janus_is_true(item->value)) { + queue_exclusive = 1; + } + + amqp_boolean_t queue_autodelete = 0; + item = janus_config_get(config, config_general, janus_config_type_item, "queue_autodelete"); + if(item && item->value && janus_is_true(item->value)) { + queue_autodelete = 1; + } + // Case when we have a queue_name, and to_janus is the name of the topic to bind on (if exchange_type is topic) if (queue_name != NULL) { JANUS_LOG(LOG_VERB, "Declaring incoming queue (using queue_name)... (%s)\n", queue_name); rmq_client->to_janus_queue = amqp_cstring_bytes(queue_name); - amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_queue, 0, 0, 0, 0, amqp_empty_table); + amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_queue, 0, queue_durable, queue_exclusive, queue_autodelete, amqp_empty_table); result = amqp_get_rpc_reply(rmq_client->rmq_conn); if(result.reply_type != AMQP_RESPONSE_NORMAL) { JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id)); @@ -488,7 +508,7 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_ } else { JANUS_LOG(LOG_VERB, "Declaring incoming queue (using to_janus)... (%s)\n", to_janus); rmq_client->to_janus_queue = amqp_cstring_bytes(to_janus); - amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_queue, 0, 0, 0, 0, amqp_empty_table); + amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_queue, 0, queue_durable, queue_exclusive, queue_autodelete, amqp_empty_table); result = amqp_get_rpc_reply(rmq_client->rmq_conn); if(result.reply_type != AMQP_RESPONSE_NORMAL) { JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id)); @@ -506,11 +526,31 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_ rmq_client->admin_api_enabled = FALSE; if(rmq_admin_api_enabled) { rmq_client->admin_api_enabled = TRUE; + + // Set queue options + amqp_boolean_t queue_durable_admin = 0; + item = janus_config_get(config, config_admin, janus_config_type_item, "queue_durable_admin"); + if(item && item->value && janus_is_true(item->value)) { + queue_durable_admin = 1; + } + + amqp_boolean_t queue_exclusive_admin = 0; + item = janus_config_get(config, config_admin, janus_config_type_item, "queue_exclusive_admin"); + if(item && item->value && janus_is_true(item->value)) { + queue_exclusive_admin = 1; + } + + amqp_boolean_t queue_autodelete_admin = 0; + item = janus_config_get(config, config_admin, janus_config_type_item, "queue_autodelete_admin"); + if(item && item->value && janus_is_true(item->value)) { + queue_autodelete_admin = 1; + } + // Case when we have a queue_name_admin, and to_janus_admin is the name of the topic to bind on (if exchange_type is topic) if (queue_name_admin != NULL) { JANUS_LOG(LOG_VERB, "Declaring incoming admin queue (using queue_name_admin)... (%s)\n", queue_name_admin); rmq_client->to_janus_admin_queue = amqp_cstring_bytes(queue_name_admin); - amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_admin_queue, 0, 0, 0, 0, amqp_empty_table); + amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_admin_queue, 0, queue_durable_admin, queue_exclusive_admin, queue_autodelete_admin, amqp_empty_table); result = amqp_get_rpc_reply(rmq_client->rmq_conn); if(result.reply_type != AMQP_RESPONSE_NORMAL) { JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id)); @@ -531,7 +571,7 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_ } else { JANUS_LOG(LOG_VERB, "Declaring incoming admin queue (using to_janus_admin)... (%s)\n", to_janus_admin); rmq_client->to_janus_admin_queue = amqp_cstring_bytes(to_janus_admin); - amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_admin_queue, 0, 0, 0, 0, amqp_empty_table); + amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_admin_queue, 0, queue_durable_admin, queue_exclusive_admin, queue_autodelete_admin, amqp_empty_table); result = amqp_get_rpc_reply(rmq_client->rmq_conn); if(result.reply_type != AMQP_RESPONSE_NORMAL) { JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id)); From bbdd3e4d085cc338b7907c852e13af958faf2686 Mon Sep 17 00:00:00 2001 From: Chris Wiggins Date: Tue, 17 Nov 2020 11:49:42 +1300 Subject: [PATCH 7/8] Adds back in default outgoing queue behaviour. Adds support for auto-generated queue_names --- ...janus.eventhandler.rabbitmqevh.jcfg.sample | 4 +- conf/janus.transport.rabbitmq.jcfg.sample | 20 +++++---- events/janus_rabbitmqevh.c | 17 +++++++ transports/janus_rabbitmq.c | 45 +++++++++++++++---- 4 files changed, 69 insertions(+), 17 deletions(-) diff --git a/conf/janus.eventhandler.rabbitmqevh.jcfg.sample b/conf/janus.eventhandler.rabbitmqevh.jcfg.sample index c7802fcd14..faa0c2450f 100644 --- a/conf/janus.eventhandler.rabbitmqevh.jcfg.sample +++ b/conf/janus.eventhandler.rabbitmqevh.jcfg.sample @@ -18,10 +18,12 @@ general: { #password = "guest" # Password to use to authenticate, if needed #vhost = "/" # Virtual host to specify when logging in, if needed #exchange = "janus-exchange" - route_key = "janus.events" # Routing key to use when publishing messages + route_key = "janus-events" # Routing key to use when publishing messages #exchange_type = "fanout" # Rabbitmq exchange_type can be one of the available types: direct, topic, headers and fanout (fanout by defualt). #heartbeat = 60 # Defines the seconds without communication that should pass before considering the TCP connection has unreachable. + #declare_outgoing_queue = true # By default (for backwards compatibility), we declare an outgoing queue. Set this to false to disable that behavior + #ssl_enable = false # Whether ssl support must be enabled #ssl_verify_peer = true # Whether peer verification must be enabled #ssl_verify_hostname = true # Whether hostname verification must be enabled diff --git a/conf/janus.transport.rabbitmq.jcfg.sample b/conf/janus.transport.rabbitmq.jcfg.sample index c219ce313c..c0e26db040 100644 --- a/conf/janus.transport.rabbitmq.jcfg.sample +++ b/conf/janus.transport.rabbitmq.jcfg.sample @@ -28,14 +28,16 @@ general: { #janus_exchange = "janus-exchange" # Exchange for outgoing messages, using default if not provided #janus_exchange_type = "fanout" # Rabbitmq exchange_type can be one of the available types: direct, topic, headers and fanout (fanout by defualt). - queue_name = "janus-gateway" # Queue name for incoming messages (if this is set and janus_exchange_type is topic, to_janus will be the topic the queue is bound to the exchange on) + #queue_name = "janus-gateway" # Queue name for incoming messages (if this is set and janus_exchange_type is topic, to_janus will be the topic the queue is bound to the exchange on) to_janus = "to-janus" # Name of the queue for incoming messages if queue_name isn't set, otherwise, the topic that queue_name is bound to from_janus = "from-janus" # Topic of the message sent from janus - #queue_durable = false # Whether or not queue should remain after a RabbitMQ reboot - #queue_autodelete = false # Whether or not queue should autodelete after janus disconnects from RabbitMQ - #queue_exclusive = false # Whether or not queue should only allow one subscriber + #declare_outgoing_queue = true # By default (for backwards compatibility), we declare an outgoing queue. Set this to false to disable that behavior + + #queue_durable = false # Whether or not incoming queue should remain after a RabbitMQ reboot + #queue_autodelete = false # Whether or not incoming queue should autodelete after janus disconnects from RabbitMQ + #queue_exclusive = false # Whether or not incoming queue should only allow one subscriber #ssl_enabled = false # Whether ssl support must be enabled #ssl_verify_peer = true # Whether peer verification must be enabled @@ -54,13 +56,15 @@ general: { admin: { #admin_enabled = false # Whether the support must be enabled - queue_name_admin = "janus-gateway-admin" # Queue name for incoming admin messages (if this is set and janus_exchange_type is topic, to_janus_admin will be the topic the queue is bound to the exchange on) + #queue_name_admin = "janus-gateway-admin" # Queue name for incoming admin messages (if this is set and janus_exchange_type is topic, to_janus_admin will be the topic the queue is bound to the exchange on) #to_janus_admin = "to-janus-admin" # Name of the queue for incoming messages if queue_name_admin isn't set, otherwise, the topic that queue_name_admin is bound to #from_janus_admin = "from-janus-admin" # Topic of the message sent from janus - #queue_durable_admin = false # Whether or not queue should remain after a RabbitMQ reboot - #queue_autodelete_admin = false # Whether or not queue should autodelete after janus disconnects from RabbitMQ - #queue_exclusive_admin = false # Whether or not queue should only allow one subscriber + #declare_outgoing_queue_admin = true # By default (for backwards compatibility), we declare an outgoing queue. Set this to false to disable that behavior + + #queue_durable_admin = false # Whether or not incoming queue should remain after a RabbitMQ reboot + #queue_autodelete_admin = false # Whether or not incoming queue should autodelete after janus disconnects from RabbitMQ + #queue_exclusive_admin = false # Whether or not incoming queue should only allow one subscriber } diff --git a/events/janus_rabbitmqevh.c b/events/janus_rabbitmqevh.c index 57cff8ac17..1ba9683a1a 100644 --- a/events/janus_rabbitmqevh.c +++ b/events/janus_rabbitmqevh.c @@ -114,6 +114,7 @@ static gboolean ssl_verify_hostname = FALSE; static const char *route_key = NULL, *exchange = NULL, *exchange_type = NULL ; static uint16_t heartbeat = 0; static uint16_t rmqport = AMQP_PROTOCOL_PORT; +static gboolean declare_outgoing_queue = TRUE; /* Parameter validation (for tweaking via Admin API) */ static struct janus_json_parameter request_parameters[] = { @@ -265,6 +266,12 @@ int janus_rabbitmqevh_init(const char *config_path) { exchange_type = g_strdup(item->value); } + // By default we *DO* declare the outgoing queue + item = janus_config_get(config, config_general, janus_config_type_item, "declare_outgoing_queue"); + if(item && item->value && !janus_is_true(item->value)) { + declare_outgoing_queue = FALSE; + } + item = janus_config_get(config, config_general, janus_config_type_item, "exchange"); if(!item || !item->value) { JANUS_LOG(LOG_INFO, "RabbitMQEventHandler: Missing name of outgoing exchange for RabbitMQ, using default\n"); @@ -408,6 +415,16 @@ int janus_rabbitmqevh_connect(void) { } } + if (declare_outgoing_queue) { + JANUS_LOG(LOG_VERB, "Declaring outgoing queue... (%s)\n", route_key); + amqp_queue_declare(rmq_conn, rmq_channel, amqp_cstring_bytes(route_key), 0, 0, 0, 0, amqp_empty_table); + result = amqp_get_rpc_reply(rmq_conn); + if(result.reply_type != AMQP_RESPONSE_NORMAL) { + JANUS_LOG(LOG_FATAL, "RabbitMQEventHandler: Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id)); + return -1; + } + } + return 0; } diff --git a/transports/janus_rabbitmq.c b/transports/janus_rabbitmq.c index 958bc49663..51f10424bc 100644 --- a/transports/janus_rabbitmq.c +++ b/transports/janus_rabbitmq.c @@ -171,7 +171,6 @@ static char *rmqhost = NULL, *vhost = NULL, *username = NULL, *password = NULL, *to_janus = NULL, *from_janus = NULL, *to_janus_admin = NULL, *from_janus_admin = NULL, *janus_exchange = NULL, *janus_exchange_type = NULL, *queue_name = NULL, *queue_name_admin = NULL; - /* Transport implementation */ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_path) { if(g_atomic_int_get(&stopping)) { @@ -390,6 +389,7 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_ /* Connect */ rmq_client->rmq_conn = amqp_new_connection(); amqp_socket_t *socket = NULL; + amqp_queue_declare_ok_t *declare = NULL; int status; JANUS_LOG(LOG_VERB, "Creating RabbitMQ socket...\n"); if (ssl_enabled) { @@ -486,8 +486,9 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_ // Case when we have a queue_name, and to_janus is the name of the topic to bind on (if exchange_type is topic) if (queue_name != NULL) { JANUS_LOG(LOG_VERB, "Declaring incoming queue (using queue_name)... (%s)\n", queue_name); - rmq_client->to_janus_queue = amqp_cstring_bytes(queue_name); - amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_queue, 0, queue_durable, queue_exclusive, queue_autodelete, amqp_empty_table); + declare = amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, amqp_cstring_bytes(queue_name), 0, queue_durable, queue_exclusive, queue_autodelete, amqp_empty_table); + rmq_client->to_janus_queue = declare->queue; + JANUS_LOG(LOG_VERB, "Incoming queue declared: (%s)\n", (char *) rmq_client->to_janus_queue.bytes); result = amqp_get_rpc_reply(rmq_client->rmq_conn); if(result.reply_type != AMQP_RESPONSE_NORMAL) { JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id)); @@ -507,8 +508,21 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_ // Case when to_janus is the name of the queue (and there's no binding) } else { JANUS_LOG(LOG_VERB, "Declaring incoming queue (using to_janus)... (%s)\n", to_janus); - rmq_client->to_janus_queue = amqp_cstring_bytes(to_janus); - amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_queue, 0, queue_durable, queue_exclusive, queue_autodelete, amqp_empty_table); + declare = amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, amqp_cstring_bytes(to_janus), 0, queue_durable, queue_exclusive, queue_autodelete, amqp_empty_table); + rmq_client->to_janus_queue = declare->queue; + JANUS_LOG(LOG_VERB, "Incoming queue declared: (%s)\n", (char *)rmq_client->to_janus_queue.bytes); + result = amqp_get_rpc_reply(rmq_client->rmq_conn); + if(result.reply_type != AMQP_RESPONSE_NORMAL) { + JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id)); + goto error; + } + } + + // By default, declare the outgoing queue + item = janus_config_get(config, config_general, janus_config_type_item, "declare_outgoing_queue"); + if (!item || !item->value || janus_is_true(item->value)) { + JANUS_LOG(LOG_VERB, "Declaring outgoing queue... (%s)\n", from_janus); + amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, amqp_cstring_bytes(from_janus), 0, 0, 0, 0, amqp_empty_table); result = amqp_get_rpc_reply(rmq_client->rmq_conn); if(result.reply_type != AMQP_RESPONSE_NORMAL) { JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id)); @@ -549,8 +563,9 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_ // Case when we have a queue_name_admin, and to_janus_admin is the name of the topic to bind on (if exchange_type is topic) if (queue_name_admin != NULL) { JANUS_LOG(LOG_VERB, "Declaring incoming admin queue (using queue_name_admin)... (%s)\n", queue_name_admin); - rmq_client->to_janus_admin_queue = amqp_cstring_bytes(queue_name_admin); - amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_admin_queue, 0, queue_durable_admin, queue_exclusive_admin, queue_autodelete_admin, amqp_empty_table); + declare = amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, amqp_cstring_bytes(queue_name_admin), 0, queue_durable_admin, queue_exclusive_admin, queue_autodelete_admin, amqp_empty_table); + rmq_client->to_janus_admin_queue = declare->queue; + JANUS_LOG(LOG_VERB, "Incoming admin queue declared: (%s)\n", (char *) rmq_client->to_janus_queue.bytes); result = amqp_get_rpc_reply(rmq_client->rmq_conn); if(result.reply_type != AMQP_RESPONSE_NORMAL) { JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id)); @@ -571,7 +586,21 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_ } else { JANUS_LOG(LOG_VERB, "Declaring incoming admin queue (using to_janus_admin)... (%s)\n", to_janus_admin); rmq_client->to_janus_admin_queue = amqp_cstring_bytes(to_janus_admin); - amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_admin_queue, 0, queue_durable_admin, queue_exclusive_admin, queue_autodelete_admin, amqp_empty_table); + declare = amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_admin_queue, 0, queue_durable_admin, queue_exclusive_admin, queue_autodelete_admin, amqp_empty_table); + rmq_client->to_janus_admin_queue = declare->queue; + JANUS_LOG(LOG_VERB, "Incoming admin queue declared: (%s)\n", (char *) rmq_client->to_janus_queue.bytes); + result = amqp_get_rpc_reply(rmq_client->rmq_conn); + if(result.reply_type != AMQP_RESPONSE_NORMAL) { + JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id)); + goto error; + } + } + + // By default, declare the outgoing queue + item = janus_config_get(config, config_admin, janus_config_type_item, "declare_outgoing_queue_admin"); + if (!item || !item->value || janus_is_true(item->value)) { + JANUS_LOG(LOG_VERB, "Declaring outgoing queue... (%s)\n", from_janus_admin); + amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, amqp_cstring_bytes(from_janus_admin), 0, 0, 0, 0, amqp_empty_table); result = amqp_get_rpc_reply(rmq_client->rmq_conn); if(result.reply_type != AMQP_RESPONSE_NORMAL) { JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id)); From c0f0e1e105de220e40ba08f6b3c1a0d52bc16307 Mon Sep 17 00:00:00 2001 From: Chris Wiggins Date: Wed, 27 Jan 2021 09:59:02 +1300 Subject: [PATCH 8/8] Fix code style comments, also enable routing for direct exchanges --- ...janus.eventhandler.rabbitmqevh.jcfg.sample | 3 +- conf/janus.transport.rabbitmq.jcfg.sample | 35 ++++++++----------- transports/janus_rabbitmq.c | 28 +++++++-------- 3 files changed, 29 insertions(+), 37 deletions(-) diff --git a/conf/janus.eventhandler.rabbitmqevh.jcfg.sample b/conf/janus.eventhandler.rabbitmqevh.jcfg.sample index faa0c2450f..a29ec4e3cb 100644 --- a/conf/janus.eventhandler.rabbitmqevh.jcfg.sample +++ b/conf/janus.eventhandler.rabbitmqevh.jcfg.sample @@ -18,10 +18,9 @@ general: { #password = "guest" # Password to use to authenticate, if needed #vhost = "/" # Virtual host to specify when logging in, if needed #exchange = "janus-exchange" - route_key = "janus-events" # Routing key to use when publishing messages + route_key = "janus-events" # Routing key to use when publishing messages #exchange_type = "fanout" # Rabbitmq exchange_type can be one of the available types: direct, topic, headers and fanout (fanout by defualt). #heartbeat = 60 # Defines the seconds without communication that should pass before considering the TCP connection has unreachable. - #declare_outgoing_queue = true # By default (for backwards compatibility), we declare an outgoing queue. Set this to false to disable that behavior #ssl_enable = false # Whether ssl support must be enabled diff --git a/conf/janus.transport.rabbitmq.jcfg.sample b/conf/janus.transport.rabbitmq.jcfg.sample index c0e26db040..9aae80ef0a 100644 --- a/conf/janus.transport.rabbitmq.jcfg.sample +++ b/conf/janus.transport.rabbitmq.jcfg.sample @@ -26,20 +26,16 @@ general: { #vhost = "/" # Virtual host to specify when logging in, if needed #janus_exchange = "janus-exchange" # Exchange for outgoing messages, using default if not provided - #janus_exchange_type = "fanout" # Rabbitmq exchange_type can be one of the available types: direct, topic, headers and fanout (fanout by defualt). - - #queue_name = "janus-gateway" # Queue name for incoming messages (if this is set and janus_exchange_type is topic, to_janus will be the topic the queue is bound to the exchange on) - - to_janus = "to-janus" # Name of the queue for incoming messages if queue_name isn't set, otherwise, the topic that queue_name is bound to - from_janus = "from-janus" # Topic of the message sent from janus - - #declare_outgoing_queue = true # By default (for backwards compatibility), we declare an outgoing queue. Set this to false to disable that behavior - + #janus_exchange_type = "fanout" # Rabbitmq exchange_type can be one of the available types: direct, topic, headers and fanout (fanout by defualt). + #queue_name = "janus-gateway" # Queue name for incoming messages (if set and janus_exchange_type is topic/direct, to_janus will be the routing key the queue is bound to the exchange on) + to_janus = "to-janus" # Name of the queue for incoming messages if queue_name isn't set, otherwise, the routing key that queue_name is bound to + from_janus = "from-janus" # Routing key of the message sent from janus (as well as the name of the outgoing queue if declare_outgoing_queue = true) + #declare_outgoing_queue = true # By default (for backwards compatibility), we declare an outgoing queue. Set this to false to disable that behavior #queue_durable = false # Whether or not incoming queue should remain after a RabbitMQ reboot - #queue_autodelete = false # Whether or not incoming queue should autodelete after janus disconnects from RabbitMQ + #queue_autodelete = false # Whether or not incoming queue should autodelete after janus disconnects from RabbitMQ #queue_exclusive = false # Whether or not incoming queue should only allow one subscriber - #ssl_enabled = false # Whether ssl support must be enabled + #ssl_enabled = false # Whether ssl support must be enabled #ssl_verify_peer = true # Whether peer verification must be enabled #ssl_verify_hostname = true # Whether hostname verification must be enabled @@ -54,17 +50,14 @@ general: { # Admin API messaging. The same RabbitMQ server is supposed to be used. # Notice that by default the Admin API support via RabbitMQ is disabled. admin: { - #admin_enabled = false # Whether the support must be enabled - - #queue_name_admin = "janus-gateway-admin" # Queue name for incoming admin messages (if this is set and janus_exchange_type is topic, to_janus_admin will be the topic the queue is bound to the exchange on) - - #to_janus_admin = "to-janus-admin" # Name of the queue for incoming messages if queue_name_admin isn't set, otherwise, the topic that queue_name_admin is bound to - #from_janus_admin = "from-janus-admin" # Topic of the message sent from janus - - #declare_outgoing_queue_admin = true # By default (for backwards compatibility), we declare an outgoing queue. Set this to false to disable that behavior + #admin_enabled = false # Whether the support must be enabled + #queue_name_admin = "janus-gateway-admin" # Queue name for incoming admin messages (if set and janus_exchange_type is topic/direct, to_janus_admin will be the the routing key the queue is bound to the exchange on) + #to_janus_admin = "to-janus-admin" # Name of the queue for incoming messages if queue_name_admin isn't set, otherwise, the routing key that queue_name_admin is bound to + #from_janus_admin = "from-janus-admin" # Routing key of the message sent from janus (as well as the name of the outgoing queue if declare_outgoing_queue_admin = true) + #declare_outgoing_queue_admin = true # By default (for backwards compatibility), we declare an outgoing queue. Set this to false to disable that behavior #queue_durable_admin = false # Whether or not incoming queue should remain after a RabbitMQ reboot - #queue_autodelete_admin = false # Whether or not incoming queue should autodelete after janus disconnects from RabbitMQ - #queue_exclusive_admin = false # Whether or not incoming queue should only allow one subscriber + #queue_autodelete_admin = false # Whether or not incoming queue should autodelete after janus disconnects from RabbitMQ + #queue_exclusive_admin = false # Whether or not incoming queue should only allow one subscriber } diff --git a/transports/janus_rabbitmq.c b/transports/janus_rabbitmq.c index 51f10424bc..ce3def4666 100644 --- a/transports/janus_rabbitmq.c +++ b/transports/janus_rabbitmq.c @@ -333,12 +333,12 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_ item = janus_config_get(config, config_general, janus_config_type_item, "from_janus"); if(!item || !item->value) { - JANUS_LOG(LOG_FATAL, "Missing name of outgoing topic for RabbitMQ integration...\n"); + JANUS_LOG(LOG_FATAL, "Missing name of outgoing routing key for RabbitMQ integration...\n"); goto error; } from_janus = g_strdup(item->value); - if (janus_exchange == NULL) { + if(janus_exchange == NULL) { JANUS_LOG(LOG_INFO, "RabbitMQ support for Janus API enabled, %s:%d (%s/%s) exchange_type:%s \n", rmqhost, rmqport, to_janus, from_janus, janus_exchange_type); } else { JANUS_LOG(LOG_INFO, "RabbitMQ support for Janus API enabled, %s:%d (%s/%s) exch: (%s) exchange_type:%s \n", rmqhost, rmqport, to_janus, from_janus, janus_exchange, janus_exchange_type); @@ -372,7 +372,7 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_ item = janus_config_get(config, config_admin, janus_config_type_item, "from_janus_admin"); if(!item || !item->value) { - JANUS_LOG(LOG_FATAL, "Missing name of outgoing topic for RabbitMQ integration...\n"); + JANUS_LOG(LOG_FATAL, "Missing name of outgoing routing key for RabbitMQ integration...\n"); goto error; } @@ -484,7 +484,7 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_ } // Case when we have a queue_name, and to_janus is the name of the topic to bind on (if exchange_type is topic) - if (queue_name != NULL) { + if(queue_name != NULL) { JANUS_LOG(LOG_VERB, "Declaring incoming queue (using queue_name)... (%s)\n", queue_name); declare = amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, amqp_cstring_bytes(queue_name), 0, queue_durable, queue_exclusive, queue_autodelete, amqp_empty_table); rmq_client->to_janus_queue = declare->queue; @@ -495,8 +495,8 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_ goto error; } - if (strcmp(janus_exchange_type, "topic") == 0) { - JANUS_LOG(LOG_VERB, "Binding queue (%s) to topic (%s)\n", queue_name, to_janus); + if(strcmp(janus_exchange_type, "topic") == 0 || strcmp(janus_exchange_type, "direct") == 0) { + JANUS_LOG(LOG_VERB, "Binding queue (%s) to routing key (%s)\n", queue_name, to_janus); amqp_queue_bind(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_queue, rmq_client->janus_exchange, amqp_cstring_bytes(to_janus), amqp_empty_table); result = amqp_get_rpc_reply(rmq_client->rmq_conn); if(result.reply_type != AMQP_RESPONSE_NORMAL) { @@ -520,7 +520,7 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_ // By default, declare the outgoing queue item = janus_config_get(config, config_general, janus_config_type_item, "declare_outgoing_queue"); - if (!item || !item->value || janus_is_true(item->value)) { + if(!item || !item->value || janus_is_true(item->value)) { JANUS_LOG(LOG_VERB, "Declaring outgoing queue... (%s)\n", from_janus); amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, amqp_cstring_bytes(from_janus), 0, 0, 0, 0, amqp_empty_table); result = amqp_get_rpc_reply(rmq_client->rmq_conn); @@ -560,8 +560,8 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_ queue_autodelete_admin = 1; } - // Case when we have a queue_name_admin, and to_janus_admin is the name of the topic to bind on (if exchange_type is topic) - if (queue_name_admin != NULL) { + // Case when we have a queue_name_admin, and to_janus_admin is the name of the routing key to bind on (if exchange_type is topic or direct) + if(queue_name_admin != NULL) { JANUS_LOG(LOG_VERB, "Declaring incoming admin queue (using queue_name_admin)... (%s)\n", queue_name_admin); declare = amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, amqp_cstring_bytes(queue_name_admin), 0, queue_durable_admin, queue_exclusive_admin, queue_autodelete_admin, amqp_empty_table); rmq_client->to_janus_admin_queue = declare->queue; @@ -572,8 +572,8 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_ goto error; } - if (strcmp(janus_exchange_type, "topic") == 0) { - JANUS_LOG(LOG_VERB, "Binding queue (%s) to topic (%s)\n", queue_name_admin, to_janus_admin); + if(strcmp(janus_exchange_type, "topic") == 0 || strcmp(janus_exchange_type, "direct") == 0) { + JANUS_LOG(LOG_VERB, "Binding queue (%s) to routing key (%s)\n", queue_name_admin, to_janus_admin); amqp_queue_bind(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_admin_queue, rmq_client->janus_exchange, amqp_cstring_bytes(to_janus_admin), amqp_empty_table); result = amqp_get_rpc_reply(rmq_client->rmq_conn); if(result.reply_type != AMQP_RESPONSE_NORMAL) { @@ -598,7 +598,7 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_ // By default, declare the outgoing queue item = janus_config_get(config, config_admin, janus_config_type_item, "declare_outgoing_queue_admin"); - if (!item || !item->value || janus_is_true(item->value)) { + if(!item || !item->value || janus_is_true(item->value)) { JANUS_LOG(LOG_VERB, "Declaring outgoing queue... (%s)\n", from_janus_admin); amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, amqp_cstring_bytes(from_janus_admin), 0, 0, 0, 0, amqp_empty_table); result = amqp_get_rpc_reply(rmq_client->rmq_conn); @@ -910,7 +910,7 @@ void *janus_rmq_in_thread(void *data) { if(rmq_client->admin_api_enabled) { char incoming_topic[d->routing_key.len + 2]; strlcpy(incoming_topic, (char *)d->routing_key.bytes, d->routing_key.len + 1); // Convert the amqp_bytes_t back to char* - if (strcmp(incoming_topic, to_janus_admin) == 0) { + if(strcmp(incoming_topic, to_janus_admin) == 0) { admin = TRUE; } } @@ -976,7 +976,7 @@ void *janus_rmq_out_thread(void *data) { janus_mutex_lock(&rmq_client->mutex); /* Gotcha! Convert json_t to string */ char *payload_text = response->payload; - JANUS_LOG(LOG_VERB, "Sending %s API message to RabbitMQ (%zu bytes) on exchange %s with topic %s...\n", response->admin ? "Admin" : "Janus", strlen(payload_text), janus_exchange, response->admin ? from_janus_admin : from_janus); + JANUS_LOG(LOG_VERB, "Sending %s API message to RabbitMQ (%zu bytes) on exchange %s with routing key %s...\n", response->admin ? "Admin" : "Janus", strlen(payload_text), janus_exchange, response->admin ? from_janus_admin : from_janus); JANUS_LOG(LOG_VERB, "%s\n", payload_text); amqp_basic_properties_t props; props._flags = 0;