From bbdd3e4d085cc338b7907c852e13af958faf2686 Mon Sep 17 00:00:00 2001 From: Chris Wiggins Date: Tue, 17 Nov 2020 11:49:42 +1300 Subject: [PATCH] Adds back in default outgoing queue behaviour. Adds support for auto-generated queue_names --- ...janus.eventhandler.rabbitmqevh.jcfg.sample | 4 +- conf/janus.transport.rabbitmq.jcfg.sample | 20 +++++---- events/janus_rabbitmqevh.c | 17 +++++++ transports/janus_rabbitmq.c | 45 +++++++++++++++---- 4 files changed, 69 insertions(+), 17 deletions(-) diff --git a/conf/janus.eventhandler.rabbitmqevh.jcfg.sample b/conf/janus.eventhandler.rabbitmqevh.jcfg.sample index c7802fcd14..faa0c2450f 100644 --- a/conf/janus.eventhandler.rabbitmqevh.jcfg.sample +++ b/conf/janus.eventhandler.rabbitmqevh.jcfg.sample @@ -18,10 +18,12 @@ general: { #password = "guest" # Password to use to authenticate, if needed #vhost = "/" # Virtual host to specify when logging in, if needed #exchange = "janus-exchange" - route_key = "janus.events" # Routing key to use when publishing messages + route_key = "janus-events" # Routing key to use when publishing messages #exchange_type = "fanout" # Rabbitmq exchange_type can be one of the available types: direct, topic, headers and fanout (fanout by defualt). #heartbeat = 60 # Defines the seconds without communication that should pass before considering the TCP connection has unreachable. + #declare_outgoing_queue = true # By default (for backwards compatibility), we declare an outgoing queue. Set this to false to disable that behavior + #ssl_enable = false # Whether ssl support must be enabled #ssl_verify_peer = true # Whether peer verification must be enabled #ssl_verify_hostname = true # Whether hostname verification must be enabled diff --git a/conf/janus.transport.rabbitmq.jcfg.sample b/conf/janus.transport.rabbitmq.jcfg.sample index c219ce313c..c0e26db040 100644 --- a/conf/janus.transport.rabbitmq.jcfg.sample +++ b/conf/janus.transport.rabbitmq.jcfg.sample @@ -28,14 +28,16 @@ general: { #janus_exchange = "janus-exchange" # Exchange for outgoing messages, using default if not provided #janus_exchange_type = "fanout" # Rabbitmq exchange_type can be one of the available types: direct, topic, headers and fanout (fanout by defualt). - queue_name = "janus-gateway" # Queue name for incoming messages (if this is set and janus_exchange_type is topic, to_janus will be the topic the queue is bound to the exchange on) + #queue_name = "janus-gateway" # Queue name for incoming messages (if this is set and janus_exchange_type is topic, to_janus will be the topic the queue is bound to the exchange on) to_janus = "to-janus" # Name of the queue for incoming messages if queue_name isn't set, otherwise, the topic that queue_name is bound to from_janus = "from-janus" # Topic of the message sent from janus - #queue_durable = false # Whether or not queue should remain after a RabbitMQ reboot - #queue_autodelete = false # Whether or not queue should autodelete after janus disconnects from RabbitMQ - #queue_exclusive = false # Whether or not queue should only allow one subscriber + #declare_outgoing_queue = true # By default (for backwards compatibility), we declare an outgoing queue. Set this to false to disable that behavior + + #queue_durable = false # Whether or not incoming queue should remain after a RabbitMQ reboot + #queue_autodelete = false # Whether or not incoming queue should autodelete after janus disconnects from RabbitMQ + #queue_exclusive = false # Whether or not incoming queue should only allow one subscriber #ssl_enabled = false # Whether ssl support must be enabled #ssl_verify_peer = true # Whether peer verification must be enabled @@ -54,13 +56,15 @@ general: { admin: { #admin_enabled = false # Whether the support must be enabled - queue_name_admin = "janus-gateway-admin" # Queue name for incoming admin messages (if this is set and janus_exchange_type is topic, to_janus_admin will be the topic the queue is bound to the exchange on) + #queue_name_admin = "janus-gateway-admin" # Queue name for incoming admin messages (if this is set and janus_exchange_type is topic, to_janus_admin will be the topic the queue is bound to the exchange on) #to_janus_admin = "to-janus-admin" # Name of the queue for incoming messages if queue_name_admin isn't set, otherwise, the topic that queue_name_admin is bound to #from_janus_admin = "from-janus-admin" # Topic of the message sent from janus - #queue_durable_admin = false # Whether or not queue should remain after a RabbitMQ reboot - #queue_autodelete_admin = false # Whether or not queue should autodelete after janus disconnects from RabbitMQ - #queue_exclusive_admin = false # Whether or not queue should only allow one subscriber + #declare_outgoing_queue_admin = true # By default (for backwards compatibility), we declare an outgoing queue. Set this to false to disable that behavior + + #queue_durable_admin = false # Whether or not incoming queue should remain after a RabbitMQ reboot + #queue_autodelete_admin = false # Whether or not incoming queue should autodelete after janus disconnects from RabbitMQ + #queue_exclusive_admin = false # Whether or not incoming queue should only allow one subscriber } diff --git a/events/janus_rabbitmqevh.c b/events/janus_rabbitmqevh.c index 57cff8ac17..1ba9683a1a 100644 --- a/events/janus_rabbitmqevh.c +++ b/events/janus_rabbitmqevh.c @@ -114,6 +114,7 @@ static gboolean ssl_verify_hostname = FALSE; static const char *route_key = NULL, *exchange = NULL, *exchange_type = NULL ; static uint16_t heartbeat = 0; static uint16_t rmqport = AMQP_PROTOCOL_PORT; +static gboolean declare_outgoing_queue = TRUE; /* Parameter validation (for tweaking via Admin API) */ static struct janus_json_parameter request_parameters[] = { @@ -265,6 +266,12 @@ int janus_rabbitmqevh_init(const char *config_path) { exchange_type = g_strdup(item->value); } + // By default we *DO* declare the outgoing queue + item = janus_config_get(config, config_general, janus_config_type_item, "declare_outgoing_queue"); + if(item && item->value && !janus_is_true(item->value)) { + declare_outgoing_queue = FALSE; + } + item = janus_config_get(config, config_general, janus_config_type_item, "exchange"); if(!item || !item->value) { JANUS_LOG(LOG_INFO, "RabbitMQEventHandler: Missing name of outgoing exchange for RabbitMQ, using default\n"); @@ -408,6 +415,16 @@ int janus_rabbitmqevh_connect(void) { } } + if (declare_outgoing_queue) { + JANUS_LOG(LOG_VERB, "Declaring outgoing queue... (%s)\n", route_key); + amqp_queue_declare(rmq_conn, rmq_channel, amqp_cstring_bytes(route_key), 0, 0, 0, 0, amqp_empty_table); + result = amqp_get_rpc_reply(rmq_conn); + if(result.reply_type != AMQP_RESPONSE_NORMAL) { + JANUS_LOG(LOG_FATAL, "RabbitMQEventHandler: Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id)); + return -1; + } + } + return 0; } diff --git a/transports/janus_rabbitmq.c b/transports/janus_rabbitmq.c index 958bc49663..51f10424bc 100644 --- a/transports/janus_rabbitmq.c +++ b/transports/janus_rabbitmq.c @@ -171,7 +171,6 @@ static char *rmqhost = NULL, *vhost = NULL, *username = NULL, *password = NULL, *to_janus = NULL, *from_janus = NULL, *to_janus_admin = NULL, *from_janus_admin = NULL, *janus_exchange = NULL, *janus_exchange_type = NULL, *queue_name = NULL, *queue_name_admin = NULL; - /* Transport implementation */ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_path) { if(g_atomic_int_get(&stopping)) { @@ -390,6 +389,7 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_ /* Connect */ rmq_client->rmq_conn = amqp_new_connection(); amqp_socket_t *socket = NULL; + amqp_queue_declare_ok_t *declare = NULL; int status; JANUS_LOG(LOG_VERB, "Creating RabbitMQ socket...\n"); if (ssl_enabled) { @@ -486,8 +486,9 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_ // Case when we have a queue_name, and to_janus is the name of the topic to bind on (if exchange_type is topic) if (queue_name != NULL) { JANUS_LOG(LOG_VERB, "Declaring incoming queue (using queue_name)... (%s)\n", queue_name); - rmq_client->to_janus_queue = amqp_cstring_bytes(queue_name); - amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_queue, 0, queue_durable, queue_exclusive, queue_autodelete, amqp_empty_table); + declare = amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, amqp_cstring_bytes(queue_name), 0, queue_durable, queue_exclusive, queue_autodelete, amqp_empty_table); + rmq_client->to_janus_queue = declare->queue; + JANUS_LOG(LOG_VERB, "Incoming queue declared: (%s)\n", (char *) rmq_client->to_janus_queue.bytes); result = amqp_get_rpc_reply(rmq_client->rmq_conn); if(result.reply_type != AMQP_RESPONSE_NORMAL) { JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id)); @@ -507,8 +508,21 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_ // Case when to_janus is the name of the queue (and there's no binding) } else { JANUS_LOG(LOG_VERB, "Declaring incoming queue (using to_janus)... (%s)\n", to_janus); - rmq_client->to_janus_queue = amqp_cstring_bytes(to_janus); - amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_queue, 0, queue_durable, queue_exclusive, queue_autodelete, amqp_empty_table); + declare = amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, amqp_cstring_bytes(to_janus), 0, queue_durable, queue_exclusive, queue_autodelete, amqp_empty_table); + rmq_client->to_janus_queue = declare->queue; + JANUS_LOG(LOG_VERB, "Incoming queue declared: (%s)\n", (char *)rmq_client->to_janus_queue.bytes); + result = amqp_get_rpc_reply(rmq_client->rmq_conn); + if(result.reply_type != AMQP_RESPONSE_NORMAL) { + JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id)); + goto error; + } + } + + // By default, declare the outgoing queue + item = janus_config_get(config, config_general, janus_config_type_item, "declare_outgoing_queue"); + if (!item || !item->value || janus_is_true(item->value)) { + JANUS_LOG(LOG_VERB, "Declaring outgoing queue... (%s)\n", from_janus); + amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, amqp_cstring_bytes(from_janus), 0, 0, 0, 0, amqp_empty_table); result = amqp_get_rpc_reply(rmq_client->rmq_conn); if(result.reply_type != AMQP_RESPONSE_NORMAL) { JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id)); @@ -549,8 +563,9 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_ // Case when we have a queue_name_admin, and to_janus_admin is the name of the topic to bind on (if exchange_type is topic) if (queue_name_admin != NULL) { JANUS_LOG(LOG_VERB, "Declaring incoming admin queue (using queue_name_admin)... (%s)\n", queue_name_admin); - rmq_client->to_janus_admin_queue = amqp_cstring_bytes(queue_name_admin); - amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_admin_queue, 0, queue_durable_admin, queue_exclusive_admin, queue_autodelete_admin, amqp_empty_table); + declare = amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, amqp_cstring_bytes(queue_name_admin), 0, queue_durable_admin, queue_exclusive_admin, queue_autodelete_admin, amqp_empty_table); + rmq_client->to_janus_admin_queue = declare->queue; + JANUS_LOG(LOG_VERB, "Incoming admin queue declared: (%s)\n", (char *) rmq_client->to_janus_queue.bytes); result = amqp_get_rpc_reply(rmq_client->rmq_conn); if(result.reply_type != AMQP_RESPONSE_NORMAL) { JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id)); @@ -571,7 +586,21 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_ } else { JANUS_LOG(LOG_VERB, "Declaring incoming admin queue (using to_janus_admin)... (%s)\n", to_janus_admin); rmq_client->to_janus_admin_queue = amqp_cstring_bytes(to_janus_admin); - amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_admin_queue, 0, queue_durable_admin, queue_exclusive_admin, queue_autodelete_admin, amqp_empty_table); + declare = amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_admin_queue, 0, queue_durable_admin, queue_exclusive_admin, queue_autodelete_admin, amqp_empty_table); + rmq_client->to_janus_admin_queue = declare->queue; + JANUS_LOG(LOG_VERB, "Incoming admin queue declared: (%s)\n", (char *) rmq_client->to_janus_queue.bytes); + result = amqp_get_rpc_reply(rmq_client->rmq_conn); + if(result.reply_type != AMQP_RESPONSE_NORMAL) { + JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id)); + goto error; + } + } + + // By default, declare the outgoing queue + item = janus_config_get(config, config_admin, janus_config_type_item, "declare_outgoing_queue_admin"); + if (!item || !item->value || janus_is_true(item->value)) { + JANUS_LOG(LOG_VERB, "Declaring outgoing queue... (%s)\n", from_janus_admin); + amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, amqp_cstring_bytes(from_janus_admin), 0, 0, 0, 0, amqp_empty_table); result = amqp_get_rpc_reply(rmq_client->rmq_conn); if(result.reply_type != AMQP_RESPONSE_NORMAL) { JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));