Skip to content

Commit

Permalink
Updates RabbitMQ logic
Browse files Browse the repository at this point in the history
- Publishing to a topic does not require an outgoing queue, just the topic, so the outgoing queues are no longer declared
- When the janus_exchange_type is topic, we want to be able to name the queue, and then bind an incoming topic from the exchange to that queue, so that functionality has been added
- This is all backwards compatible with original logic, and won't break existing logic
  • Loading branch information
chriswiggins committed Nov 10, 2020
1 parent a3b840f commit f604aeb
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 66 deletions.
2 changes: 1 addition & 1 deletion conf/janus.eventhandler.rabbitmqevh.jcfg.sample
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ general: {
#password = "guest" # Password to use to authenticate, if needed
#vhost = "/" # Virtual host to specify when logging in, if needed
#exchange = "janus-exchange"
route_key = "janus-events" # Name of the queue for event messages
route_key = "janus.events" # Routing key to use when publishing messages
#exchange_type = "fanout" # Rabbitmq exchange_type can be one of the available types: direct, topic, headers and fanout (fanout by defualt).
#heartbeat = 60 # Defines the seconds without communication that should pass before considering the TCP connection has unreachable.

Expand Down
15 changes: 12 additions & 3 deletions conf/janus.transport.rabbitmq.jcfg.sample
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,15 @@ general: {
#username = "guest" # Username to use to authenticate, if needed
#password = "guest" # Password to use to authenticate, if needed
#vhost = "/" # Virtual host to specify when logging in, if needed
to_janus = "to-janus" # Name of the queue for incoming messages
from_janus = "from-janus" # Name of the queue for outgoing messages

#janus_exchange = "janus-exchange" # Exchange for outgoing messages, using default if not provided
#janus_exchange_type = "fanout" # Rabbitmq exchange_type can be one of the available types: direct, topic, headers and fanout (fanout by defualt).

queue_name = "janus-gateway" # Queue name for incoming messages (if this is set and janus_exchange_type is topic, to_janus will be the topic the queue is bound to the exchange on)

to_janus = "to-janus" # Name of the queue for incoming messages if queue_name isn't set, otherwise, the topic that queue_name is bound to
from_janus = "from-janus" # Topic of the message sent from janus

#ssl_enabled = false # Whether ssl support must be enabled
#ssl_verify_peer = true # Whether peer verification must be enabled
#ssl_verify_hostname = true # Whether hostname verification must be enabled
Expand All @@ -44,6 +49,10 @@ general: {
# Notice that by default the Admin API support via RabbitMQ is disabled.
admin: {
#admin_enabled = false # Whether the support must be enabled

queue_name_admin = "janus-gateway-admin" # Queue name for incoming admin messages (if this is set and janus_exchange_type is topic, to_janus_admin will be the topic the queue is bound to the exchange on)

# Deprecated config values, ignored if admin_queue_name is set above
#to_janus_admin = "to-janus-admin" # Name of the queue for incoming messages
#from_janus_admin = "from-janus-admin" # Name of the queue for outgoing messages
#from_janus_admin = "from-janus-admin" # Topic of the message sent from janus
}
14 changes: 2 additions & 12 deletions events/janus_rabbitmqevh.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ static size_t json_format = JSON_INDENT(3) | JSON_PRESERVE_ORDER;
static amqp_connection_state_t rmq_conn;
static amqp_channel_t rmq_channel = 0;
static amqp_bytes_t rmq_exchange;
static amqp_bytes_t rmq_route_key;

static janus_mutex mutex;

Expand Down Expand Up @@ -408,14 +407,7 @@ int janus_rabbitmqevh_connect(void) {
return -1;
}
}
JANUS_LOG(LOG_VERB, "Declaring outgoing queue... (%s)\n", route_key);
rmq_route_key = amqp_cstring_bytes(route_key);
amqp_queue_declare(rmq_conn, rmq_channel, rmq_route_key, 0, 0, 0, 0, amqp_empty_table);
result = amqp_get_rpc_reply(rmq_conn);
if(result.reply_type != AMQP_RESPONSE_NORMAL) {
JANUS_LOG(LOG_FATAL, "RabbitMQEventHandler: Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
return -1;
}

return 0;
}

Expand Down Expand Up @@ -444,8 +436,6 @@ void janus_rabbitmqevh_destroy(void) {
}
if(rmq_exchange.bytes)
g_free((char *)rmq_exchange.bytes);
if(rmq_route_key.bytes)
g_free((char *)rmq_route_key.bytes);
if(rmqhost)
g_free((char *)rmqhost);
if(vhost)
Expand Down Expand Up @@ -611,7 +601,7 @@ static void *jns_rmqevh_hdlr(void *data) {
props.content_type = amqp_cstring_bytes("application/json");
amqp_bytes_t message = amqp_cstring_bytes(event_text);
janus_mutex_lock(&mutex);
int status = amqp_basic_publish(rmq_conn, rmq_channel, rmq_exchange, rmq_route_key, 0, 0, &props, message);
int status = amqp_basic_publish(rmq_conn, rmq_channel, rmq_exchange, amqp_cstring_bytes(route_key), 0, 0, &props, message);
if(status != AMQP_STATUS_OK) {
JANUS_LOG(LOG_ERR, "RabbitMQEventHandler: Error publishing... %d, %s\n", status, amqp_error_string2(status));
}
Expand Down
157 changes: 107 additions & 50 deletions transports/janus_rabbitmq.c
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,8 @@ typedef struct janus_rabbitmq_client {
gboolean janus_api_enabled; /* Whether the Janus API via RabbitMQ is enabled */
amqp_bytes_t janus_exchange; /* AMQP exchange for outgoing messages */
amqp_bytes_t to_janus_queue; /* AMQP outgoing messages queue (Janus API) */
amqp_bytes_t from_janus_queue; /* AMQP incoming messages queue (Janus API) */
gboolean admin_api_enabled; /* Whether the Janus API via RabbitMQ is enabled */
amqp_bytes_t to_janus_admin_queue; /* AMQP outgoing messages queue (Admin API) */
amqp_bytes_t from_janus_admin_queue; /* AMQP incoming messages queue (Admin API) */
GThread *in_thread, *out_thread; /* Threads to handle incoming and outgoing queues */
GAsyncQueue *messages; /* Queue of outgoing messages to push */
janus_mutex mutex; /* Mutex to lock/unlock this session */
Expand Down Expand Up @@ -170,7 +168,8 @@ static janus_transport_session *rmq_session = NULL;
/* Global properties */
static char *rmqhost = NULL, *vhost = NULL, *username = NULL, *password = NULL,
*ssl_cacert_file = NULL, *ssl_cert_file = NULL, *ssl_key_file = NULL,
*to_janus = NULL, *from_janus = NULL, *to_janus_admin = NULL, *from_janus_admin = NULL, *janus_exchange = NULL, *janus_exchange_type = NULL;
*to_janus = NULL, *from_janus = NULL, *to_janus_admin = NULL, *from_janus_admin = NULL, *janus_exchange = NULL, *janus_exchange_type = NULL,
*queue_name = NULL, *queue_name_admin = NULL;


/* Transport implementation */
Expand Down Expand Up @@ -304,31 +303,42 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_
if(!item || !item->value || !janus_is_true(item->value)) {
JANUS_LOG(LOG_WARN, "RabbitMQ support disabled (Janus API)\n");
} else {
/* Parse configuration */
item = janus_config_get(config, config_general, janus_config_type_item, "to_janus");
if(!item || !item->value) {
JANUS_LOG(LOG_FATAL, "Missing name of incoming queue for RabbitMQ integration...\n");
goto error;
}
to_janus = g_strdup(item->value);
item = janus_config_get(config, config_general, janus_config_type_item, "from_janus");

// Get exchange name config, or set to default exchange
item = janus_config_get(config, config_general, janus_config_type_item, "janus_exchange");
if(!item || !item->value) {
JANUS_LOG(LOG_FATAL, "Missing name of outgoing queue for RabbitMQ integration...\n");
goto error;
JANUS_LOG(LOG_INFO, "Missing name of outgoing exchange for RabbitMQ integration, using default\n");
} else {
janus_exchange = g_strdup(item->value);
}
from_janus = g_strdup(item->value);

// Get exchange type config, or set to default
item = janus_config_get(config, config_general, janus_config_type_item, "janus_exchange_type");
if(!item || !item->value) {
janus_exchange_type = (char *)JANUS_RABBITMQ_EXCHANGE_TYPE;
} else {
janus_exchange_type = g_strdup(item->value);
}
item = janus_config_get(config, config_general, janus_config_type_item, "janus_exchange");

item = janus_config_get(config, config_general, janus_config_type_item, "queue_name");
if(item && item->value) {
queue_name = g_strdup(item->value);
}

item = janus_config_get(config, config_general, janus_config_type_item, "to_janus");
if(!item || !item->value) {
JANUS_LOG(LOG_INFO, "Missing name of outgoing exchange for RabbitMQ integration, using default\n");
} else {
janus_exchange = g_strdup(item->value);
JANUS_LOG(LOG_FATAL, "Missing name of incoming queue/topic for RabbitMQ integration...\n");
goto error;
}
to_janus = g_strdup(item->value);

item = janus_config_get(config, config_general, janus_config_type_item, "from_janus");
if(!item || !item->value) {
JANUS_LOG(LOG_FATAL, "Missing name of outgoing topic for RabbitMQ integration...\n");
goto error;
}
from_janus = g_strdup(item->value);

if (janus_exchange == NULL) {
JANUS_LOG(LOG_INFO, "RabbitMQ support for Janus API enabled, %s:%d (%s/%s) exchange_type:%s \n", rmqhost, rmqport, to_janus, from_janus, janus_exchange_type);
} else {
Expand All @@ -349,17 +359,24 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_
JANUS_LOG(LOG_WARN, "RabbitMQ support disabled (Admin API)\n");
} else {
/* Parse configuration */
item = janus_config_get(config, config_general, janus_config_type_item, "queue_name_admin");
if(item && item->value) {
queue_name_admin = g_strdup(item->value);
}

item = janus_config_get(config, config_admin, janus_config_type_item, "to_janus_admin");
if(!item || !item->value) {
JANUS_LOG(LOG_FATAL, "Missing name of incoming queue for RabbitMQ integration...\n");
goto error;
}
to_janus_admin = g_strdup(item->value);

item = janus_config_get(config, config_admin, janus_config_type_item, "from_janus_admin");
if(!item || !item->value) {
JANUS_LOG(LOG_FATAL, "Missing name of outgoing queue for RabbitMQ integration...\n");
JANUS_LOG(LOG_FATAL, "Missing name of outgoing topic for RabbitMQ integration...\n");
goto error;
}

from_janus_admin = g_strdup(item->value);
JANUS_LOG(LOG_INFO, "RabbitMQ support for Admin API enabled, %s:%d (%s/%s)\n", rmqhost, rmqport, to_janus_admin, from_janus_admin);
rmq_admin_api_enabled = TRUE;
Expand Down Expand Up @@ -446,22 +463,39 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_
rmq_client->janus_api_enabled = FALSE;
if(rmq_janus_api_enabled) {
rmq_client->janus_api_enabled = TRUE;
JANUS_LOG(LOG_VERB, "Declaring incoming queue... (%s)\n", to_janus);
rmq_client->to_janus_queue = amqp_cstring_bytes(to_janus);
amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_queue, 0, 0, 0, 0, amqp_empty_table);
result = amqp_get_rpc_reply(rmq_client->rmq_conn);
if(result.reply_type != AMQP_RESPONSE_NORMAL) {
JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
goto error;
}
JANUS_LOG(LOG_VERB, "Declaring outgoing queue... (%s)\n", from_janus);
rmq_client->from_janus_queue = amqp_cstring_bytes(from_janus);
amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->from_janus_queue, 0, 0, 0, 0, amqp_empty_table);
result = amqp_get_rpc_reply(rmq_client->rmq_conn);
if(result.reply_type != AMQP_RESPONSE_NORMAL) {
JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
goto error;
// Case when we have a queue_name, and to_janus is the name of the topic to bind on (if exchange_type is topic)
if (queue_name != NULL) {
JANUS_LOG(LOG_VERB, "Declaring incoming queue... (%s)\n", queue_name);
rmq_client->to_janus_queue = amqp_cstring_bytes(queue_name);
amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_queue, 0, 0, 0, 0, amqp_empty_table);
result = amqp_get_rpc_reply(rmq_client->rmq_conn);
if(result.reply_type != AMQP_RESPONSE_NORMAL) {
JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
goto error;
}

if (strcmp(janus_exchange_type, "topic") == 0) {
JANUS_LOG(LOG_VERB, "Binding queue (%s) to topic (%s)\n", queue_name, to_janus);
amqp_queue_bind(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_queue, rmq_client->janus_exchange, amqp_cstring_bytes(to_janus), amqp_empty_table);
result = amqp_get_rpc_reply(rmq_client->rmq_conn);
if(result.reply_type != AMQP_RESPONSE_NORMAL) {
JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error binding queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
goto error;
}
}

// Case when to_janus is the name of the queue (and there's no binding)
} else {
JANUS_LOG(LOG_VERB, "Declaring incoming queue... (%s)\n", to_janus);
rmq_client->to_janus_queue = amqp_cstring_bytes(to_janus);
amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_queue, 0, 0, 0, 0, amqp_empty_table);
result = amqp_get_rpc_reply(rmq_client->rmq_conn);
if(result.reply_type != AMQP_RESPONSE_NORMAL) {
JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
goto error;
}
}

amqp_basic_consume(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_queue, amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
result = amqp_get_rpc_reply(rmq_client->rmq_conn);
if(result.reply_type != AMQP_RESPONSE_NORMAL) {
Expand All @@ -472,22 +506,39 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_
rmq_client->admin_api_enabled = FALSE;
if(rmq_admin_api_enabled) {
rmq_client->admin_api_enabled = TRUE;
JANUS_LOG(LOG_VERB, "Declaring incoming queue... (%s)\n", to_janus_admin);
rmq_client->to_janus_admin_queue = amqp_cstring_bytes(to_janus_admin);
amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_admin_queue, 0, 0, 0, 0, amqp_empty_table);
result = amqp_get_rpc_reply(rmq_client->rmq_conn);
if(result.reply_type != AMQP_RESPONSE_NORMAL) {
JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
goto error;
}
JANUS_LOG(LOG_VERB, "Declaring outgoing queue... (%s)\n", from_janus_admin);
rmq_client->from_janus_admin_queue = amqp_cstring_bytes(from_janus_admin);
amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->from_janus_admin_queue, 0, 0, 0, 0, amqp_empty_table);
result = amqp_get_rpc_reply(rmq_client->rmq_conn);
if(result.reply_type != AMQP_RESPONSE_NORMAL) {
JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
goto error;
// Case when we have a queue_name_admin, and to_janus_admin is the name of the topic to bind on (if exchange_type is topic)
if (queue_name_admin != NULL) {
JANUS_LOG(LOG_VERB, "Declaring incoming queue... (%s)\n", queue_name_admin);
rmq_client->to_janus_admin_queue = amqp_cstring_bytes(queue_name_admin);
amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_admin_queue, 0, 0, 0, 0, amqp_empty_table);
result = amqp_get_rpc_reply(rmq_client->rmq_conn);
if(result.reply_type != AMQP_RESPONSE_NORMAL) {
JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
goto error;
}

if (strcmp(janus_exchange_type, "topic") == 0) {
JANUS_LOG(LOG_VERB, "Binding queue (%s) to topic (%s)\n", queue_name_admin, to_janus_admin);
amqp_queue_bind(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_admin_queue, rmq_client->janus_exchange, amqp_cstring_bytes(to_janus_admin), amqp_empty_table);
result = amqp_get_rpc_reply(rmq_client->rmq_conn);
if(result.reply_type != AMQP_RESPONSE_NORMAL) {
JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error binding queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
goto error;
}
}

// Case when to_janus_admin is the name of the queue (and there's no binding
} else {
JANUS_LOG(LOG_VERB, "Declaring incoming queue... (%s)\n", to_janus_admin);
rmq_client->to_janus_admin_queue = amqp_cstring_bytes(to_janus_admin);
amqp_queue_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_admin_queue, 0, 0, 0, 0, amqp_empty_table);
result = amqp_get_rpc_reply(rmq_client->rmq_conn);
if(result.reply_type != AMQP_RESPONSE_NORMAL) {
JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
goto error;
}
}

amqp_basic_consume(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->to_janus_admin_queue, amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
result = amqp_get_rpc_reply(rmq_client->rmq_conn);
if(result.reply_type != AMQP_RESPONSE_NORMAL) {
Expand Down Expand Up @@ -549,6 +600,9 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_
g_free(username);
g_free(password);
g_free(janus_exchange);
g_free(janus_exchange_type);
g_free(queue_name);
g_free(queue_name_admin);
g_free(to_janus);
g_free(from_janus);
g_free(to_janus_admin);
Expand Down Expand Up @@ -587,6 +641,9 @@ void janus_rabbitmq_destroy(void) {
g_free(username);
g_free(password);
g_free(janus_exchange);
g_free(janus_exchange_type);
g_free(queue_name);
g_free(queue_name_admin);
g_free(to_janus);
g_free(from_janus);
g_free(to_janus_admin);
Expand Down Expand Up @@ -871,7 +928,7 @@ void *janus_rmq_out_thread(void *data) {
props.content_type = amqp_cstring_bytes("application/json");
amqp_bytes_t message = amqp_cstring_bytes(payload_text);
int status = amqp_basic_publish(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->janus_exchange,
response->admin ? rmq_client->from_janus_admin_queue : rmq_client->from_janus_queue,
response->admin ? amqp_cstring_bytes(from_janus_admin) : amqp_cstring_bytes(from_janus),
0, 0, &props, message);
if(status != AMQP_STATUS_OK) {
JANUS_LOG(LOG_ERR, "Error publishing... %d, %s\n", status, amqp_error_string2(status));
Expand Down

0 comments on commit f604aeb

Please sign in to comment.