Skip to content

Commit

Permalink
Merge pull request meetecho#2430 from vgrid/master
Browse files Browse the repository at this point in the history
Updates RabbitMQ logic
  • Loading branch information
atoppi committed Jan 29, 2021
2 parents 19ecf48 + c0f0e1e commit b7b1e9e
Show file tree
Hide file tree
Showing 4 changed files with 221 additions and 80 deletions.
3 changes: 2 additions & 1 deletion conf/janus.eventhandler.rabbitmqevh.jcfg.sample
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ general: {
#password = "guest" # Password to use to authenticate, if needed
#vhost = "/" # Virtual host to specify when logging in, if needed
#exchange = "janus-exchange"
route_key = "janus-events" # Name of the queue for event messages
route_key = "janus-events" # Routing key to use when publishing messages
#exchange_type = "fanout" # Rabbitmq exchange_type can be one of the available types: direct, topic, headers and fanout (fanout by defualt).
#heartbeat = 60 # Defines the seconds without communication that should pass before considering the TCP connection has unreachable.
#declare_outgoing_queue = true # By default (for backwards compatibility), we declare an outgoing queue. Set this to false to disable that behavior

#ssl_enable = false # Whether ssl support must be enabled
#ssl_verify_peer = true # Whether peer verification must be enabled
Expand Down
28 changes: 21 additions & 7 deletions conf/janus.transport.rabbitmq.jcfg.sample
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,18 @@ general: {
#username = "guest" # Username to use to authenticate, if needed
#password = "guest" # Password to use to authenticate, if needed
#vhost = "/" # Virtual host to specify when logging in, if needed
to_janus = "to-janus" # Name of the queue for incoming messages
from_janus = "from-janus" # Name of the queue for outgoing messages

#janus_exchange = "janus-exchange" # Exchange for outgoing messages, using default if not provided
#janus_exchange_type = "fanout" # Rabbitmq exchange_type can be one of the available types: direct, topic, headers and fanout (fanout by defualt).
#ssl_enabled = false # Whether ssl support must be enabled
#janus_exchange_type = "fanout" # Rabbitmq exchange_type can be one of the available types: direct, topic, headers and fanout (fanout by defualt).
#queue_name = "janus-gateway" # Queue name for incoming messages (if set and janus_exchange_type is topic/direct, to_janus will be the routing key the queue is bound to the exchange on)
to_janus = "to-janus" # Name of the queue for incoming messages if queue_name isn't set, otherwise, the routing key that queue_name is bound to
from_janus = "from-janus" # Routing key of the message sent from janus (as well as the name of the outgoing queue if declare_outgoing_queue = true)
#declare_outgoing_queue = true # By default (for backwards compatibility), we declare an outgoing queue. Set this to false to disable that behavior
#queue_durable = false # Whether or not incoming queue should remain after a RabbitMQ reboot
#queue_autodelete = false # Whether or not incoming queue should autodelete after janus disconnects from RabbitMQ
#queue_exclusive = false # Whether or not incoming queue should only allow one subscriber

#ssl_enabled = false # Whether ssl support must be enabled
#ssl_verify_peer = true # Whether peer verification must be enabled
#ssl_verify_hostname = true # Whether hostname verification must be enabled

Expand All @@ -43,7 +50,14 @@ general: {
# Admin API messaging. The same RabbitMQ server is supposed to be used.
# Notice that by default the Admin API support via RabbitMQ is disabled.
admin: {
#admin_enabled = false # Whether the support must be enabled
#to_janus_admin = "to-janus-admin" # Name of the queue for incoming messages
#from_janus_admin = "from-janus-admin" # Name of the queue for outgoing messages
#admin_enabled = false # Whether the support must be enabled

#queue_name_admin = "janus-gateway-admin" # Queue name for incoming admin messages (if set and janus_exchange_type is topic/direct, to_janus_admin will be the the routing key the queue is bound to the exchange on)
#to_janus_admin = "to-janus-admin" # Name of the queue for incoming messages if queue_name_admin isn't set, otherwise, the routing key that queue_name_admin is bound to
#from_janus_admin = "from-janus-admin" # Routing key of the message sent from janus (as well as the name of the outgoing queue if declare_outgoing_queue_admin = true)
#declare_outgoing_queue_admin = true # By default (for backwards compatibility), we declare an outgoing queue. Set this to false to disable that behavior
#queue_durable_admin = false # Whether or not incoming queue should remain after a RabbitMQ reboot
#queue_autodelete_admin = false # Whether or not incoming queue should autodelete after janus disconnects from RabbitMQ
#queue_exclusive_admin = false # Whether or not incoming queue should only allow one subscriber

}
29 changes: 18 additions & 11 deletions events/janus_rabbitmqevh.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ static size_t json_format = JSON_INDENT(3) | JSON_PRESERVE_ORDER;
static amqp_connection_state_t rmq_conn;
static amqp_channel_t rmq_channel = 0;
static amqp_bytes_t rmq_exchange;
static amqp_bytes_t rmq_route_key;

static janus_mutex mutex;

Expand All @@ -115,6 +114,7 @@ static gboolean ssl_verify_hostname = FALSE;
static const char *route_key = NULL, *exchange = NULL, *exchange_type = NULL ;
static uint16_t heartbeat = 0;
static uint16_t rmqport = AMQP_PROTOCOL_PORT;
static gboolean declare_outgoing_queue = TRUE;

/* Parameter validation (for tweaking via Admin API) */
static struct janus_json_parameter request_parameters[] = {
Expand Down Expand Up @@ -266,6 +266,12 @@ int janus_rabbitmqevh_init(const char *config_path) {
exchange_type = g_strdup(item->value);
}

// By default we *DO* declare the outgoing queue
item = janus_config_get(config, config_general, janus_config_type_item, "declare_outgoing_queue");
if(item && item->value && !janus_is_true(item->value)) {
declare_outgoing_queue = FALSE;
}

item = janus_config_get(config, config_general, janus_config_type_item, "exchange");
if(!item || !item->value) {
JANUS_LOG(LOG_INFO, "RabbitMQEventHandler: Missing name of outgoing exchange for RabbitMQ, using default\n");
Expand Down Expand Up @@ -408,14 +414,17 @@ int janus_rabbitmqevh_connect(void) {
return -1;
}
}
JANUS_LOG(LOG_VERB, "Declaring outgoing queue... (%s)\n", route_key);
rmq_route_key = amqp_cstring_bytes(route_key);
amqp_queue_declare(rmq_conn, rmq_channel, rmq_route_key, 0, 0, 0, 0, amqp_empty_table);
result = amqp_get_rpc_reply(rmq_conn);
if(result.reply_type != AMQP_RESPONSE_NORMAL) {
JANUS_LOG(LOG_FATAL, "RabbitMQEventHandler: Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
return -1;

if (declare_outgoing_queue) {
JANUS_LOG(LOG_VERB, "Declaring outgoing queue... (%s)\n", route_key);
amqp_queue_declare(rmq_conn, rmq_channel, amqp_cstring_bytes(route_key), 0, 0, 0, 0, amqp_empty_table);
result = amqp_get_rpc_reply(rmq_conn);
if(result.reply_type != AMQP_RESPONSE_NORMAL) {
JANUS_LOG(LOG_FATAL, "RabbitMQEventHandler: Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
return -1;
}
}

return 0;
}

Expand Down Expand Up @@ -444,8 +453,6 @@ void janus_rabbitmqevh_destroy(void) {
}
if(rmq_exchange.bytes)
g_free((char *)rmq_exchange.bytes);
if(rmq_route_key.bytes)
g_free((char *)rmq_route_key.bytes);
if(rmqhost)
g_free((char *)rmqhost);
if(vhost)
Expand Down Expand Up @@ -611,7 +618,7 @@ static void *jns_rmqevh_hdlr(void *data) {
props.content_type = amqp_cstring_bytes("application/json");
amqp_bytes_t message = amqp_cstring_bytes(event_text);
janus_mutex_lock(&mutex);
int status = amqp_basic_publish(rmq_conn, rmq_channel, rmq_exchange, rmq_route_key, 0, 0, &props, message);
int status = amqp_basic_publish(rmq_conn, rmq_channel, rmq_exchange, amqp_cstring_bytes(route_key), 0, 0, &props, message);
if(status != AMQP_STATUS_OK) {
JANUS_LOG(LOG_ERR, "RabbitMQEventHandler: Error publishing... %d, %s\n", status, amqp_error_string2(status));
}
Expand Down
Loading

0 comments on commit b7b1e9e

Please sign in to comment.