From ed1b5c6fcce36a50acf45f00e69aa2879fe64535 Mon Sep 17 00:00:00 2001 From: Chris Wiggins Date: Thu, 12 Nov 2020 13:21:08 +1300 Subject: [PATCH] Adds RabbitMQ options for queues, durable, exclusive and autodelete --- conf/janus.transport.rabbitmq.jcfg.sample | 12 +++++- transports/janus_rabbitmq.c | 48 +++++++++++++++++++++-- 2 files changed, 54 insertions(+), 6 deletions(-) diff --git a/conf/janus.transport.rabbitmq.jcfg.sample b/conf/janus.transport.rabbitmq.jcfg.sample index 57dcd666c1..c219ce313c 100644 --- a/conf/janus.transport.rabbitmq.jcfg.sample +++ b/conf/janus.transport.rabbitmq.jcfg.sample @@ -33,6 +33,10 @@ general: { to_janus = "to-janus" # Name of the queue for incoming messages if queue_name isn't set, otherwise, the topic that queue_name is bound to from_janus = "from-janus" # Topic of the message sent from janus + #queue_durable = false # Whether or not queue should remain after a RabbitMQ reboot + #queue_autodelete = false # Whether or not queue should autodelete after janus disconnects from RabbitMQ + #queue_exclusive = false # Whether or not queue should only allow one subscriber + #ssl_enabled = false # Whether ssl support must be enabled #ssl_verify_peer = true # Whether peer verification must be enabled #ssl_verify_hostname = true # Whether hostname verification must be enabled @@ -52,7 +56,11 @@ admin: { queue_name_admin = "janus-gateway-admin" # Queue name for incoming admin messages (if this is set and janus_exchange_type is topic, to_janus_admin will be the topic the queue is bound to the exchange on) - # Deprecated config values, ignored if admin_queue_name is set above - #to_janus_admin = "to-janus-admin" # Name of the queue for incoming messages + #to_janus_admin = "to-janus-admin" # Name of the queue for incoming messages if queue_name_admin isn't set, otherwise, the topic that queue_name_admin is bound to #from_janus_admin = "from-janus-admin" # Topic of the message sent from janus + + #queue_durable_admin = false # Whether or not queue should remain after a RabbitMQ reboot + #queue_autodelete_admin = false # Whether or not queue should autodelete after janus disconnects from RabbitMQ + #queue_exclusive_admin = false # Whether or not queue should only allow one subscriber + } diff --git a/transports/janus_rabbitmq.c b/transports/janus_rabbitmq.c index ce98768fd2..958bc49663 100644 --- a/transports/janus_rabbitmq.c +++ b/transports/janus_rabbitmq.c @@ -463,11 +463,31 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_ rmq_client->janus_api_enabled = FALSE; if(rmq_janus_api_enabled) { rmq_client->janus_api_enabled = TRUE; + + // Set queue options + amqp_boolean_t queue_durable = 0; + item = janus_config_get(config, config_general, janus_config_type_item, "queue_durable"); + if(item && item->value && janus_is_true(item->value)) { + queue_durable = 1; + } + + amqp_boolean_t queue_exclusive = 0; + item = janus_config_get(config, config_general, janus_config_type_item, "queue_exclusive"); + if(item && item->value && janus_is_true(item->value)) { + queue_exclusive = 1; + } + + amqp_boolean_t queue_autodelete = 0; + item = janus_config_get(config, config_general, janus_config_type_item, "queue_autodelete"); + if(item && item->value && janus_is_true(item->value)) { + queue_autodelete = 1; + } + // Case when we have a queue_name, and to_janus is the name of the topic to bind on (if exchange_type is topic) if (queue_name != NULL) { JANUS_LOG(LOG_VERB, "Declaring incoming queue (using queue_name)... (%s)\n", queue_name); rmq_client->to_janus_queue = amqp_cstring_bytes(queue_name); - amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_queue, 0, 0, 0, 0, amqp_empty_table); + amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_queue, 0, queue_durable, queue_exclusive, queue_autodelete, amqp_empty_table); result = amqp_get_rpc_reply(rmq_client->rmq_conn); if(result.reply_type != AMQP_RESPONSE_NORMAL) { JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id)); @@ -488,7 +508,7 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_ } else { JANUS_LOG(LOG_VERB, "Declaring incoming queue (using to_janus)... (%s)\n", to_janus); rmq_client->to_janus_queue = amqp_cstring_bytes(to_janus); - amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_queue, 0, 0, 0, 0, amqp_empty_table); + amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_queue, 0, queue_durable, queue_exclusive, queue_autodelete, amqp_empty_table); result = amqp_get_rpc_reply(rmq_client->rmq_conn); if(result.reply_type != AMQP_RESPONSE_NORMAL) { JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id)); @@ -506,11 +526,31 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_ rmq_client->admin_api_enabled = FALSE; if(rmq_admin_api_enabled) { rmq_client->admin_api_enabled = TRUE; + + // Set queue options + amqp_boolean_t queue_durable_admin = 0; + item = janus_config_get(config, config_admin, janus_config_type_item, "queue_durable_admin"); + if(item && item->value && janus_is_true(item->value)) { + queue_durable_admin = 1; + } + + amqp_boolean_t queue_exclusive_admin = 0; + item = janus_config_get(config, config_admin, janus_config_type_item, "queue_exclusive_admin"); + if(item && item->value && janus_is_true(item->value)) { + queue_exclusive_admin = 1; + } + + amqp_boolean_t queue_autodelete_admin = 0; + item = janus_config_get(config, config_admin, janus_config_type_item, "queue_autodelete_admin"); + if(item && item->value && janus_is_true(item->value)) { + queue_autodelete_admin = 1; + } + // Case when we have a queue_name_admin, and to_janus_admin is the name of the topic to bind on (if exchange_type is topic) if (queue_name_admin != NULL) { JANUS_LOG(LOG_VERB, "Declaring incoming admin queue (using queue_name_admin)... (%s)\n", queue_name_admin); rmq_client->to_janus_admin_queue = amqp_cstring_bytes(queue_name_admin); - amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_admin_queue, 0, 0, 0, 0, amqp_empty_table); + amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_admin_queue, 0, queue_durable_admin, queue_exclusive_admin, queue_autodelete_admin, amqp_empty_table); result = amqp_get_rpc_reply(rmq_client->rmq_conn); if(result.reply_type != AMQP_RESPONSE_NORMAL) { JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id)); @@ -531,7 +571,7 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_ } else { JANUS_LOG(LOG_VERB, "Declaring incoming admin queue (using to_janus_admin)... (%s)\n", to_janus_admin); rmq_client->to_janus_admin_queue = amqp_cstring_bytes(to_janus_admin); - amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_admin_queue, 0, 0, 0, 0, amqp_empty_table); + amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_admin_queue, 0, queue_durable_admin, queue_exclusive_admin, queue_autodelete_admin, amqp_empty_table); result = amqp_get_rpc_reply(rmq_client->rmq_conn); if(result.reply_type != AMQP_RESPONSE_NORMAL) { JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));