Skip to content

Commit

Permalink
Adds back in default outgoing queue behaviour. Adds support for auto-…
Browse files Browse the repository at this point in the history
…generated queue_names
  • Loading branch information
chriswiggins committed Nov 16, 2020
1 parent ed1b5c6 commit bbdd3e4
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 17 deletions.
4 changes: 3 additions & 1 deletion conf/janus.eventhandler.rabbitmqevh.jcfg.sample
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ general: {
#password = "guest" # Password to use to authenticate, if needed
#vhost = "/" # Virtual host to specify when logging in, if needed
#exchange = "janus-exchange"
route_key = "janus.events" # Routing key to use when publishing messages
route_key = "janus-events" # Routing key to use when publishing messages
#exchange_type = "fanout" # Rabbitmq exchange_type can be one of the available types: direct, topic, headers and fanout (fanout by defualt).
#heartbeat = 60 # Defines the seconds without communication that should pass before considering the TCP connection has unreachable.

#declare_outgoing_queue = true # By default (for backwards compatibility), we declare an outgoing queue. Set this to false to disable that behavior

#ssl_enable = false # Whether ssl support must be enabled
#ssl_verify_peer = true # Whether peer verification must be enabled
#ssl_verify_hostname = true # Whether hostname verification must be enabled
Expand Down
20 changes: 12 additions & 8 deletions conf/janus.transport.rabbitmq.jcfg.sample
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,16 @@ general: {
#janus_exchange = "janus-exchange" # Exchange for outgoing messages, using default if not provided
#janus_exchange_type = "fanout" # Rabbitmq exchange_type can be one of the available types: direct, topic, headers and fanout (fanout by defualt).

queue_name = "janus-gateway" # Queue name for incoming messages (if this is set and janus_exchange_type is topic, to_janus will be the topic the queue is bound to the exchange on)
#queue_name = "janus-gateway" # Queue name for incoming messages (if this is set and janus_exchange_type is topic, to_janus will be the topic the queue is bound to the exchange on)

to_janus = "to-janus" # Name of the queue for incoming messages if queue_name isn't set, otherwise, the topic that queue_name is bound to
from_janus = "from-janus" # Topic of the message sent from janus

#queue_durable = false # Whether or not queue should remain after a RabbitMQ reboot
#queue_autodelete = false # Whether or not queue should autodelete after janus disconnects from RabbitMQ
#queue_exclusive = false # Whether or not queue should only allow one subscriber
#declare_outgoing_queue = true # By default (for backwards compatibility), we declare an outgoing queue. Set this to false to disable that behavior

#queue_durable = false # Whether or not incoming queue should remain after a RabbitMQ reboot
#queue_autodelete = false # Whether or not incoming queue should autodelete after janus disconnects from RabbitMQ
#queue_exclusive = false # Whether or not incoming queue should only allow one subscriber

#ssl_enabled = false # Whether ssl support must be enabled
#ssl_verify_peer = true # Whether peer verification must be enabled
Expand All @@ -54,13 +56,15 @@ general: {
admin: {
#admin_enabled = false # Whether the support must be enabled

queue_name_admin = "janus-gateway-admin" # Queue name for incoming admin messages (if this is set and janus_exchange_type is topic, to_janus_admin will be the topic the queue is bound to the exchange on)
#queue_name_admin = "janus-gateway-admin" # Queue name for incoming admin messages (if this is set and janus_exchange_type is topic, to_janus_admin will be the topic the queue is bound to the exchange on)

#to_janus_admin = "to-janus-admin" # Name of the queue for incoming messages if queue_name_admin isn't set, otherwise, the topic that queue_name_admin is bound to
#from_janus_admin = "from-janus-admin" # Topic of the message sent from janus

#queue_durable_admin = false # Whether or not queue should remain after a RabbitMQ reboot
#queue_autodelete_admin = false # Whether or not queue should autodelete after janus disconnects from RabbitMQ
#queue_exclusive_admin = false # Whether or not queue should only allow one subscriber
#declare_outgoing_queue_admin = true # By default (for backwards compatibility), we declare an outgoing queue. Set this to false to disable that behavior

#queue_durable_admin = false # Whether or not incoming queue should remain after a RabbitMQ reboot
#queue_autodelete_admin = false # Whether or not incoming queue should autodelete after janus disconnects from RabbitMQ
#queue_exclusive_admin = false # Whether or not incoming queue should only allow one subscriber

}
17 changes: 17 additions & 0 deletions events/janus_rabbitmqevh.c
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ static gboolean ssl_verify_hostname = FALSE;
static const char *route_key = NULL, *exchange = NULL, *exchange_type = NULL ;
static uint16_t heartbeat = 0;
static uint16_t rmqport = AMQP_PROTOCOL_PORT;
static gboolean declare_outgoing_queue = TRUE;

/* Parameter validation (for tweaking via Admin API) */
static struct janus_json_parameter request_parameters[] = {
Expand Down Expand Up @@ -265,6 +266,12 @@ int janus_rabbitmqevh_init(const char *config_path) {
exchange_type = g_strdup(item->value);
}

// By default we *DO* declare the outgoing queue
item = janus_config_get(config, config_general, janus_config_type_item, "declare_outgoing_queue");
if(item && item->value && !janus_is_true(item->value)) {
declare_outgoing_queue = FALSE;
}

item = janus_config_get(config, config_general, janus_config_type_item, "exchange");
if(!item || !item->value) {
JANUS_LOG(LOG_INFO, "RabbitMQEventHandler: Missing name of outgoing exchange for RabbitMQ, using default\n");
Expand Down Expand Up @@ -408,6 +415,16 @@ int janus_rabbitmqevh_connect(void) {
}
}

if (declare_outgoing_queue) {
JANUS_LOG(LOG_VERB, "Declaring outgoing queue... (%s)\n", route_key);
amqp_queue_declare(rmq_conn, rmq_channel, amqp_cstring_bytes(route_key), 0, 0, 0, 0, amqp_empty_table);
result = amqp_get_rpc_reply(rmq_conn);
if(result.reply_type != AMQP_RESPONSE_NORMAL) {
JANUS_LOG(LOG_FATAL, "RabbitMQEventHandler: Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
return -1;
}
}

return 0;
}

Expand Down
45 changes: 37 additions & 8 deletions transports/janus_rabbitmq.c
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,6 @@ static char *rmqhost = NULL, *vhost = NULL, *username = NULL, *password = NULL,
*to_janus = NULL, *from_janus = NULL, *to_janus_admin = NULL, *from_janus_admin = NULL, *janus_exchange = NULL, *janus_exchange_type = NULL,
*queue_name = NULL, *queue_name_admin = NULL;


/* Transport implementation */
int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_path) {
if(g_atomic_int_get(&stopping)) {
Expand Down Expand Up @@ -390,6 +389,7 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_
/* Connect */
rmq_client->rmq_conn = amqp_new_connection();
amqp_socket_t *socket = NULL;
amqp_queue_declare_ok_t *declare = NULL;
int status;
JANUS_LOG(LOG_VERB, "Creating RabbitMQ socket...\n");
if (ssl_enabled) {
Expand Down Expand Up @@ -486,8 +486,9 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_
// Case when we have a queue_name, and to_janus is the name of the topic to bind on (if exchange_type is topic)
if (queue_name != NULL) {
JANUS_LOG(LOG_VERB, "Declaring incoming queue (using queue_name)... (%s)\n", queue_name);
rmq_client->to_janus_queue = amqp_cstring_bytes(queue_name);
amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_queue, 0, queue_durable, queue_exclusive, queue_autodelete, amqp_empty_table);
declare = amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, amqp_cstring_bytes(queue_name), 0, queue_durable, queue_exclusive, queue_autodelete, amqp_empty_table);
rmq_client->to_janus_queue = declare->queue;
JANUS_LOG(LOG_VERB, "Incoming queue declared: (%s)\n", (char *) rmq_client->to_janus_queue.bytes);
result = amqp_get_rpc_reply(rmq_client->rmq_conn);
if(result.reply_type != AMQP_RESPONSE_NORMAL) {
JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
Expand All @@ -507,8 +508,21 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_
// Case when to_janus is the name of the queue (and there's no binding)
} else {
JANUS_LOG(LOG_VERB, "Declaring incoming queue (using to_janus)... (%s)\n", to_janus);
rmq_client->to_janus_queue = amqp_cstring_bytes(to_janus);
amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_queue, 0, queue_durable, queue_exclusive, queue_autodelete, amqp_empty_table);
declare = amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, amqp_cstring_bytes(to_janus), 0, queue_durable, queue_exclusive, queue_autodelete, amqp_empty_table);
rmq_client->to_janus_queue = declare->queue;
JANUS_LOG(LOG_VERB, "Incoming queue declared: (%s)\n", (char *)rmq_client->to_janus_queue.bytes);
result = amqp_get_rpc_reply(rmq_client->rmq_conn);
if(result.reply_type != AMQP_RESPONSE_NORMAL) {
JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
goto error;
}
}

// By default, declare the outgoing queue
item = janus_config_get(config, config_general, janus_config_type_item, "declare_outgoing_queue");
if (!item || !item->value || janus_is_true(item->value)) {
JANUS_LOG(LOG_VERB, "Declaring outgoing queue... (%s)\n", from_janus);
amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, amqp_cstring_bytes(from_janus), 0, 0, 0, 0, amqp_empty_table);
result = amqp_get_rpc_reply(rmq_client->rmq_conn);
if(result.reply_type != AMQP_RESPONSE_NORMAL) {
JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
Expand Down Expand Up @@ -549,8 +563,9 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_
// Case when we have a queue_name_admin, and to_janus_admin is the name of the topic to bind on (if exchange_type is topic)
if (queue_name_admin != NULL) {
JANUS_LOG(LOG_VERB, "Declaring incoming admin queue (using queue_name_admin)... (%s)\n", queue_name_admin);
rmq_client->to_janus_admin_queue = amqp_cstring_bytes(queue_name_admin);
amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_admin_queue, 0, queue_durable_admin, queue_exclusive_admin, queue_autodelete_admin, amqp_empty_table);
declare = amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, amqp_cstring_bytes(queue_name_admin), 0, queue_durable_admin, queue_exclusive_admin, queue_autodelete_admin, amqp_empty_table);
rmq_client->to_janus_admin_queue = declare->queue;
JANUS_LOG(LOG_VERB, "Incoming admin queue declared: (%s)\n", (char *) rmq_client->to_janus_queue.bytes);
result = amqp_get_rpc_reply(rmq_client->rmq_conn);
if(result.reply_type != AMQP_RESPONSE_NORMAL) {
JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
Expand All @@ -571,7 +586,21 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_
} else {
JANUS_LOG(LOG_VERB, "Declaring incoming admin queue (using to_janus_admin)... (%s)\n", to_janus_admin);
rmq_client->to_janus_admin_queue = amqp_cstring_bytes(to_janus_admin);
amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_admin_queue, 0, queue_durable_admin, queue_exclusive_admin, queue_autodelete_admin, amqp_empty_table);
declare = amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_admin_queue, 0, queue_durable_admin, queue_exclusive_admin, queue_autodelete_admin, amqp_empty_table);
rmq_client->to_janus_admin_queue = declare->queue;
JANUS_LOG(LOG_VERB, "Incoming admin queue declared: (%s)\n", (char *) rmq_client->to_janus_queue.bytes);
result = amqp_get_rpc_reply(rmq_client->rmq_conn);
if(result.reply_type != AMQP_RESPONSE_NORMAL) {
JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
goto error;
}
}

// By default, declare the outgoing queue
item = janus_config_get(config, config_admin, janus_config_type_item, "declare_outgoing_queue_admin");
if (!item || !item->value || janus_is_true(item->value)) {
JANUS_LOG(LOG_VERB, "Declaring outgoing queue... (%s)\n", from_janus_admin);
amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, amqp_cstring_bytes(from_janus_admin), 0, 0, 0, 0, amqp_empty_table);
result = amqp_get_rpc_reply(rmq_client->rmq_conn);
if(result.reply_type != AMQP_RESPONSE_NORMAL) {
JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
Expand Down

0 comments on commit bbdd3e4

Please sign in to comment.