From c0f0e1e105de220e40ba08f6b3c1a0d52bc16307 Mon Sep 17 00:00:00 2001 From: Chris Wiggins Date: Wed, 27 Jan 2021 09:59:02 +1300 Subject: [PATCH] Fix code style comments, also enable routing for direct exchanges --- ...janus.eventhandler.rabbitmqevh.jcfg.sample | 3 +- conf/janus.transport.rabbitmq.jcfg.sample | 35 ++++++++----------- transports/janus_rabbitmq.c | 28 +++++++-------- 3 files changed, 29 insertions(+), 37 deletions(-) diff --git a/conf/janus.eventhandler.rabbitmqevh.jcfg.sample b/conf/janus.eventhandler.rabbitmqevh.jcfg.sample index faa0c2450f..a29ec4e3cb 100644 --- a/conf/janus.eventhandler.rabbitmqevh.jcfg.sample +++ b/conf/janus.eventhandler.rabbitmqevh.jcfg.sample @@ -18,10 +18,9 @@ general: { #password = "guest" # Password to use to authenticate, if needed #vhost = "/" # Virtual host to specify when logging in, if needed #exchange = "janus-exchange" - route_key = "janus-events" # Routing key to use when publishing messages + route_key = "janus-events" # Routing key to use when publishing messages #exchange_type = "fanout" # Rabbitmq exchange_type can be one of the available types: direct, topic, headers and fanout (fanout by defualt). #heartbeat = 60 # Defines the seconds without communication that should pass before considering the TCP connection has unreachable. - #declare_outgoing_queue = true # By default (for backwards compatibility), we declare an outgoing queue. Set this to false to disable that behavior #ssl_enable = false # Whether ssl support must be enabled diff --git a/conf/janus.transport.rabbitmq.jcfg.sample b/conf/janus.transport.rabbitmq.jcfg.sample index c0e26db040..9aae80ef0a 100644 --- a/conf/janus.transport.rabbitmq.jcfg.sample +++ b/conf/janus.transport.rabbitmq.jcfg.sample @@ -26,20 +26,16 @@ general: { #vhost = "/" # Virtual host to specify when logging in, if needed #janus_exchange = "janus-exchange" # Exchange for outgoing messages, using default if not provided - #janus_exchange_type = "fanout" # Rabbitmq exchange_type can be one of the available types: direct, topic, headers and fanout (fanout by defualt). - - #queue_name = "janus-gateway" # Queue name for incoming messages (if this is set and janus_exchange_type is topic, to_janus will be the topic the queue is bound to the exchange on) - - to_janus = "to-janus" # Name of the queue for incoming messages if queue_name isn't set, otherwise, the topic that queue_name is bound to - from_janus = "from-janus" # Topic of the message sent from janus - - #declare_outgoing_queue = true # By default (for backwards compatibility), we declare an outgoing queue. Set this to false to disable that behavior - + #janus_exchange_type = "fanout" # Rabbitmq exchange_type can be one of the available types: direct, topic, headers and fanout (fanout by defualt). + #queue_name = "janus-gateway" # Queue name for incoming messages (if set and janus_exchange_type is topic/direct, to_janus will be the routing key the queue is bound to the exchange on) + to_janus = "to-janus" # Name of the queue for incoming messages if queue_name isn't set, otherwise, the routing key that queue_name is bound to + from_janus = "from-janus" # Routing key of the message sent from janus (as well as the name of the outgoing queue if declare_outgoing_queue = true) + #declare_outgoing_queue = true # By default (for backwards compatibility), we declare an outgoing queue. Set this to false to disable that behavior #queue_durable = false # Whether or not incoming queue should remain after a RabbitMQ reboot - #queue_autodelete = false # Whether or not incoming queue should autodelete after janus disconnects from RabbitMQ + #queue_autodelete = false # Whether or not incoming queue should autodelete after janus disconnects from RabbitMQ #queue_exclusive = false # Whether or not incoming queue should only allow one subscriber - #ssl_enabled = false # Whether ssl support must be enabled + #ssl_enabled = false # Whether ssl support must be enabled #ssl_verify_peer = true # Whether peer verification must be enabled #ssl_verify_hostname = true # Whether hostname verification must be enabled @@ -54,17 +50,14 @@ general: { # Admin API messaging. The same RabbitMQ server is supposed to be used. # Notice that by default the Admin API support via RabbitMQ is disabled. admin: { - #admin_enabled = false # Whether the support must be enabled - - #queue_name_admin = "janus-gateway-admin" # Queue name for incoming admin messages (if this is set and janus_exchange_type is topic, to_janus_admin will be the topic the queue is bound to the exchange on) - - #to_janus_admin = "to-janus-admin" # Name of the queue for incoming messages if queue_name_admin isn't set, otherwise, the topic that queue_name_admin is bound to - #from_janus_admin = "from-janus-admin" # Topic of the message sent from janus - - #declare_outgoing_queue_admin = true # By default (for backwards compatibility), we declare an outgoing queue. Set this to false to disable that behavior + #admin_enabled = false # Whether the support must be enabled + #queue_name_admin = "janus-gateway-admin" # Queue name for incoming admin messages (if set and janus_exchange_type is topic/direct, to_janus_admin will be the the routing key the queue is bound to the exchange on) + #to_janus_admin = "to-janus-admin" # Name of the queue for incoming messages if queue_name_admin isn't set, otherwise, the routing key that queue_name_admin is bound to + #from_janus_admin = "from-janus-admin" # Routing key of the message sent from janus (as well as the name of the outgoing queue if declare_outgoing_queue_admin = true) + #declare_outgoing_queue_admin = true # By default (for backwards compatibility), we declare an outgoing queue. Set this to false to disable that behavior #queue_durable_admin = false # Whether or not incoming queue should remain after a RabbitMQ reboot - #queue_autodelete_admin = false # Whether or not incoming queue should autodelete after janus disconnects from RabbitMQ - #queue_exclusive_admin = false # Whether or not incoming queue should only allow one subscriber + #queue_autodelete_admin = false # Whether or not incoming queue should autodelete after janus disconnects from RabbitMQ + #queue_exclusive_admin = false # Whether or not incoming queue should only allow one subscriber } diff --git a/transports/janus_rabbitmq.c b/transports/janus_rabbitmq.c index 51f10424bc..ce3def4666 100644 --- a/transports/janus_rabbitmq.c +++ b/transports/janus_rabbitmq.c @@ -333,12 +333,12 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_ item = janus_config_get(config, config_general, janus_config_type_item, "from_janus"); if(!item || !item->value) { - JANUS_LOG(LOG_FATAL, "Missing name of outgoing topic for RabbitMQ integration...\n"); + JANUS_LOG(LOG_FATAL, "Missing name of outgoing routing key for RabbitMQ integration...\n"); goto error; } from_janus = g_strdup(item->value); - if (janus_exchange == NULL) { + if(janus_exchange == NULL) { JANUS_LOG(LOG_INFO, "RabbitMQ support for Janus API enabled, %s:%d (%s/%s) exchange_type:%s \n", rmqhost, rmqport, to_janus, from_janus, janus_exchange_type); } else { JANUS_LOG(LOG_INFO, "RabbitMQ support for Janus API enabled, %s:%d (%s/%s) exch: (%s) exchange_type:%s \n", rmqhost, rmqport, to_janus, from_janus, janus_exchange, janus_exchange_type); @@ -372,7 +372,7 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_ item = janus_config_get(config, config_admin, janus_config_type_item, "from_janus_admin"); if(!item || !item->value) { - JANUS_LOG(LOG_FATAL, "Missing name of outgoing topic for RabbitMQ integration...\n"); + JANUS_LOG(LOG_FATAL, "Missing name of outgoing routing key for RabbitMQ integration...\n"); goto error; } @@ -484,7 +484,7 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_ } // Case when we have a queue_name, and to_janus is the name of the topic to bind on (if exchange_type is topic) - if (queue_name != NULL) { + if(queue_name != NULL) { JANUS_LOG(LOG_VERB, "Declaring incoming queue (using queue_name)... (%s)\n", queue_name); declare = amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, amqp_cstring_bytes(queue_name), 0, queue_durable, queue_exclusive, queue_autodelete, amqp_empty_table); rmq_client->to_janus_queue = declare->queue; @@ -495,8 +495,8 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_ goto error; } - if (strcmp(janus_exchange_type, "topic") == 0) { - JANUS_LOG(LOG_VERB, "Binding queue (%s) to topic (%s)\n", queue_name, to_janus); + if(strcmp(janus_exchange_type, "topic") == 0 || strcmp(janus_exchange_type, "direct") == 0) { + JANUS_LOG(LOG_VERB, "Binding queue (%s) to routing key (%s)\n", queue_name, to_janus); amqp_queue_bind(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_queue, rmq_client->janus_exchange, amqp_cstring_bytes(to_janus), amqp_empty_table); result = amqp_get_rpc_reply(rmq_client->rmq_conn); if(result.reply_type != AMQP_RESPONSE_NORMAL) { @@ -520,7 +520,7 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_ // By default, declare the outgoing queue item = janus_config_get(config, config_general, janus_config_type_item, "declare_outgoing_queue"); - if (!item || !item->value || janus_is_true(item->value)) { + if(!item || !item->value || janus_is_true(item->value)) { JANUS_LOG(LOG_VERB, "Declaring outgoing queue... (%s)\n", from_janus); amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, amqp_cstring_bytes(from_janus), 0, 0, 0, 0, amqp_empty_table); result = amqp_get_rpc_reply(rmq_client->rmq_conn); @@ -560,8 +560,8 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_ queue_autodelete_admin = 1; } - // Case when we have a queue_name_admin, and to_janus_admin is the name of the topic to bind on (if exchange_type is topic) - if (queue_name_admin != NULL) { + // Case when we have a queue_name_admin, and to_janus_admin is the name of the routing key to bind on (if exchange_type is topic or direct) + if(queue_name_admin != NULL) { JANUS_LOG(LOG_VERB, "Declaring incoming admin queue (using queue_name_admin)... (%s)\n", queue_name_admin); declare = amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, amqp_cstring_bytes(queue_name_admin), 0, queue_durable_admin, queue_exclusive_admin, queue_autodelete_admin, amqp_empty_table); rmq_client->to_janus_admin_queue = declare->queue; @@ -572,8 +572,8 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_ goto error; } - if (strcmp(janus_exchange_type, "topic") == 0) { - JANUS_LOG(LOG_VERB, "Binding queue (%s) to topic (%s)\n", queue_name_admin, to_janus_admin); + if(strcmp(janus_exchange_type, "topic") == 0 || strcmp(janus_exchange_type, "direct") == 0) { + JANUS_LOG(LOG_VERB, "Binding queue (%s) to routing key (%s)\n", queue_name_admin, to_janus_admin); amqp_queue_bind(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_admin_queue, rmq_client->janus_exchange, amqp_cstring_bytes(to_janus_admin), amqp_empty_table); result = amqp_get_rpc_reply(rmq_client->rmq_conn); if(result.reply_type != AMQP_RESPONSE_NORMAL) { @@ -598,7 +598,7 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_ // By default, declare the outgoing queue item = janus_config_get(config, config_admin, janus_config_type_item, "declare_outgoing_queue_admin"); - if (!item || !item->value || janus_is_true(item->value)) { + if(!item || !item->value || janus_is_true(item->value)) { JANUS_LOG(LOG_VERB, "Declaring outgoing queue... (%s)\n", from_janus_admin); amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, amqp_cstring_bytes(from_janus_admin), 0, 0, 0, 0, amqp_empty_table); result = amqp_get_rpc_reply(rmq_client->rmq_conn); @@ -910,7 +910,7 @@ void *janus_rmq_in_thread(void *data) { if(rmq_client->admin_api_enabled) { char incoming_topic[d->routing_key.len + 2]; strlcpy(incoming_topic, (char *)d->routing_key.bytes, d->routing_key.len + 1); // Convert the amqp_bytes_t back to char* - if (strcmp(incoming_topic, to_janus_admin) == 0) { + if(strcmp(incoming_topic, to_janus_admin) == 0) { admin = TRUE; } } @@ -976,7 +976,7 @@ void *janus_rmq_out_thread(void *data) { janus_mutex_lock(&rmq_client->mutex); /* Gotcha! Convert json_t to string */ char *payload_text = response->payload; - JANUS_LOG(LOG_VERB, "Sending %s API message to RabbitMQ (%zu bytes) on exchange %s with topic %s...\n", response->admin ? "Admin" : "Janus", strlen(payload_text), janus_exchange, response->admin ? from_janus_admin : from_janus); + JANUS_LOG(LOG_VERB, "Sending %s API message to RabbitMQ (%zu bytes) on exchange %s with routing key %s...\n", response->admin ? "Admin" : "Janus", strlen(payload_text), janus_exchange, response->admin ? from_janus_admin : from_janus); JANUS_LOG(LOG_VERB, "%s\n", payload_text); amqp_basic_properties_t props; props._flags = 0;