Skip to content

Commit

Permalink
Adds RabbitMQ options for queues, durable, exclusive and autodelete
Browse files Browse the repository at this point in the history
  • Loading branch information
chriswiggins committed Nov 12, 2020
1 parent 24594f7 commit ed1b5c6
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 6 deletions.
12 changes: 10 additions & 2 deletions conf/janus.transport.rabbitmq.jcfg.sample
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ general: {
to_janus = "to-janus" # Name of the queue for incoming messages if queue_name isn't set, otherwise, the topic that queue_name is bound to
from_janus = "from-janus" # Topic of the message sent from janus

#queue_durable = false # Whether or not queue should remain after a RabbitMQ reboot
#queue_autodelete = false # Whether or not queue should autodelete after janus disconnects from RabbitMQ
#queue_exclusive = false # Whether or not queue should only allow one subscriber

#ssl_enabled = false # Whether ssl support must be enabled
#ssl_verify_peer = true # Whether peer verification must be enabled
#ssl_verify_hostname = true # Whether hostname verification must be enabled
Expand All @@ -52,7 +56,11 @@ admin: {

queue_name_admin = "janus-gateway-admin" # Queue name for incoming admin messages (if this is set and janus_exchange_type is topic, to_janus_admin will be the topic the queue is bound to the exchange on)

# Deprecated config values, ignored if admin_queue_name is set above
#to_janus_admin = "to-janus-admin" # Name of the queue for incoming messages
#to_janus_admin = "to-janus-admin" # Name of the queue for incoming messages if queue_name_admin isn't set, otherwise, the topic that queue_name_admin is bound to
#from_janus_admin = "from-janus-admin" # Topic of the message sent from janus

#queue_durable_admin = false # Whether or not queue should remain after a RabbitMQ reboot
#queue_autodelete_admin = false # Whether or not queue should autodelete after janus disconnects from RabbitMQ
#queue_exclusive_admin = false # Whether or not queue should only allow one subscriber

}
48 changes: 44 additions & 4 deletions transports/janus_rabbitmq.c
Original file line number Diff line number Diff line change
Expand Up @@ -463,11 +463,31 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_
rmq_client->janus_api_enabled = FALSE;
if(rmq_janus_api_enabled) {
rmq_client->janus_api_enabled = TRUE;

// Set queue options
amqp_boolean_t queue_durable = 0;
item = janus_config_get(config, config_general, janus_config_type_item, "queue_durable");
if(item && item->value && janus_is_true(item->value)) {
queue_durable = 1;
}

amqp_boolean_t queue_exclusive = 0;
item = janus_config_get(config, config_general, janus_config_type_item, "queue_exclusive");
if(item && item->value && janus_is_true(item->value)) {
queue_exclusive = 1;
}

amqp_boolean_t queue_autodelete = 0;
item = janus_config_get(config, config_general, janus_config_type_item, "queue_autodelete");
if(item && item->value && janus_is_true(item->value)) {
queue_autodelete = 1;
}

// Case when we have a queue_name, and to_janus is the name of the topic to bind on (if exchange_type is topic)
if (queue_name != NULL) {
JANUS_LOG(LOG_VERB, "Declaring incoming queue (using queue_name)... (%s)\n", queue_name);
rmq_client->to_janus_queue = amqp_cstring_bytes(queue_name);
amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_queue, 0, 0, 0, 0, amqp_empty_table);
amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_queue, 0, queue_durable, queue_exclusive, queue_autodelete, amqp_empty_table);
result = amqp_get_rpc_reply(rmq_client->rmq_conn);
if(result.reply_type != AMQP_RESPONSE_NORMAL) {
JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
Expand All @@ -488,7 +508,7 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_
} else {
JANUS_LOG(LOG_VERB, "Declaring incoming queue (using to_janus)... (%s)\n", to_janus);
rmq_client->to_janus_queue = amqp_cstring_bytes(to_janus);
amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_queue, 0, 0, 0, 0, amqp_empty_table);
amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_queue, 0, queue_durable, queue_exclusive, queue_autodelete, amqp_empty_table);
result = amqp_get_rpc_reply(rmq_client->rmq_conn);
if(result.reply_type != AMQP_RESPONSE_NORMAL) {
JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
Expand All @@ -506,11 +526,31 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_
rmq_client->admin_api_enabled = FALSE;
if(rmq_admin_api_enabled) {
rmq_client->admin_api_enabled = TRUE;

// Set queue options
amqp_boolean_t queue_durable_admin = 0;
item = janus_config_get(config, config_admin, janus_config_type_item, "queue_durable_admin");
if(item && item->value && janus_is_true(item->value)) {
queue_durable_admin = 1;
}

amqp_boolean_t queue_exclusive_admin = 0;
item = janus_config_get(config, config_admin, janus_config_type_item, "queue_exclusive_admin");
if(item && item->value && janus_is_true(item->value)) {
queue_exclusive_admin = 1;
}

amqp_boolean_t queue_autodelete_admin = 0;
item = janus_config_get(config, config_admin, janus_config_type_item, "queue_autodelete_admin");
if(item && item->value && janus_is_true(item->value)) {
queue_autodelete_admin = 1;
}

// Case when we have a queue_name_admin, and to_janus_admin is the name of the topic to bind on (if exchange_type is topic)
if (queue_name_admin != NULL) {
JANUS_LOG(LOG_VERB, "Declaring incoming admin queue (using queue_name_admin)... (%s)\n", queue_name_admin);
rmq_client->to_janus_admin_queue = amqp_cstring_bytes(queue_name_admin);
amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_admin_queue, 0, 0, 0, 0, amqp_empty_table);
amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_admin_queue, 0, queue_durable_admin, queue_exclusive_admin, queue_autodelete_admin, amqp_empty_table);
result = amqp_get_rpc_reply(rmq_client->rmq_conn);
if(result.reply_type != AMQP_RESPONSE_NORMAL) {
JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
Expand All @@ -531,7 +571,7 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_
} else {
JANUS_LOG(LOG_VERB, "Declaring incoming admin queue (using to_janus_admin)... (%s)\n", to_janus_admin);
rmq_client->to_janus_admin_queue = amqp_cstring_bytes(to_janus_admin);
amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_admin_queue, 0, 0, 0, 0, amqp_empty_table);
amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_admin_queue, 0, queue_durable_admin, queue_exclusive_admin, queue_autodelete_admin, amqp_empty_table);
result = amqp_get_rpc_reply(rmq_client->rmq_conn);
if(result.reply_type != AMQP_RESPONSE_NORMAL) {
JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
Expand Down

0 comments on commit ed1b5c6

Please sign in to comment.