From d76b262d1edbaa095e409c786d3a6d69a1caba7d Mon Sep 17 00:00:00 2001 From: Lorenzo Miniero Date: Mon, 3 May 2021 16:02:56 +0200 Subject: [PATCH 1/5] Added support for forwarding groups in AudioBridge --- conf/janus.plugin.audiobridge.jcfg.sample | 2 + html/audiobridgetest.js | 9 +- plugins/janus_audiobridge.c | 483 +++++++++++++++++++--- 3 files changed, 435 insertions(+), 59 deletions(-) diff --git a/conf/janus.plugin.audiobridge.jcfg.sample b/conf/janus.plugin.audiobridge.jcfg.sample index c5f2359ddb..323e358521 100644 --- a/conf/janus.plugin.audiobridge.jcfg.sample +++ b/conf/janus.plugin.audiobridge.jcfg.sample @@ -12,6 +12,7 @@ # default_prebuffering = number of packets to buffer before decoding each particiant (default=6) # record = true|false (whether this room should be recorded, default=false) # record_file = "/path/to/recording.wav" (where to save the recording) +# groups = optional, non-hierarchical, array of groups to tag participants, for external forwarding purposes only # # The following lines are only needed if you want the mixed audio # to be automatically forwarded via plain RTP to an external component @@ -24,6 +25,7 @@ # rtp_forward_ssrc = SSRC to use to use when streaming (optional: stream_id used if missing) # rtp_forward_codec = opus (default), pcma (A-Law) or pcmu (mu-Law) # rtp_forward_ptype = payload type to use when streaming (optional: only read for Opus, 100 used if missing) +# rtp_forward_group = group of participants to forward, if enabled in the room (optional: forwards full mix if missing) # rtp_forward_srtp_suite = length of authentication tag (32 or 80) # rtp_forward_srtp_crypto = "" # rtp_forward_always_on = true|false, whether silence should be forwarded when the room is empty (optional: false used if missing) diff --git a/html/audiobridgetest.js b/html/audiobridgetest.js index 6d0de4f7c5..eae14c2d40 100644 --- a/html/audiobridgetest.js +++ b/html/audiobridgetest.js @@ -57,6 +57,9 @@ var spinner = null; var myroom = 1234; // Demo room if(getQueryStringValue("room") !== "") myroom = parseInt(getQueryStringValue("room")); +var mygroup = null; // Forwarding group, if required by the room +if(getQueryStringValue("group") !== "") + mygroup = getQueryStringValue("group"); var myusername = null; var myid = null; var webrtcUp = false; @@ -370,7 +373,11 @@ function registerUsername() { } var register = { request: "join", room: myroom, display: username }; myusername = username; - mixertest.send({ message: register}); + // If the room uses forwarding groups, this is how we state ours + if(mygroup) + register["group"] = mygroup; + // Send the message + mixertest.send({ message: register }); } } diff --git a/plugins/janus_audiobridge.c b/plugins/janus_audiobridge.c index c57d0e94ae..f4a867161f 100644 --- a/plugins/janus_audiobridge.c +++ b/plugins/janus_audiobridge.c @@ -40,6 +40,7 @@ room-: { default_prebuffering = number of packets to buffer before decoding each participant (default=DEFAULT_PREBUFFERING) record = true|false (whether this room should be recorded, default=false) record_file = /path/to/recording.wav (where to save the recording) + groups = optional, non-hierarchical, array of groups to tag participants, for external forwarding purposes only [The following lines are only needed if you want the mixed audio to be automatically forwarded via plain RTP to an external component @@ -52,6 +53,7 @@ room-: { rtp_forward_ssrc = SSRC to use to use when streaming (optional: stream_id used if missing) rtp_forward_codec = opus (default), pcma (A-Law) or pcmu (mu-Law) rtp_forward_ptype = payload type to use when streaming (optional: only read for Opus, 100 used if missing) + rtp_forward_group = group of participants to forward, if enabled in the room (optional: forwards full mix if missing) rtp_forward_srtp_suite = length of authentication tag, if SRTP is needed (32 or 80) rtp_forward_srtp_crypto = key to use as crypto, if SRTP is needed (base64 encoded key as in SDES) rtp_forward_always_on = true|false, whether silence should be forwarded when the room is empty (optional: false used if missing) @@ -108,7 +110,9 @@ room-: { * existing RTP forwarder; \c listforwarders lists all the current RTP * forwarders on a specific AudioBridge room instance. As an alternative, * you can configure a single static RTP forwarder in the plugin - * configuration file. + * configuration file. A finer grained control of what to forward + * externally, in terms of participants mix, can be achieved using + * groups. * * \c create can be used to create a new audio room, and has to be * formatted as follows: @@ -131,6 +135,7 @@ room-: { "default_prebuffering" : , "record" : , "record_file" : "", + "groups" : [ non-hierarchical array of string group names to use to gat participants, for external forwarding purposes only, optional] } \endverbatim * @@ -384,6 +389,7 @@ room-: { { "request" : "rtp_forward", "room" : , + "group" : "", "ssrc" : , "codec" : "", "ptype" : , @@ -395,6 +401,20 @@ room-: { "always_on" : } \endverbatim + * + * The concept of "groups" is particularly important, here, in case groups were + * enabled when creating a room. By default, in fact, if a room has groups disabled, + * then an RTP forwarder will simply relay the mix of all active participants; + * sometimes, though, an external application may want to only receive the mix + * of some of the participants, and not all of them. This is what groups are + * for: if you tag participants with a specific group name, then creating a + * new forwarder that explicitly references that group name will ensure that + * only a mix of the participants tagged with that name will be forwarded. + * As such, it's important to point out groups \b only impact forwarders, + * and \c NOT participants or how they're mixed in main mix for the room itself. + * Omitting a group name when creating a forwarder for a room where groups + * are enabled will simply fall back to the default behaviour of forwarding + * the full mix. * * Notice that, as explained above, in case you configured an \c admin_key * property and extended it to RTP forwarding as well, you'll need to provide @@ -482,6 +502,7 @@ room-: { "request" : "play_file", "room" : , "secret" : "", + "group" : "", "file_id": "", "filename": "", "loop": @@ -634,6 +655,7 @@ room-: { "request" : "join", "room" : , "id" : , + "group" : "", "pin" : "", "display" : "", "token" : "", @@ -681,7 +703,8 @@ room-: { "quality" : , "volume" : , "record": " + "filename": "", + "group" : "" } \endverbatim * @@ -772,6 +795,7 @@ room-: { "request" : "changeroom", "room" : , "id" : , + "group" : "", "display" : "", "token" : "", "muted" : , @@ -884,8 +908,8 @@ room-: { /* Plugin information */ -#define JANUS_AUDIOBRIDGE_VERSION 11 -#define JANUS_AUDIOBRIDGE_VERSION_STRING "0.0.11" +#define JANUS_AUDIOBRIDGE_VERSION 12 +#define JANUS_AUDIOBRIDGE_VERSION_STRING "0.0.12" #define JANUS_AUDIOBRIDGE_DESCRIPTION "This is a plugin implementing an audio conference bridge for Janus, mixing Opus streams." #define JANUS_AUDIOBRIDGE_NAME "JANUS AudioBridge plugin" #define JANUS_AUDIOBRIDGE_AUTHOR "Meetecho s.r.l." @@ -894,6 +918,8 @@ room-: { #define MIN_SEQUENTIAL 2 #define MAX_MISORDER 50 +#define JANUS_AUDIOBRIDGE_MAX_GROUPS 5 + /* Plugin methods */ janus_plugin *create(void); int janus_audiobridge_init(janus_callbacks *callback, const char *config_path); @@ -977,6 +1003,9 @@ static struct janus_json_parameter idstr_parameters[] = { static struct janus_json_parameter idstropt_parameters[] = { {"id", JSON_STRING, 0} }; +static struct janus_json_parameter group_parameters[] = { + {"group", JSON_STRING, JANUS_JSON_PARAM_REQUIRED} +}; static struct janus_json_parameter create_parameters[] = { {"description", JSON_STRING, 0}, {"secret", JSON_STRING, 0}, @@ -992,7 +1021,8 @@ static struct janus_json_parameter create_parameters[] = { {"audiolevel_event", JANUS_JSON_BOOL, 0}, {"audio_active_packets", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE}, {"audio_level_average", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE}, - {"default_prebuffering", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE} + {"default_prebuffering", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE}, + {"groups", JSON_ARRAY, 0} }; static struct janus_json_parameter edit_parameters[] = { {"secret", JSON_STRING, 0}, @@ -1016,6 +1046,7 @@ static struct janus_json_parameter secret_parameters[] = { static struct janus_json_parameter join_parameters[] = { {"display", JSON_STRING, 0}, {"token", JSON_STRING, 0}, + {"group", JSON_STRING, 0}, {"muted", JANUS_JSON_BOOL, 0}, {"codec", JSON_STRING, 0}, {"prebuffer", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE}, @@ -1031,6 +1062,7 @@ static struct janus_json_parameter configure_parameters[] = { {"prebuffer", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE}, {"quality", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE}, {"volume", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE}, + {"group", JSON_STRING, 0}, {"record", JANUS_JSON_BOOL, 0}, {"filename", JSON_STRING, 0}, {"display", JSON_STRING, 0}, @@ -1038,6 +1070,7 @@ static struct janus_json_parameter configure_parameters[] = { {"update", JANUS_JSON_BOOL, 0} }; static struct janus_json_parameter rtp_forward_parameters[] = { + {"group", JSON_STRING, 0}, {"ssrc", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE}, {"codec", JSON_STRING, 0}, {"ptype", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE}, @@ -1054,6 +1087,7 @@ static struct janus_json_parameter stop_rtp_forward_parameters[] = { static struct janus_json_parameter play_file_parameters[] = { {"filename", JSON_STRING, JANUS_JSON_PARAM_REQUIRED}, {"file_id", JSON_STRING, 0}, + {"group", JSON_STRING, 0}, {"loop", JANUS_JSON_BOOL, 0} }; static struct janus_json_parameter checkstop_file_parameters[] = { @@ -1119,6 +1153,7 @@ typedef struct janus_audiobridge_room { volatile gint destroyed; /* Whether this room has been destroyed */ janus_mutex mutex; /* Mutex to lock this room instance */ /* RTP forwarders for this room's mix */ + GHashTable *groups; /* Forwarding groups supported in this room, indexed by name */ GHashTable *rtp_forwarders; /* RTP forwarders list (as a hashmap) */ OpusEncoder *rtp_encoder; /* Opus encoder instance to use for all RTP forwarders */ janus_mutex rtp_mutex; /* Mutex to lock the RTP forwarders list */ @@ -1340,6 +1375,7 @@ typedef struct janus_audiobridge_participant { #ifdef HAVE_LIBOGG janus_audiobridge_file *annc; /* In case this is a fake participant, a playable file */ #endif + uint group; /* Forwarding group index, if enabled in the room */ janus_mutex rec_mutex; /* Mutex to protect the recorder from race conditions */ volatile gint destroyed; /* Whether this room has been destroyed */ janus_refcount ref; /* Reference counter for this participant */ @@ -1443,6 +1479,8 @@ static void janus_audiobridge_room_free(const janus_refcount *audiobridge_ref) { if(audiobridge->rtp_encoder) opus_encoder_destroy(audiobridge->rtp_encoder); g_hash_table_destroy(audiobridge->rtp_forwarders); + if(audiobridge->groups) + g_hash_table_destroy(audiobridge->groups); g_free(audiobridge); } @@ -1477,6 +1515,7 @@ typedef struct janus_audiobridge_rtp_forwarder { int payload_type; uint16_t seq_number; uint32_t timestamp; + uint group; gboolean always_on; /* Only needed for SRTP forwarders */ gboolean is_srtp; @@ -1500,7 +1539,7 @@ static void janus_audiobridge_rtp_forwarder_free(const janus_refcount *f_ref) { g_free(rf); } static guint32 janus_audiobridge_rtp_forwarder_add_helper(janus_audiobridge_room *room, - const gchar *host, uint16_t port, uint32_t ssrc, int pt, + uint group, const gchar *host, uint16_t port, uint32_t ssrc, int pt, janus_audiocodec codec, int srtp_suite, const char *srtp_crypto, gboolean always_on, guint32 stream_id) { if(room == NULL || host == NULL) @@ -1560,6 +1599,7 @@ static guint32 janus_audiobridge_rtp_forwarder_add_helper(janus_audiobridge_room rf->payload_type = 0; rf->seq_number = 0; rf->timestamp = 0; + rf->group = group; rf->always_on = always_on; janus_mutex_lock(&room->rtp_mutex); @@ -1822,6 +1862,7 @@ static int janus_audiobridge_resample(int16_t *input, int input_num, int input_r #define JANUS_AUDIOBRIDGE_ERROR_ALREADY_JOINED 491 #define JANUS_AUDIOBRIDGE_ERROR_NO_SUCH_USER 492 #define JANUS_AUDIOBRIDGE_ERROR_INVALID_SDP 493 +#define JANUS_AUDIOBRIDGE_ERROR_NO_SUCH_GROUP 494 static int janus_audiobridge_create_udp_socket_if_needed(janus_audiobridge_room *audiobridge) { if(audiobridge->rtp_udp_sock > 0) { @@ -1908,6 +1949,19 @@ static int janus_audiobridge_create_static_rtp_forwarder(janus_config_category * } } + /* If this room uses groups, check if a valid group name was provided */ + uint group = 0; + if(audiobridge->groups != NULL) { + janus_config_item *group_name = janus_config_get(config, cat, janus_config_type_item, "rtp_forward_group"); + if(group_name != NULL && group_name->value != NULL) { + group = GPOINTER_TO_UINT(g_hash_table_lookup(audiobridge->groups, group_name->value)); + if(group == 0) { + JANUS_LOG(LOG_ERR, "Invalid group name (%s)\n", group_name->value); + return 0; + } + } + } + janus_config_item *port_item = janus_config_get(config, cat, janus_config_type_item, "rtp_forward_port"); uint16_t port = 0; if(port_item != NULL && port_item->value != NULL && janus_string_to_uint16(port_item->value, &port) < 0) { @@ -2005,7 +2059,7 @@ static int janus_audiobridge_create_static_rtp_forwarder(janus_config_category * return -1; } - janus_audiobridge_rtp_forwarder_add_helper(audiobridge, + janus_audiobridge_rtp_forwarder_add_helper(audiobridge, group, host, port, ssrc_value, ptype, codec, srtp_suite, srtp_crypto, always_on, forwarder_id); @@ -2097,6 +2151,7 @@ int janus_audiobridge_init(janus_callbacks *callback, const char *config_path) { janus_config_item *default_prebuffering = janus_config_get(config, cat, janus_config_type_item, "default_prebuffering"); janus_config_item *secret = janus_config_get(config, cat, janus_config_type_item, "secret"); janus_config_item *pin = janus_config_get(config, cat, janus_config_type_item, "pin"); + janus_config_array *groups = janus_config_get(config, cat, janus_config_type_array, "groups"); janus_config_item *record = janus_config_get(config, cat, janus_config_type_item, "record"); janus_config_item *recfile = janus_config_get(config, cat, janus_config_type_item, "record_file"); if(sampling == NULL || sampling->value == NULL) { @@ -2214,6 +2269,38 @@ int janus_audiobridge_init(janus_callbacks *callback, const char *config_path) { (GDestroyNotify)g_free, (GDestroyNotify)janus_audiobridge_participant_unref); audiobridge->check_tokens = FALSE; /* Static rooms can't have an "allowed" list yet, no hooks to the configuration file */ audiobridge->allowed = g_hash_table_new_full(g_str_hash, g_str_equal, (GDestroyNotify)g_free, NULL); + if(groups != NULL) { + /* Populate the group hashtable, and create the related indexes */ + GList *gl = groups->list; + if(g_list_length(gl) > JANUS_AUDIOBRIDGE_MAX_GROUPS) { + JANUS_LOG(LOG_ERR, "Too many groups specified in room %s (max %d allowed)\n", room_num, JANUS_AUDIOBRIDGE_MAX_GROUPS); + janus_refcount_decrease(&audiobridge->ref); + cl = cl->next; + } + int count = 0; + audiobridge->groups = g_hash_table_new_full(g_str_hash, g_str_equal, (GDestroyNotify)g_free, NULL); + while(gl) { + janus_config_item *g = (janus_config_item *)gl->data; + if(g == NULL || g->type != janus_config_type_item || g->name != NULL || g->value == NULL) { + JANUS_LOG(LOG_WARN, " -- Invalid group item (not a string?), skipping in '%s'...\n", cat->name); + gl = gl->next; + continue; + } + const char *name = g->value; + if(g_hash_table_lookup(audiobridge->groups, name)) { + JANUS_LOG(LOG_WARN, "Duplicated group name '%s', skipping\n", name); + } else { + count++; + g_hash_table_insert(audiobridge->groups, g_strdup(name), GUINT_TO_POINTER(count)); + } + gl = gl->next; + } + if(count == 0) { + JANUS_LOG(LOG_WARN, "Empty or invalid groups array provided, groups will be disabled in '%s'...\n", cat->name); + g_hash_table_destroy(audiobridge->groups); + audiobridge->groups = NULL; + } + } g_atomic_int_set(&audiobridge->destroyed, 0); janus_mutex_init(&audiobridge->mutex); audiobridge->rtp_forwarders = g_hash_table_new_full(NULL, NULL, NULL, (GDestroyNotify)janus_audiobridge_rtp_forwarder_destroy); @@ -2522,6 +2609,7 @@ static json_t *janus_audiobridge_process_synchronous_request(janus_audiobridge_s json_t *audio_active_packets = json_object_get(root, "audio_active_packets"); json_t *audio_level_average = json_object_get(root, "audio_level_average"); json_t *default_prebuffering = json_object_get(root, "default_prebuffering"); + json_t *groups = json_object_get(root, "groups"); json_t *record = json_object_get(root, "record"); json_t *recfile = json_object_get(root, "record_file"); json_t *permanent = json_object_get(root, "permanent"); @@ -2545,6 +2633,32 @@ static json_t *janus_audiobridge_process_synchronous_request(janus_audiobridge_s goto prepare_response; } } + if(groups) { + /* Make sure the "groups" array only contains strings */ + gboolean ok = TRUE; + if(json_array_size(groups) > 0) { + if(json_array_size(groups) > JANUS_AUDIOBRIDGE_MAX_GROUPS) { + JANUS_LOG(LOG_ERR, "Too many groups specified (max %d allowed)\n", JANUS_AUDIOBRIDGE_MAX_GROUPS); + error_code = JANUS_AUDIOBRIDGE_ERROR_INVALID_ELEMENT; + g_snprintf(error_cause, 512, "Too many groups specified (max %d allowed)", JANUS_AUDIOBRIDGE_MAX_GROUPS); + goto prepare_response; + } + size_t i = 0; + for(i=0; idestroyed, 0); janus_mutex_init(&audiobridge->mutex); + if(groups != NULL && json_array_size(groups) > 0) { + /* Populate the group hashtable, and create the related indexes */ + audiobridge->groups = g_hash_table_new_full(g_str_hash, g_str_equal, (GDestroyNotify)g_free, NULL); + size_t i = 0; + int count = 0; + for(i=0; igroups, name)) { + JANUS_LOG(LOG_WARN, "Duplicated group name '%s', skipping\n", name); + } else { + count++; + g_hash_table_insert(audiobridge->groups, g_strdup(name), GUINT_TO_POINTER(count)); + } + } + if(count == 0) { + JANUS_LOG(LOG_WARN, "Empty or invalid groups array provided, groups will be disabled\n"); + g_hash_table_destroy(audiobridge->groups); + audiobridge->groups = NULL; + } + } audiobridge->rtp_forwarders = g_hash_table_new_full(NULL, NULL, NULL, (GDestroyNotify)janus_audiobridge_rtp_forwarder_destroy); audiobridge->rtp_encoder = NULL; audiobridge->rtp_udp_sock = -1; @@ -2756,6 +2890,17 @@ static json_t *janus_audiobridge_process_synchronous_request(janus_audiobridge_s janus_config_add(config, c, janus_config_item_create("audio_level_average", value)); } } + if(audiobridge->groups) { + /* Save array of groups */ + janus_config_array *gl = janus_config_array_create("groups"); + janus_config_add(config, c, gl); + GHashTableIter iter; + gpointer key; + g_hash_table_iter_init(&iter, audiobridge->groups); + while(g_hash_table_iter_next(&iter, &key, NULL)) { + janus_config_add(config, gl, janus_config_item_create(NULL, (char *)key)); + } + } if(audiobridge->record_file) { janus_config_add(config, c, janus_config_item_create("record", "yes")); janus_config_add(config, c, janus_config_item_create("record_file", audiobridge->record_file)); @@ -2898,6 +3043,17 @@ static json_t *janus_audiobridge_process_synchronous_request(janus_audiobridge_s janus_config_add(config, c, janus_config_item_create("audio_level_average", value)); } } + if(audiobridge->groups) { + /* Save array of groups */ + janus_config_array *gl = janus_config_array_create("groups"); + janus_config_add(config, c, gl); + GHashTableIter iter; + gpointer key; + g_hash_table_iter_init(&iter, audiobridge->groups); + while(g_hash_table_iter_next(&iter, &key, NULL)) { + janus_config_add(config, gl, janus_config_item_create(NULL, (char *)key)); + } + } if(audiobridge->record_file) { janus_config_add(config, c, janus_config_item_create("record", "yes")); janus_config_add(config, c, janus_config_item_create("record_file", audiobridge->record_file)); @@ -3864,6 +4020,21 @@ static json_t *janus_audiobridge_process_synchronous_request(janus_audiobridge_s g_snprintf(error_cause, 512, "No such room (%s)", room_id_str); goto prepare_response; } + /* If this room uses groups, check if a valid group name was provided */ + uint group = 0; + if(audiobridge->groups != NULL) { + const char *group_name = json_string_value(json_object_get(root, "group")); + if(group_name != NULL) { + group = GPOINTER_TO_UINT(g_hash_table_lookup(audiobridge->groups, group_name)); + if(group == 0) { + janus_mutex_unlock(&audiobridge->mutex); + janus_mutex_unlock(&rooms_mutex); + error_code = JANUS_AUDIOBRIDGE_ERROR_NO_SUCH_GROUP; + g_snprintf(error_cause, 512, "No such group"); + goto prepare_response; + } + } + } if(janus_audiobridge_create_udp_socket_if_needed(audiobridge)) { janus_mutex_unlock(&audiobridge->mutex); @@ -3881,7 +4052,7 @@ static json_t *janus_audiobridge_process_synchronous_request(janus_audiobridge_s goto prepare_response; } - guint32 stream_id = janus_audiobridge_rtp_forwarder_add_helper(audiobridge, + guint32 stream_id = janus_audiobridge_rtp_forwarder_add_helper(audiobridge, group, host, port, ssrc_value, ptype, codec, srtp_suite, srtp_crypto, always_on, 0); janus_mutex_unlock(&audiobridge->mutex); janus_mutex_unlock(&rooms_mutex); @@ -4127,6 +4298,28 @@ static json_t *janus_audiobridge_process_synchronous_request(janus_audiobridge_s g_snprintf(error_cause, 512, "No such room (%s)", room_id_str); goto prepare_response; } + /* If this room uses groups, make sure a valid group name was provided */ + uint group = 0; + if(audiobridge->groups != NULL) { + JANUS_VALIDATE_JSON_OBJECT(root, group_parameters, + error_code, error_cause, TRUE, + JANUS_AUDIOBRIDGE_ERROR_MISSING_ELEMENT, JANUS_AUDIOBRIDGE_ERROR_INVALID_ELEMENT); + if(error_code != 0) { + janus_mutex_unlock(&audiobridge->mutex); + janus_mutex_unlock(&rooms_mutex); + goto prepare_response; + } + const char *group_name = json_string_value(json_object_get(root, "group")); + group = GPOINTER_TO_UINT(g_hash_table_lookup(audiobridge->groups, group_name)); + if(group == 0) { + janus_mutex_unlock(&audiobridge->mutex); + janus_mutex_unlock(&rooms_mutex); + JANUS_LOG(LOG_ERR, "No such group (%s)\n", group_name); + error_code = JANUS_AUDIOBRIDGE_ERROR_NO_SUCH_GROUP; + g_snprintf(error_cause, 512, "No such group (%s)", group_name); + goto prepare_response; + } + } /* Check if an announcement ID has been provided, or generate a random one */ json_t *id = json_object_get(root, "file_id"); char *file_id = (char *)json_string_value(id); @@ -4162,6 +4355,7 @@ static json_t *janus_audiobridge_process_synchronous_request(janus_audiobridge_s p->annc = g_malloc0(sizeof(janus_audiobridge_file)); p->annc->id = g_strdup(file_id); p->room = audiobridge; + p->group = group; const char *filename = json_string_value(json_object_get(root, "filename")); p->annc->filename = g_strdup(filename); p->annc->file = fopen(filename, "rb"); @@ -4175,7 +4369,6 @@ static json_t *janus_audiobridge_process_synchronous_request(janus_audiobridge_s error_code = JANUS_AUDIOBRIDGE_ERROR_UNKNOWN_ERROR; g_snprintf(error_cause, 512, "Error opening file"); goto prepare_response; - } p->annc->loop = json_is_true(json_object_get(root, "loop")); /* Setup the opus decoder */ @@ -5238,6 +5431,28 @@ static void *janus_audiobridge_handler(void *data) { } admin = TRUE; } + /* If this room uses groups, make sure a valid group name was provided */ + uint group = 0; + if(audiobridge->groups != NULL) { + JANUS_VALIDATE_JSON_OBJECT(root, group_parameters, + error_code, error_cause, TRUE, + JANUS_AUDIOBRIDGE_ERROR_MISSING_ELEMENT, JANUS_AUDIOBRIDGE_ERROR_INVALID_ELEMENT); + if(error_code != 0) { + janus_mutex_unlock(&audiobridge->mutex); + janus_refcount_decrease(&audiobridge->ref); + goto error; + } + const char *group_name = json_string_value(json_object_get(root, "group")); + group = GPOINTER_TO_UINT(g_hash_table_lookup(audiobridge->groups, group_name)); + if(group == 0) { + janus_mutex_unlock(&audiobridge->mutex); + janus_refcount_decrease(&audiobridge->ref); + JANUS_LOG(LOG_ERR, "No such group (%s)\n", group_name); + error_code = JANUS_AUDIOBRIDGE_ERROR_NO_SUCH_GROUP; + g_snprintf(error_cause, 512, "No such group (%s)", group_name); + goto error; + } + } json_t *display = json_object_get(root, "display"); const char *display_text = display ? json_string_value(display) : NULL; json_t *muted = json_object_get(root, "muted"); @@ -5352,6 +5567,7 @@ static void *janus_audiobridge_handler(void *data) { participant->room = audiobridge; participant->user_id = user_id; participant->user_id_str = user_id_str ? g_strdup(user_id_str) : NULL; + participant->group = group; g_free(participant->display); participant->admin = admin; participant->display = display_text ? g_strdup(display_text) : NULL; @@ -5557,6 +5773,7 @@ static void *janus_audiobridge_handler(void *data) { json_t *record = json_object_get(root, "record"); json_t *recfile = json_object_get(root, "filename"); json_t *display = json_object_get(root, "display"); + json_t *group = json_object_get(root, "grpup"); json_t *gen_offer = json_object_get(root, "generate_offer"); json_t *update = json_object_get(root, "update"); if(prebuffer) { @@ -5599,6 +5816,17 @@ static void *janus_audiobridge_handler(void *data) { if(participant->encoder) opus_encoder_ctl(participant->encoder, OPUS_SET_COMPLEXITY(participant->opus_complexity)); } + if(group && participant->room && participant->room->groups != NULL) { + const char *group_name = json_string_value(group); + uint group_id = GPOINTER_TO_UINT(g_hash_table_lookup(participant->room->groups, group_name)); + if(group_id == 0) { + JANUS_LOG(LOG_ERR, "No such group (%s)\n", group_name); + error_code = JANUS_AUDIOBRIDGE_ERROR_NO_SUCH_GROUP; + g_snprintf(error_cause, 512, "No such group (%s)", group_name); + goto error; + } + participant->group = group_id; + } if(muted || display) { if(muted) { participant->muted = json_is_true(muted); @@ -5832,6 +6060,28 @@ static void *janus_audiobridge_handler(void *data) { } admin = TRUE; } + /* If this room uses groups, make sure a valid group name was provided */ + uint group = 0; + if(audiobridge->groups != NULL) { + JANUS_VALIDATE_JSON_OBJECT(root, group_parameters, + error_code, error_cause, TRUE, + JANUS_AUDIOBRIDGE_ERROR_MISSING_ELEMENT, JANUS_AUDIOBRIDGE_ERROR_INVALID_ELEMENT); + if(error_code != 0) { + janus_mutex_unlock(&audiobridge->mutex); + janus_refcount_decrease(&audiobridge->ref); + goto error; + } + const char *group_name = json_string_value(json_object_get(root, "group")); + group = GPOINTER_TO_UINT(g_hash_table_lookup(audiobridge->groups, group_name)); + if(group == 0) { + janus_mutex_unlock(&audiobridge->mutex); + janus_refcount_decrease(&audiobridge->ref); + JANUS_LOG(LOG_ERR, "No such group (%s)\n", group_name); + error_code = JANUS_AUDIOBRIDGE_ERROR_NO_SUCH_GROUP; + g_snprintf(error_cause, 512, "No such group (%s)", group_name); + goto error; + } + } json_t *display = json_object_get(root, "display"); const char *display_text = display ? json_string_value(display) : NULL; json_t *muted = json_object_get(root, "muted"); @@ -6026,6 +6276,7 @@ static void *janus_audiobridge_handler(void *data) { participant->user_id = user_id; g_free(participant->user_id_str); participant->user_id_str = user_id_str ? g_strdup(user_id_str) : NULL; + participant->group = group; participant->admin = admin; g_free(participant->display); participant->display = display_text ? g_strdup(display_text) : NULL; @@ -6483,12 +6734,48 @@ static void *janus_audiobridge_mixer_thread(void *data) { memset(outBuffer, 0, OPUS_SAMPLES*2); memset(resampled, 0, OPUS_SAMPLES*2); - /* Base RTP packet, in case there are forwarders involved */ - unsigned char *rtpbuffer = g_malloc0(1500); - janus_rtp_header *rtph = (janus_rtp_header *)rtpbuffer; - rtph->version = 2; + /* In case forwarding groups are enabled, we need additional buffers */ + uint groups_num = audiobridge->groups ? g_hash_table_size(audiobridge->groups) : 0, index = 0; + opus_int32 *groupBuffers = NULL; + uint32_t groupBufferSize = 0, groupBuffersSize = 0; + OpusEncoder **groupEncoders = NULL; + if(groups_num > 0) { + /* Create buffers */ + groupBufferSize = OPUS_SAMPLES * sizeof(opus_int32); + groupBuffersSize = groups_num * groupBufferSize; + groupBuffers = g_malloc(groupBuffersSize); + groupEncoders = g_malloc(groups_num * sizeof(OpusEncoder *)); + /* Create separate encoders */ + for(index=0; indexsampling_rate, 1, OPUS_APPLICATION_VOIP, &error); + if(audiobridge->sampling_rate == 8000) { + opus_encoder_ctl(rtp_encoder, OPUS_SET_MAX_BANDWIDTH(OPUS_BANDWIDTH_NARROWBAND)); + } else if(audiobridge->sampling_rate == 12000) { + opus_encoder_ctl(rtp_encoder, OPUS_SET_MAX_BANDWIDTH(OPUS_BANDWIDTH_MEDIUMBAND)); + } else if(audiobridge->sampling_rate == 16000) { + opus_encoder_ctl(rtp_encoder, OPUS_SET_MAX_BANDWIDTH(OPUS_BANDWIDTH_WIDEBAND)); + } else if(audiobridge->sampling_rate == 24000) { + opus_encoder_ctl(rtp_encoder, OPUS_SET_MAX_BANDWIDTH(OPUS_BANDWIDTH_SUPERWIDEBAND)); + } else if(audiobridge->sampling_rate == 48000) { + opus_encoder_ctl(rtp_encoder, OPUS_SET_MAX_BANDWIDTH(OPUS_BANDWIDTH_FULLBAND)); + } else { + JANUS_LOG(LOG_WARN, "Unsupported sampling rate %d, setting 16kHz\n", audiobridge->sampling_rate); + opus_encoder_ctl(rtp_encoder, OPUS_SET_MAX_BANDWIDTH(OPUS_BANDWIDTH_WIDEBAND)); + } + groupEncoders[index] = rtp_encoder; + } + } + + /* Base RTP packets, in case there are forwarders involved */ + gboolean have_opus[JANUS_AUDIOBRIDGE_MAX_GROUPS+1], + have_alaw[JANUS_AUDIOBRIDGE_MAX_GROUPS+1], + have_ulaw[JANUS_AUDIOBRIDGE_MAX_GROUPS+1]; + unsigned char *rtpbuffer = g_malloc0(1500 * (groups_num+1)); + janus_rtp_header *rtph = NULL; /* In case we need G.711 forwarders */ - uint8_t rtpalaw[12+G711_SAMPLES], rtpulaw[12+G711_SAMPLES]; + uint8_t *rtpalaw = g_malloc0((12+G711_SAMPLES) * (groups_num+1)), + *rtpulaw = g_malloc0((12+G711_SAMPLES) * (groups_num+1)); /* Timer */ struct timeval now, before; @@ -6567,6 +6854,8 @@ static void *janus_audiobridge_mixer_thread(void *data) { janus_mutex_unlock_nodebug(&audiobridge->mutex); for(i=0; i 0) + memset(groupBuffers, 0, groupBuffersSize); ps = participants_list; while(ps) { janus_audiobridge_participant *p = (janus_audiobridge_participant *)ps->data; @@ -6591,11 +6880,24 @@ static void *janus_audiobridge_mixer_thread(void *data) { memcpy((opus_int16 *)pkt->data, resampled, pkt->length); } curBuffer = (opus_int16 *)pkt->data; - for(i=0; ivolume_gain == 100) { - buffer[i] += curBuffer[i]; - } else { - buffer[i] += (curBuffer[i]*p->volume_gain)/100; + if(groups_num == 0) { + /* Add to the main mix */ + for(i=0; ivolume_gain == 100) { + buffer[i] += curBuffer[i]; + } else { + buffer[i] += (curBuffer[i]*p->volume_gain)/100; + } + } + } else { + /* Add to the group submix */ + int index = p->group-1; + for(i=0; ivolume_gain == 100) { + *(groupBuffers + index*samples + i) += curBuffer[i]; + } else { + *(groupBuffers + index*samples + i) += (curBuffer[i]*p->volume_gain)/100; + } } } } @@ -6665,11 +6967,24 @@ static void *janus_audiobridge_mixer_thread(void *data) { gateway->notify_event(&janus_audiobridge_plugin, NULL, info); } } - for(i=0; ivolume_gain == 100) { - buffer[i] += resampled[i]; - } else { - buffer[i] += (resampled[i]*p->volume_gain)/100; + if(groups_num == 0) { + /* Add to the main mix */ + for(i=0; ivolume_gain == 100) { + buffer[i] += resampled[i]; + } else { + buffer[i] += (resampled[i]*p->volume_gain)/100; + } + } + } else { + /* Add to the group submix */ + index = p->group-1; + for(i=0; ivolume_gain == 100) { + *(groupBuffers + index*samples + i) += resampled[i]; + } else { + *(groupBuffers + index*samples + i) += (resampled[i]*p->volume_gain)/100; + } } } ps = ps->next; @@ -6677,6 +6992,14 @@ static void *janus_audiobridge_mixer_thread(void *data) { g_list_free_full(anncs_list, (GDestroyNotify)janus_audiobridge_participant_unref); } #endif + /* If groups are in use, put them together in the main mix */ + if(groups_num > 0) { + /* Mix all submixes */ + for(index=0; indexrecording != NULL && g_list_length(participants_list) > 0) { for(i=0; iinbuf = g_list_delete_link(p->inbuf, first); } janus_mutex_unlock(&p->qmutex); + /* Remove the participant's own contribution */ curBuffer = (opus_int16 *)((pkt && pkt->length && !pkt->silence) ? pkt->data : NULL); for(i=0; ivolume_gain == 100) @@ -6782,54 +7106,87 @@ static void *janus_audiobridge_mixer_thread(void *data) { go_on = TRUE; } if(go_on) { - /* Send the mixed frame to everybody */ - for(i=0; irtp_forwarders); opus_int32 length = 0; - gboolean have_opus = FALSE, have_alaw = FALSE, have_ulaw = FALSE; while(audiobridge->rtp_udp_sock > 0 && g_hash_table_iter_next(&iter, &key, &value)) { guint32 stream_id = GPOINTER_TO_UINT(key); janus_audiobridge_rtp_forwarder *forwarder = (janus_audiobridge_rtp_forwarder *)value; if(count == 0 && pf_count == 0 && !forwarder->always_on) continue; - if(forwarder->codec == JANUS_AUDIOCODEC_OPUS && !have_opus) { - /* This is an Opus forwarder and we don't have a version for that yet */ - length = opus_encode(audiobridge->rtp_encoder, outBuffer, samples, rtpbuffer+12, 1500-12); - if(length < 0) { - JANUS_LOG(LOG_ERR, "[Opus] Ops! got an error encoding the Opus frame: %d (%s)\n", length, opus_strerror(length)); - continue; + /* Check if we're forwarding the main mix or a specific group */ + if(groups_num > 0) { + if(forwarder->group == 0) { + /* We're forwarding the main mix */ + for(i=0; igroup-1; + for(i=0; icodec == JANUS_AUDIOCODEC_PCMA && !have_alaw) || - (forwarder->codec == JANUS_AUDIOCODEC_PCMU && !have_ulaw)) { - /* This is a G.711 forwarder and we don't have a version for that yet */ - if(audiobridge->sampling_rate != 8000) { - /* Downsample this from whatever the mixer uses */ - i = janus_audiobridge_resample(outBuffer, samples, audiobridge->sampling_rate, resampled, 8000); - if(i == 0) { - JANUS_LOG(LOG_WARN, "[G.711] Error downsampling from %d, skipping audio packet\n", audiobridge->sampling_rate); + } + if(forwarder->codec == JANUS_AUDIOCODEC_OPUS) { + /* This is an Opus forwarder, check if we have a version for that already */ + if(!have_opus[forwarder->group]) { + /* We don't, encode now */ + OpusEncoder *rtp_encoder = (forwarder->group == 0 ? audiobridge->rtp_encoder : groupEncoders[forwarder->group]); + length = opus_encode(rtp_encoder, outBuffer, samples, rtpbuffer + forwarder->group*1500 + 12, 1500-12); + if(length < 0) { + JANUS_LOG(LOG_ERR, "[Opus] Ops! got an error encoding the Opus frame: %d (%s)\n", length, opus_strerror(length)); continue; } - } else { - /* Just copy */ - memcpy(resampled, outBuffer, samples*2); + have_opus[forwarder->group] = TRUE; } - int i = 0; - if(forwarder->codec == JANUS_AUDIOCODEC_PCMA) { - for(i=0; i<160; i++) - rtpalaw[12+i] = janus_audiobridge_g711_alaw_encode(resampled[i]); - have_alaw = TRUE; - rtph = (janus_rtp_header *)rtpalaw; - } else { - for(i=0; i<160; i++) - rtpulaw[12+i] = janus_audiobridge_g711_ulaw_encode(resampled[i]); - have_ulaw = TRUE; - rtph = (janus_rtp_header *)rtpulaw; + rtph = (janus_rtp_header *)(rtpbuffer + forwarder->group*1500); + rtph->version = 2; + } else if(forwarder->codec == JANUS_AUDIOCODEC_PCMA || forwarder->codec == JANUS_AUDIOCODEC_PCMU) { + /* This is a G.711 forwarder, check if we have a version for that already */ + if((forwarder->codec == JANUS_AUDIOCODEC_PCMA && !have_alaw[forwarder->group]) || + (forwarder->codec == JANUS_AUDIOCODEC_PCMU && !have_ulaw[forwarder->group])) { + /* We don't, encode now */ + if(audiobridge->sampling_rate != 8000) { + /* Downsample this from whatever the mixer uses */ + i = janus_audiobridge_resample(outBuffer, samples, audiobridge->sampling_rate, resampled, 8000); + if(i == 0) { + JANUS_LOG(LOG_WARN, "[G.711] Error downsampling from %d, skipping audio packet\n", audiobridge->sampling_rate); + continue; + } + } else { + /* Just copy */ + memcpy(resampled, outBuffer, samples*2); + } + int i = 0; + if(forwarder->codec == JANUS_AUDIOCODEC_PCMA) { + uint8_t *rtpalaw_buffer = rtpalaw + forwarder->group*G711_SAMPLES + 12; + for(i=0; i<160; i++) + rtpalaw_buffer[i] = janus_audiobridge_g711_alaw_encode(resampled[i]); + have_alaw[forwarder->group] = TRUE; + } else { + uint8_t *rtpulaw_buffer = rtpulaw + forwarder->group*G711_SAMPLES + 12; + for(i=0; i<160; i++) + rtpulaw_buffer[i] = janus_audiobridge_g711_ulaw_encode(resampled[i]); + have_ulaw[forwarder->group] = TRUE; + } } + rtph = (janus_rtp_header *)(forwarder->codec == JANUS_AUDIOCODEC_PCMA ? + (rtpalaw + forwarder->group*G711_SAMPLES + 12) : (rtpulaw + forwarder->group*G711_SAMPLES + 12)); rtph->version = 2; length = 160; } @@ -6916,6 +7273,16 @@ static void *janus_audiobridge_mixer_thread(void *data) { } } g_free(rtpbuffer); + g_free(rtpalaw); + g_free(rtpulaw); + g_free(groupBuffers); + if(groupEncoders) { + for(index=0; indexroom_id_str, audiobridge->room_name); janus_refcount_decrease(&audiobridge->ref); From ee0b18d4d5c2040a8004a48e260798a1db6b21dd Mon Sep 17 00:00:00 2001 From: Alessandro Toppi Date: Wed, 5 May 2021 17:01:20 +0200 Subject: [PATCH 2/5] Fix typo. --- plugins/janus_audiobridge.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/janus_audiobridge.c b/plugins/janus_audiobridge.c index 85a8aa707b..bca82e2777 100644 --- a/plugins/janus_audiobridge.c +++ b/plugins/janus_audiobridge.c @@ -6026,7 +6026,7 @@ static void *janus_audiobridge_handler(void *data) { json_t *record = json_object_get(root, "record"); json_t *recfile = json_object_get(root, "filename"); json_t *display = json_object_get(root, "display"); - json_t *group = json_object_get(root, "grpup"); + json_t *group = json_object_get(root, "group"); json_t *gen_offer = json_object_get(root, "generate_offer"); json_t *update = json_object_get(root, "update"); if(prebuffer) { From 336fcacdb88812ab777b73b75e98c72870a09c6b Mon Sep 17 00:00:00 2001 From: Lorenzo Miniero Date: Thu, 6 May 2021 17:26:11 +0200 Subject: [PATCH 3/5] Fixed wrong index for group encoder --- plugins/janus_audiobridge.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/janus_audiobridge.c b/plugins/janus_audiobridge.c index bca82e2777..93e9aafaf0 100644 --- a/plugins/janus_audiobridge.c +++ b/plugins/janus_audiobridge.c @@ -7404,7 +7404,7 @@ static void *janus_audiobridge_mixer_thread(void *data) { /* This is an Opus forwarder, check if we have a version for that already */ if(!have_opus[forwarder->group]) { /* We don't, encode now */ - OpusEncoder *rtp_encoder = (forwarder->group == 0 ? audiobridge->rtp_encoder : groupEncoders[forwarder->group]); + OpusEncoder *rtp_encoder = (forwarder->group == 0 ? audiobridge->rtp_encoder : groupEncoders[forwarder->group-1]); length = opus_encode(rtp_encoder, outBuffer, samples, rtpbuffer + forwarder->group*1500 + 12, 1500-12); if(length < 0) { JANUS_LOG(LOG_ERR, "[Opus] Ops! got an error encoding the Opus frame: %d (%s)\n", length, opus_strerror(length)); From 1c3e23db2e62e0291f168f49253ab1416533d5d6 Mon Sep 17 00:00:00 2001 From: Lorenzo Miniero Date: Fri, 7 May 2021 11:22:45 +0200 Subject: [PATCH 4/5] Added group to forwarder creation response and listings, if available --- plugins/janus_audiobridge.c | 28 +++++++++++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/plugins/janus_audiobridge.c b/plugins/janus_audiobridge.c index 93e9aafaf0..76dde5dd20 100644 --- a/plugins/janus_audiobridge.c +++ b/plugins/janus_audiobridge.c @@ -430,6 +430,7 @@ room-: { { "audiobridge" : "success", "room" : , + "group" : "", "stream_id" : , "host" : "", "port" :