Skip to content

Commit

Permalink
Refactored RTP forwarder internals as a core feature (#3155)
Browse files Browse the repository at this point in the history
  • Loading branch information
lminiero committed Mar 6, 2023
1 parent ca36ad9 commit c3c7ffb
Show file tree
Hide file tree
Showing 6 changed files with 719 additions and 645 deletions.
2 changes: 2 additions & 0 deletions src/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ janus_SOURCES = \
rtcp.h \
rtp.c \
rtp.h \
rtpfwd.c \
rtpfwd.h \
rtpsrtp.h \
sctp.c \
sctp.h \
Expand Down
8 changes: 8 additions & 0 deletions src/janus.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "debug.h"
#include "ip-utils.h"
#include "rtcp.h"
#include "rtpfwd.h"
#include "auth.h"
#include "record.h"
#include "events.h"
Expand Down Expand Up @@ -5375,6 +5376,12 @@ gint main(int argc, char *argv[]) {
JANUS_LOG(LOG_WARN, "Data Channels support not compiled\n");
#endif

/* Initialize the RTP forwarders functionality */
if(janus_rtp_forwarders_init() < 0) {
janus_options_destroy();
exit(1);
}

/* Sessions */
sessions = g_hash_table_new_full(g_int64_hash, g_int64_equal, (GDestroyNotify)g_free, NULL);
janus_mutex_init(&sessions_mutex);
Expand Down Expand Up @@ -5923,6 +5930,7 @@ gint main(int argc, char *argv[]) {
JANUS_LOG(LOG_INFO, "De-initializing SCTP...\n");
janus_sctp_deinit();
#endif
janus_rtp_forwarders_deinit();
janus_auth_deinit();

JANUS_LOG(LOG_INFO, "Closing plugins:\n");
Expand Down
228 changes: 60 additions & 168 deletions src/plugins/janus_audiobridge.c
Original file line number Diff line number Diff line change
Expand Up @@ -1048,6 +1048,7 @@ room-<unique room ID>: {
#include "../rtp.h"
#include "../rtpsrtp.h"
#include "../rtcp.h"
#include "../rtpfwd.h"
#include "../record.h"
#include "../sdp-utils.h"
#include "../utils.h"
Expand Down Expand Up @@ -1746,123 +1747,40 @@ static void janus_audiobridge_message_free(janus_audiobridge_message *msg) {
static void janus_audiobridge_recorder_create(janus_audiobridge_participant *participant);
static void janus_audiobridge_recorder_close(janus_audiobridge_participant *participant);

/* RTP forwarder instance: address to send to, and current RTP header info */
typedef struct janus_audiobridge_rtp_forwarder {
struct sockaddr_in serv_addr;
struct sockaddr_in6 serv_addr6;
uint32_t ssrc;
/* RTP forwarder metadata */
typedef struct janus_audiobridge_rtp_forwarder_metadata {
janus_audiocodec codec;
int payload_type;
uint16_t seq_number;
uint32_t timestamp;
uint16_t seq_number;
uint group;
gboolean always_on;
/* Only needed for SRTP forwarders */
gboolean is_srtp;
srtp_t srtp_ctx;
srtp_policy_t srtp_policy;
/* Reference */
volatile gint destroyed;
janus_refcount ref;
} janus_audiobridge_rtp_forwarder;
static void janus_audiobridge_rtp_forwarder_destroy(janus_audiobridge_rtp_forwarder *rf) {
if(rf && g_atomic_int_compare_and_exchange(&rf->destroyed, 0, 1)) {
janus_refcount_decrease(&rf->ref);
}
}
static void janus_audiobridge_rtp_forwarder_free(const janus_refcount *f_ref) {
janus_audiobridge_rtp_forwarder *rf = janus_refcount_containerof(f_ref, janus_audiobridge_rtp_forwarder, ref);
if(rf->is_srtp) {
srtp_dealloc(rf->srtp_ctx);
g_free(rf->srtp_policy.key);
}
g_free(rf);
}
} janus_audiobridge_rtp_forwarder_metadata;
/* Helper to create a new RTP forwarder with the right metadata */
static guint32 janus_audiobridge_rtp_forwarder_add_helper(janus_audiobridge_room *room,
uint group, const gchar *host, uint16_t port, uint32_t ssrc, int pt,
janus_audiocodec codec, int srtp_suite, const char *srtp_crypto,
gboolean always_on, guint32 stream_id) {
if(room == NULL || host == NULL)
return 0;
janus_audiobridge_rtp_forwarder *rf = g_malloc0(sizeof(janus_audiobridge_rtp_forwarder));
/* First of all, let's check if we need to setup an SRTP forwarder */
if(srtp_suite > 0 && srtp_crypto != NULL) {
/* Base64 decode the crypto string and set it as the SRTP context */
gsize len = 0;
guchar *decoded = g_base64_decode(srtp_crypto, &len);
if(len < SRTP_MASTER_LENGTH) {
JANUS_LOG(LOG_ERR, "Invalid SRTP crypto (%s)\n", srtp_crypto);
g_free(decoded);
g_free(rf);
return 0;
}
/* Set SRTP policy */
srtp_policy_t *policy = &rf->srtp_policy;
srtp_crypto_policy_set_rtp_default(&(policy->rtp));
if(srtp_suite == 32) {
srtp_crypto_policy_set_aes_cm_128_hmac_sha1_32(&(policy->rtp));
} else if(srtp_suite == 80) {
srtp_crypto_policy_set_aes_cm_128_hmac_sha1_80(&(policy->rtp));
}
policy->ssrc.type = ssrc_any_outbound;
policy->key = decoded;
policy->next = NULL;
/* Create SRTP context */
srtp_err_status_t res = srtp_create(&rf->srtp_ctx, policy);
if(res != srtp_err_status_ok) {
/* Something went wrong... */
JANUS_LOG(LOG_ERR, "Error creating forwarder SRTP session: %d (%s)\n", res, janus_srtp_error_str(res));
g_free(decoded);
policy->key = NULL;
g_free(rf);
return 0;
}
rf->is_srtp = TRUE;
}
/* Check if the host address is IPv4 or IPv6 */
if(strstr(host, ":") != NULL) {
rf->serv_addr6.sin6_family = AF_INET6;
inet_pton(AF_INET6, host, &(rf->serv_addr6.sin6_addr));
rf->serv_addr6.sin6_port = htons(port);
} else {
rf->serv_addr.sin_family = AF_INET;
inet_pton(AF_INET, host, &(rf->serv_addr.sin_addr));
rf->serv_addr.sin_port = htons(port);
}
/* Setup RTP info (we'll use the stream ID as SSRC) */
rf->codec = codec;
rf->ssrc = ssrc;
rf->payload_type = pt;
if(codec == JANUS_AUDIOCODEC_PCMA)
rf->payload_type = 8;
else if(codec == JANUS_AUDIOCODEC_PCMU)
rf->payload_type = 0;
rf->seq_number = 0;
rf->timestamp = 0;
rf->group = group;
rf->always_on = always_on;

/* Create a new RTP forwarder */
janus_rtp_forwarder *rf = janus_rtp_forwarder_create(JANUS_AUDIOBRIDGE_NAME, stream_id,
room->rtp_udp_sock, host, port, ssrc, pt, srtp_suite, srtp_crypto, FALSE, 0, FALSE, FALSE);
if(rf == NULL)
return 0;
/* Fill in some metadata we'll need */
janus_audiobridge_rtp_forwarder_metadata *metadata = g_malloc0(sizeof(janus_audiobridge_rtp_forwarder_metadata));
metadata->codec = codec;
metadata->group = group;
metadata->always_on = always_on;
rf->metadata = metadata;
/* Add the forwarder to the ones we have in the room */
janus_mutex_lock(&room->rtp_mutex);

guint32 actual_stream_id;
if(stream_id > 0) {
actual_stream_id = stream_id;
} else {
actual_stream_id = janus_random_uint32();
}

while(g_hash_table_lookup(room->rtp_forwarders, GUINT_TO_POINTER(actual_stream_id)) != NULL) {
actual_stream_id = janus_random_uint32();
}
janus_refcount_init(&rf->ref, janus_audiobridge_rtp_forwarder_free);
g_hash_table_insert(room->rtp_forwarders, GUINT_TO_POINTER(actual_stream_id), rf);

g_hash_table_insert(room->rtp_forwarders, GUINT_TO_POINTER(rf->stream_id), rf);
janus_mutex_unlock(&room->rtp_mutex);

/* Done */
JANUS_LOG(LOG_VERB, "Added RTP forwarder to room %s: %s:%d (ID: %"SCNu32")\n",
room->room_id_str, host, port, actual_stream_id);

return actual_stream_id;
room->room_id_str, host, port, rf->stream_id);
return rf->stream_id;
}


Expand Down Expand Up @@ -2665,7 +2583,7 @@ int janus_audiobridge_init(janus_callbacks *callback, const char *config_path) {
}
g_atomic_int_set(&audiobridge->destroyed, 0);
janus_mutex_init(&audiobridge->mutex);
audiobridge->rtp_forwarders = g_hash_table_new_full(NULL, NULL, NULL, (GDestroyNotify)janus_audiobridge_rtp_forwarder_destroy);
audiobridge->rtp_forwarders = g_hash_table_new_full(NULL, NULL, NULL, (GDestroyNotify)janus_rtp_forwarder_destroy);
audiobridge->rtp_encoder = NULL;
audiobridge->rtp_udp_sock = -1;
janus_mutex_init(&audiobridge->rtp_mutex);
Expand Down Expand Up @@ -3316,7 +3234,7 @@ static json_t *janus_audiobridge_process_synchronous_request(janus_audiobridge_s
audiobridge->groups_byid = NULL;
}
}
audiobridge->rtp_forwarders = g_hash_table_new_full(NULL, NULL, NULL, (GDestroyNotify)janus_audiobridge_rtp_forwarder_destroy);
audiobridge->rtp_forwarders = g_hash_table_new_full(NULL, NULL, NULL, (GDestroyNotify)janus_rtp_forwarder_destroy);
audiobridge->rtp_encoder = NULL;
audiobridge->rtp_udp_sock = -1;
janus_mutex_init(&audiobridge->rtp_mutex);
Expand Down Expand Up @@ -4958,11 +4876,12 @@ static json_t *janus_audiobridge_process_synchronous_request(janus_audiobridge_s
g_hash_table_iter_init(&iter, audiobridge->rtp_forwarders);
while(g_hash_table_iter_next(&iter, &key, &value)) {
guint32 stream_id = GPOINTER_TO_UINT(key);
janus_audiobridge_rtp_forwarder *rf = (janus_audiobridge_rtp_forwarder *)value;
janus_rtp_forwarder *rf = (janus_rtp_forwarder *)value;
janus_audiobridge_rtp_forwarder_metadata *rfm = (janus_audiobridge_rtp_forwarder_metadata *)rf->metadata;
json_t *fl = json_object();
json_object_set_new(fl, "stream_id", json_integer(stream_id));
if(rf->group > 0 && audiobridge->groups_byid != NULL) {
char *name = g_hash_table_lookup(audiobridge->groups_byid, GUINT_TO_POINTER(rf->group));
if(rfm->group > 0 && audiobridge->groups_byid != NULL) {
char *name = g_hash_table_lookup(audiobridge->groups_byid, GUINT_TO_POINTER(rfm->group));
if(name != NULL)
json_object_set_new(fl, "group", json_string(name));
}
Expand All @@ -4976,11 +4895,11 @@ static json_t *janus_audiobridge_process_synchronous_request(janus_audiobridge_s
}
json_object_set_new(fl, "port", json_integer(ntohs(rf->serv_addr.sin_port)));
json_object_set_new(fl, "ssrc", json_integer(rf->ssrc ? rf->ssrc : stream_id));
json_object_set_new(fl, "codec", json_string(janus_audiocodec_name(rf->codec)));
json_object_set_new(fl, "codec", json_string(janus_audiocodec_name(rfm->codec)));
json_object_set_new(fl, "ptype", json_integer(rf->payload_type));
if(rf->is_srtp)
json_object_set_new(fl, "srtp", json_true());
json_object_set_new(fl, "always_on", rf->always_on ? json_true() : json_false());
json_object_set_new(fl, "always_on", rfm->always_on ? json_true() : json_false());
json_array_append_new(list, fl);
}
janus_mutex_unlock(&audiobridge->rtp_mutex);
Expand Down Expand Up @@ -7961,8 +7880,6 @@ static void *janus_audiobridge_mixer_thread(void *data) {
/* RTP */
guint16 seq = 0;
guint32 ts = 0;
/* SRTP buffer, if needed */
char sbuf[1500];

g_atomic_int_set(&audiobridge->wav_header_added, 0);
/* Loop */
Expand Down Expand Up @@ -8378,8 +8295,9 @@ static void *janus_audiobridge_mixer_thread(void *data) {
gpointer value;
g_hash_table_iter_init(&iter, audiobridge->rtp_forwarders);
while(g_hash_table_iter_next(&iter, NULL, &value)) {
janus_audiobridge_rtp_forwarder *forwarder = (janus_audiobridge_rtp_forwarder *)value;
if(forwarder->always_on) {
janus_rtp_forwarder *rf = (janus_rtp_forwarder *)value;
janus_audiobridge_rtp_forwarder_metadata *rfm = (janus_audiobridge_rtp_forwarder_metadata *)rf->metadata;
if(rfm->always_on) {
go_on = TRUE;
break;
}
Expand Down Expand Up @@ -8407,43 +8325,43 @@ static void *janus_audiobridge_mixer_thread(void *data) {
g_hash_table_iter_init(&iter, audiobridge->rtp_forwarders);
opus_int32 length = 0;
while(audiobridge->rtp_udp_sock > 0 && g_hash_table_iter_next(&iter, &key, &value)) {
guint32 stream_id = GPOINTER_TO_UINT(key);
janus_audiobridge_rtp_forwarder *forwarder = (janus_audiobridge_rtp_forwarder *)value;
if(count == 0 && pf_count == 0 && !forwarder->always_on)
janus_rtp_forwarder *rf = (janus_rtp_forwarder *)value;
janus_audiobridge_rtp_forwarder_metadata *rfm = (janus_audiobridge_rtp_forwarder_metadata *)rf->metadata;
if(count == 0 && pf_count == 0 && !rfm->always_on)
continue;
/* Check if we're forwarding the main mix or a specific group */
if(groups_num > 0) {
if(forwarder->group == 0) {
if(rfm->group == 0) {
/* We're forwarding the main mix */
for(i=0; i<samples; i++)
outBuffer[i] = buffer[i];
} else {
/* We're forwarding a group mix */
index = forwarder->group-1;
index = rfm->group-1;
for(i=0; i<samples; i++)
outBuffer[i] = *(groupBuffers + index*samples + i);
}
}
if(forwarder->codec == JANUS_AUDIOCODEC_OPUS) {
if(rfm->codec == JANUS_AUDIOCODEC_OPUS) {
/* This is an Opus forwarder, check if we have a version for that already */
if(!have_opus[forwarder->group]) {
if(!have_opus[rfm->group]) {
/* We don't, encode now */
OpusEncoder *rtp_encoder = (forwarder->group == 0 ? audiobridge->rtp_encoder : groupEncoders[forwarder->group-1]);
OpusEncoder *rtp_encoder = (rfm->group == 0 ? audiobridge->rtp_encoder : groupEncoders[rfm->group-1]);
length = opus_encode(rtp_encoder, outBuffer,
audiobridge->spatial_audio ? samples/2 : samples,
rtpbuffer + forwarder->group*1500 + 12, 1500-12);
rtpbuffer + rfm->group*1500 + 12, 1500-12);
if(length < 0) {
JANUS_LOG(LOG_ERR, "[Opus] Ops! got an error encoding the Opus frame: %d (%s)\n", length, opus_strerror(length));
continue;
}
have_opus[forwarder->group] = TRUE;
have_opus[rfm->group] = TRUE;
}
rtph = (janus_rtp_header *)(rtpbuffer + forwarder->group*1500);
rtph = (janus_rtp_header *)(rtpbuffer + rfm->group*1500);
rtph->version = 2;
} else if(forwarder->codec == JANUS_AUDIOCODEC_PCMA || forwarder->codec == JANUS_AUDIOCODEC_PCMU) {
} else if(rfm->codec == JANUS_AUDIOCODEC_PCMA || rfm->codec == JANUS_AUDIOCODEC_PCMU) {
/* This is a G.711 forwarder, check if we have a version for that already */
if((forwarder->codec == JANUS_AUDIOCODEC_PCMA && !have_alaw[forwarder->group]) ||
(forwarder->codec == JANUS_AUDIOCODEC_PCMU && !have_ulaw[forwarder->group])) {
if((rfm->codec == JANUS_AUDIOCODEC_PCMA && !have_alaw[rfm->group]) ||
(rfm->codec == JANUS_AUDIOCODEC_PCMU && !have_ulaw[rfm->group])) {
/* We don't, encode now */
if(audiobridge->sampling_rate != 8000) {
/* Downsample this from whatever the mixer uses */
Expand All @@ -8457,56 +8375,30 @@ static void *janus_audiobridge_mixer_thread(void *data) {
memcpy(resampled, outBuffer, samples*2);
}
int i = 0;
if(forwarder->codec == JANUS_AUDIOCODEC_PCMA) {
uint8_t *rtpalaw_buffer = rtpalaw + forwarder->group*G711_SAMPLES + 12;
if(rfm->codec == JANUS_AUDIOCODEC_PCMA) {
uint8_t *rtpalaw_buffer = rtpalaw + rfm->group*G711_SAMPLES + 12;
for(i=0; i<160; i++)
rtpalaw_buffer[i] = janus_audiobridge_g711_alaw_encode(resampled[i]);
have_alaw[forwarder->group] = TRUE;
have_alaw[rfm->group] = TRUE;
} else {
uint8_t *rtpulaw_buffer = rtpulaw + forwarder->group*G711_SAMPLES + 12;
uint8_t *rtpulaw_buffer = rtpulaw + rfm->group*G711_SAMPLES + 12;
for(i=0; i<160; i++)
rtpulaw_buffer[i] = janus_audiobridge_g711_ulaw_encode(resampled[i]);
have_ulaw[forwarder->group] = TRUE;
have_ulaw[rfm->group] = TRUE;
}
}
rtph = (janus_rtp_header *)(forwarder->codec == JANUS_AUDIOCODEC_PCMA ?
(rtpalaw + forwarder->group*G711_SAMPLES) : (rtpulaw + forwarder->group*G711_SAMPLES));
rtph = (janus_rtp_header *)(rfm->codec == JANUS_AUDIOCODEC_PCMA ?
(rtpalaw + rfm->group*G711_SAMPLES) : (rtpulaw + rfm->group*G711_SAMPLES));
rtph->version = 2;
length = 160;
}
/* Update header */
rtph->type = forwarder->payload_type;
rtph->ssrc = htonl(forwarder->ssrc ? forwarder->ssrc : stream_id);
forwarder->seq_number++;
rtph->seq_number = htons(forwarder->seq_number);
forwarder->timestamp += (forwarder->codec == JANUS_AUDIOCODEC_OPUS ? OPUS_SAMPLES : G711_SAMPLES);
rtph->timestamp = htonl(forwarder->timestamp);
/* Check if this packet needs to be encrypted */
char *payload = (char *)rtph;
int plen = length+12;
if(forwarder->is_srtp) {
memcpy(sbuf, payload, plen);
int protected = plen;
int res = srtp_protect(forwarder->srtp_ctx, sbuf, &protected);
if(res != srtp_err_status_ok) {
janus_rtp_header *header = (janus_rtp_header *)sbuf;
guint32 timestamp = ntohl(header->timestamp);
guint16 seq = ntohs(header->seq_number);
JANUS_LOG(LOG_ERR, "Error encrypting RTP packet for room %s... %s (len=%d-->%d, ts=%"SCNu32", seq=%"SCNu16")...\n",
audiobridge->room_id_str, janus_srtp_error_str(res), plen, protected, timestamp, seq);
} else {
payload = (char *)&sbuf;
plen = protected;
}
}
/* No encryption, send the RTP packet as it is */
struct sockaddr *address = (forwarder->serv_addr.sin_family == AF_INET ?
(struct sockaddr *)&forwarder->serv_addr : (struct sockaddr *)&forwarder->serv_addr6);
size_t addrlen = (forwarder->serv_addr.sin_family == AF_INET ? sizeof(forwarder->serv_addr) : sizeof(forwarder->serv_addr6));
if(sendto(audiobridge->rtp_udp_sock, payload, plen, 0, address, addrlen) < 0) {
JANUS_LOG(LOG_HUGE, "Error forwarding mixed RTP packet for room %s... %s (len=%d)...\n",
audiobridge->room_id_str, g_strerror(errno), plen);
}
rfm->seq_number++;
rtph->seq_number = htons(rfm->seq_number);
rfm->timestamp += (rfm->codec == JANUS_AUDIOCODEC_OPUS ? OPUS_SAMPLES : G711_SAMPLES);
rtph->timestamp = htonl(rfm->timestamp);
/* Forward the packet */
janus_rtp_forwarder_send_rtp(rf, (char *)rtph, length+12, -1);
}
}
}
Expand Down
Loading

0 comments on commit c3c7ffb

Please sign in to comment.