Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactored RTP forwarder internals as a core feature #3155

Merged
merged 2 commits into from
Mar 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ janus_SOURCES = \
rtcp.h \
rtp.c \
rtp.h \
rtpfwd.c \
rtpfwd.h \
rtpsrtp.h \
sctp.c \
sctp.h \
Expand Down
8 changes: 8 additions & 0 deletions src/janus.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "debug.h"
#include "ip-utils.h"
#include "rtcp.h"
#include "rtpfwd.h"
#include "auth.h"
#include "record.h"
#include "events.h"
Expand Down Expand Up @@ -5339,6 +5340,12 @@ gint main(int argc, char *argv[]) {
JANUS_LOG(LOG_WARN, "Data Channels support not compiled\n");
#endif

/* Initialize the RTP forwarders functionality */
if(janus_rtp_forwarders_init() < 0) {
janus_options_destroy();
exit(1);
}

/* Sessions */
sessions = g_hash_table_new_full(g_int64_hash, g_int64_equal, (GDestroyNotify)g_free, NULL);
janus_mutex_init(&sessions_mutex);
Expand Down Expand Up @@ -5887,6 +5894,7 @@ gint main(int argc, char *argv[]) {
JANUS_LOG(LOG_INFO, "De-initializing SCTP...\n");
janus_sctp_deinit();
#endif
janus_rtp_forwarders_deinit();
janus_auth_deinit();

JANUS_LOG(LOG_INFO, "Closing plugins:\n");
Expand Down
228 changes: 60 additions & 168 deletions src/plugins/janus_audiobridge.c
Original file line number Diff line number Diff line change
Expand Up @@ -1048,6 +1048,7 @@ room-<unique room ID>: {
#include "../rtp.h"
#include "../rtpsrtp.h"
#include "../rtcp.h"
#include "../rtpfwd.h"
#include "../record.h"
#include "../sdp-utils.h"
#include "../utils.h"
Expand Down Expand Up @@ -1746,123 +1747,40 @@ static void janus_audiobridge_message_free(janus_audiobridge_message *msg) {
static void janus_audiobridge_recorder_create(janus_audiobridge_participant *participant);
static void janus_audiobridge_recorder_close(janus_audiobridge_participant *participant);

/* RTP forwarder instance: address to send to, and current RTP header info */
typedef struct janus_audiobridge_rtp_forwarder {
struct sockaddr_in serv_addr;
struct sockaddr_in6 serv_addr6;
uint32_t ssrc;
/* RTP forwarder metadata */
typedef struct janus_audiobridge_rtp_forwarder_metadata {
janus_audiocodec codec;
int payload_type;
uint16_t seq_number;
uint32_t timestamp;
uint16_t seq_number;
uint group;
gboolean always_on;
/* Only needed for SRTP forwarders */
gboolean is_srtp;
srtp_t srtp_ctx;
srtp_policy_t srtp_policy;
/* Reference */
volatile gint destroyed;
janus_refcount ref;
} janus_audiobridge_rtp_forwarder;
static void janus_audiobridge_rtp_forwarder_destroy(janus_audiobridge_rtp_forwarder *rf) {
if(rf && g_atomic_int_compare_and_exchange(&rf->destroyed, 0, 1)) {
janus_refcount_decrease(&rf->ref);
}
}
static void janus_audiobridge_rtp_forwarder_free(const janus_refcount *f_ref) {
janus_audiobridge_rtp_forwarder *rf = janus_refcount_containerof(f_ref, janus_audiobridge_rtp_forwarder, ref);
if(rf->is_srtp) {
srtp_dealloc(rf->srtp_ctx);
g_free(rf->srtp_policy.key);
}
g_free(rf);
}
} janus_audiobridge_rtp_forwarder_metadata;
/* Helper to create a new RTP forwarder with the right metadata */
static guint32 janus_audiobridge_rtp_forwarder_add_helper(janus_audiobridge_room *room,
uint group, const gchar *host, uint16_t port, uint32_t ssrc, int pt,
janus_audiocodec codec, int srtp_suite, const char *srtp_crypto,
gboolean always_on, guint32 stream_id) {
if(room == NULL || host == NULL)
return 0;
janus_audiobridge_rtp_forwarder *rf = g_malloc0(sizeof(janus_audiobridge_rtp_forwarder));
/* First of all, let's check if we need to setup an SRTP forwarder */
if(srtp_suite > 0 && srtp_crypto != NULL) {
/* Base64 decode the crypto string and set it as the SRTP context */
gsize len = 0;
guchar *decoded = g_base64_decode(srtp_crypto, &len);
if(len < SRTP_MASTER_LENGTH) {
JANUS_LOG(LOG_ERR, "Invalid SRTP crypto (%s)\n", srtp_crypto);
g_free(decoded);
g_free(rf);
return 0;
}
/* Set SRTP policy */
srtp_policy_t *policy = &rf->srtp_policy;
srtp_crypto_policy_set_rtp_default(&(policy->rtp));
if(srtp_suite == 32) {
srtp_crypto_policy_set_aes_cm_128_hmac_sha1_32(&(policy->rtp));
} else if(srtp_suite == 80) {
srtp_crypto_policy_set_aes_cm_128_hmac_sha1_80(&(policy->rtp));
}
policy->ssrc.type = ssrc_any_outbound;
policy->key = decoded;
policy->next = NULL;
/* Create SRTP context */
srtp_err_status_t res = srtp_create(&rf->srtp_ctx, policy);
if(res != srtp_err_status_ok) {
/* Something went wrong... */
JANUS_LOG(LOG_ERR, "Error creating forwarder SRTP session: %d (%s)\n", res, janus_srtp_error_str(res));
g_free(decoded);
policy->key = NULL;
g_free(rf);
return 0;
}
rf->is_srtp = TRUE;
}
/* Check if the host address is IPv4 or IPv6 */
if(strstr(host, ":") != NULL) {
rf->serv_addr6.sin6_family = AF_INET6;
inet_pton(AF_INET6, host, &(rf->serv_addr6.sin6_addr));
rf->serv_addr6.sin6_port = htons(port);
} else {
rf->serv_addr.sin_family = AF_INET;
inet_pton(AF_INET, host, &(rf->serv_addr.sin_addr));
rf->serv_addr.sin_port = htons(port);
}
/* Setup RTP info (we'll use the stream ID as SSRC) */
rf->codec = codec;
rf->ssrc = ssrc;
rf->payload_type = pt;
if(codec == JANUS_AUDIOCODEC_PCMA)
rf->payload_type = 8;
else if(codec == JANUS_AUDIOCODEC_PCMU)
rf->payload_type = 0;
rf->seq_number = 0;
rf->timestamp = 0;
rf->group = group;
rf->always_on = always_on;

/* Create a new RTP forwarder */
janus_rtp_forwarder *rf = janus_rtp_forwarder_create(JANUS_AUDIOBRIDGE_NAME, stream_id,
room->rtp_udp_sock, host, port, ssrc, pt, srtp_suite, srtp_crypto, FALSE, 0, FALSE, FALSE);
if(rf == NULL)
return 0;
/* Fill in some metadata we'll need */
janus_audiobridge_rtp_forwarder_metadata *metadata = g_malloc0(sizeof(janus_audiobridge_rtp_forwarder_metadata));
metadata->codec = codec;
metadata->group = group;
metadata->always_on = always_on;
rf->metadata = metadata;
/* Add the forwarder to the ones we have in the room */
janus_mutex_lock(&room->rtp_mutex);

guint32 actual_stream_id;
if(stream_id > 0) {
actual_stream_id = stream_id;
} else {
actual_stream_id = janus_random_uint32();
}

while(g_hash_table_lookup(room->rtp_forwarders, GUINT_TO_POINTER(actual_stream_id)) != NULL) {
actual_stream_id = janus_random_uint32();
}
janus_refcount_init(&rf->ref, janus_audiobridge_rtp_forwarder_free);
g_hash_table_insert(room->rtp_forwarders, GUINT_TO_POINTER(actual_stream_id), rf);

g_hash_table_insert(room->rtp_forwarders, GUINT_TO_POINTER(rf->stream_id), rf);
janus_mutex_unlock(&room->rtp_mutex);

/* Done */
JANUS_LOG(LOG_VERB, "Added RTP forwarder to room %s: %s:%d (ID: %"SCNu32")\n",
room->room_id_str, host, port, actual_stream_id);

return actual_stream_id;
room->room_id_str, host, port, rf->stream_id);
return rf->stream_id;
}


Expand Down Expand Up @@ -2665,7 +2583,7 @@ int janus_audiobridge_init(janus_callbacks *callback, const char *config_path) {
}
g_atomic_int_set(&audiobridge->destroyed, 0);
janus_mutex_init(&audiobridge->mutex);
audiobridge->rtp_forwarders = g_hash_table_new_full(NULL, NULL, NULL, (GDestroyNotify)janus_audiobridge_rtp_forwarder_destroy);
audiobridge->rtp_forwarders = g_hash_table_new_full(NULL, NULL, NULL, (GDestroyNotify)janus_rtp_forwarder_destroy);
audiobridge->rtp_encoder = NULL;
audiobridge->rtp_udp_sock = -1;
janus_mutex_init(&audiobridge->rtp_mutex);
Expand Down Expand Up @@ -3316,7 +3234,7 @@ static json_t *janus_audiobridge_process_synchronous_request(janus_audiobridge_s
audiobridge->groups_byid = NULL;
}
}
audiobridge->rtp_forwarders = g_hash_table_new_full(NULL, NULL, NULL, (GDestroyNotify)janus_audiobridge_rtp_forwarder_destroy);
audiobridge->rtp_forwarders = g_hash_table_new_full(NULL, NULL, NULL, (GDestroyNotify)janus_rtp_forwarder_destroy);
audiobridge->rtp_encoder = NULL;
audiobridge->rtp_udp_sock = -1;
janus_mutex_init(&audiobridge->rtp_mutex);
Expand Down Expand Up @@ -4959,11 +4877,12 @@ static json_t *janus_audiobridge_process_synchronous_request(janus_audiobridge_s
g_hash_table_iter_init(&iter, audiobridge->rtp_forwarders);
while(g_hash_table_iter_next(&iter, &key, &value)) {
guint32 stream_id = GPOINTER_TO_UINT(key);
janus_audiobridge_rtp_forwarder *rf = (janus_audiobridge_rtp_forwarder *)value;
janus_rtp_forwarder *rf = (janus_rtp_forwarder *)value;
janus_audiobridge_rtp_forwarder_metadata *rfm = (janus_audiobridge_rtp_forwarder_metadata *)rf->metadata;
json_t *fl = json_object();
json_object_set_new(fl, "stream_id", json_integer(stream_id));
if(rf->group > 0 && audiobridge->groups_byid != NULL) {
char *name = g_hash_table_lookup(audiobridge->groups_byid, GUINT_TO_POINTER(rf->group));
if(rfm->group > 0 && audiobridge->groups_byid != NULL) {
char *name = g_hash_table_lookup(audiobridge->groups_byid, GUINT_TO_POINTER(rfm->group));
if(name != NULL)
json_object_set_new(fl, "group", json_string(name));
}
Expand All @@ -4977,11 +4896,11 @@ static json_t *janus_audiobridge_process_synchronous_request(janus_audiobridge_s
}
json_object_set_new(fl, "port", json_integer(ntohs(rf->serv_addr.sin_port)));
json_object_set_new(fl, "ssrc", json_integer(rf->ssrc ? rf->ssrc : stream_id));
json_object_set_new(fl, "codec", json_string(janus_audiocodec_name(rf->codec)));
json_object_set_new(fl, "codec", json_string(janus_audiocodec_name(rfm->codec)));
json_object_set_new(fl, "ptype", json_integer(rf->payload_type));
if(rf->is_srtp)
json_object_set_new(fl, "srtp", json_true());
json_object_set_new(fl, "always_on", rf->always_on ? json_true() : json_false());
json_object_set_new(fl, "always_on", rfm->always_on ? json_true() : json_false());
json_array_append_new(list, fl);
}
janus_mutex_unlock(&audiobridge->rtp_mutex);
Expand Down Expand Up @@ -7960,8 +7879,6 @@ static void *janus_audiobridge_mixer_thread(void *data) {
/* RTP */
guint16 seq = 0;
guint32 ts = 0;
/* SRTP buffer, if needed */
char sbuf[1500];

g_atomic_int_set(&audiobridge->wav_header_added, 0);
/* Loop */
Expand Down Expand Up @@ -8377,8 +8294,9 @@ static void *janus_audiobridge_mixer_thread(void *data) {
gpointer value;
g_hash_table_iter_init(&iter, audiobridge->rtp_forwarders);
while(g_hash_table_iter_next(&iter, NULL, &value)) {
janus_audiobridge_rtp_forwarder *forwarder = (janus_audiobridge_rtp_forwarder *)value;
if(forwarder->always_on) {
janus_rtp_forwarder *rf = (janus_rtp_forwarder *)value;
janus_audiobridge_rtp_forwarder_metadata *rfm = (janus_audiobridge_rtp_forwarder_metadata *)rf->metadata;
if(rfm->always_on) {
go_on = TRUE;
break;
}
Expand Down Expand Up @@ -8406,43 +8324,43 @@ static void *janus_audiobridge_mixer_thread(void *data) {
g_hash_table_iter_init(&iter, audiobridge->rtp_forwarders);
opus_int32 length = 0;
while(audiobridge->rtp_udp_sock > 0 && g_hash_table_iter_next(&iter, &key, &value)) {
guint32 stream_id = GPOINTER_TO_UINT(key);
janus_audiobridge_rtp_forwarder *forwarder = (janus_audiobridge_rtp_forwarder *)value;
if(count == 0 && pf_count == 0 && !forwarder->always_on)
janus_rtp_forwarder *rf = (janus_rtp_forwarder *)value;
janus_audiobridge_rtp_forwarder_metadata *rfm = (janus_audiobridge_rtp_forwarder_metadata *)rf->metadata;
if(count == 0 && pf_count == 0 && !rfm->always_on)
continue;
/* Check if we're forwarding the main mix or a specific group */
if(groups_num > 0) {
if(forwarder->group == 0) {
if(rfm->group == 0) {
/* We're forwarding the main mix */
for(i=0; i<samples; i++)
outBuffer[i] = buffer[i];
} else {
/* We're forwarding a group mix */
index = forwarder->group-1;
index = rfm->group-1;
for(i=0; i<samples; i++)
outBuffer[i] = *(groupBuffers + index*samples + i);
}
}
if(forwarder->codec == JANUS_AUDIOCODEC_OPUS) {
if(rfm->codec == JANUS_AUDIOCODEC_OPUS) {
/* This is an Opus forwarder, check if we have a version for that already */
if(!have_opus[forwarder->group]) {
if(!have_opus[rfm->group]) {
/* We don't, encode now */
OpusEncoder *rtp_encoder = (forwarder->group == 0 ? audiobridge->rtp_encoder : groupEncoders[forwarder->group-1]);
OpusEncoder *rtp_encoder = (rfm->group == 0 ? audiobridge->rtp_encoder : groupEncoders[rfm->group-1]);
length = opus_encode(rtp_encoder, outBuffer,
audiobridge->spatial_audio ? samples/2 : samples,
rtpbuffer + forwarder->group*1500 + 12, 1500-12);
rtpbuffer + rfm->group*1500 + 12, 1500-12);
if(length < 0) {
JANUS_LOG(LOG_ERR, "[Opus] Ops! got an error encoding the Opus frame: %d (%s)\n", length, opus_strerror(length));
continue;
}
have_opus[forwarder->group] = TRUE;
have_opus[rfm->group] = TRUE;
}
rtph = (janus_rtp_header *)(rtpbuffer + forwarder->group*1500);
rtph = (janus_rtp_header *)(rtpbuffer + rfm->group*1500);
rtph->version = 2;
} else if(forwarder->codec == JANUS_AUDIOCODEC_PCMA || forwarder->codec == JANUS_AUDIOCODEC_PCMU) {
} else if(rfm->codec == JANUS_AUDIOCODEC_PCMA || rfm->codec == JANUS_AUDIOCODEC_PCMU) {
/* This is a G.711 forwarder, check if we have a version for that already */
if((forwarder->codec == JANUS_AUDIOCODEC_PCMA && !have_alaw[forwarder->group]) ||
(forwarder->codec == JANUS_AUDIOCODEC_PCMU && !have_ulaw[forwarder->group])) {
if((rfm->codec == JANUS_AUDIOCODEC_PCMA && !have_alaw[rfm->group]) ||
(rfm->codec == JANUS_AUDIOCODEC_PCMU && !have_ulaw[rfm->group])) {
/* We don't, encode now */
if(audiobridge->sampling_rate != 8000) {
/* Downsample this from whatever the mixer uses */
Expand All @@ -8456,56 +8374,30 @@ static void *janus_audiobridge_mixer_thread(void *data) {
memcpy(resampled, outBuffer, samples*2);
}
int i = 0;
if(forwarder->codec == JANUS_AUDIOCODEC_PCMA) {
uint8_t *rtpalaw_buffer = rtpalaw + forwarder->group*G711_SAMPLES + 12;
if(rfm->codec == JANUS_AUDIOCODEC_PCMA) {
uint8_t *rtpalaw_buffer = rtpalaw + rfm->group*G711_SAMPLES + 12;
for(i=0; i<160; i++)
rtpalaw_buffer[i] = janus_audiobridge_g711_alaw_encode(resampled[i]);
have_alaw[forwarder->group] = TRUE;
have_alaw[rfm->group] = TRUE;
} else {
uint8_t *rtpulaw_buffer = rtpulaw + forwarder->group*G711_SAMPLES + 12;
uint8_t *rtpulaw_buffer = rtpulaw + rfm->group*G711_SAMPLES + 12;
for(i=0; i<160; i++)
rtpulaw_buffer[i] = janus_audiobridge_g711_ulaw_encode(resampled[i]);
have_ulaw[forwarder->group] = TRUE;
have_ulaw[rfm->group] = TRUE;
}
}
rtph = (janus_rtp_header *)(forwarder->codec == JANUS_AUDIOCODEC_PCMA ?
(rtpalaw + forwarder->group*G711_SAMPLES) : (rtpulaw + forwarder->group*G711_SAMPLES));
rtph = (janus_rtp_header *)(rfm->codec == JANUS_AUDIOCODEC_PCMA ?
(rtpalaw + rfm->group*G711_SAMPLES) : (rtpulaw + rfm->group*G711_SAMPLES));
rtph->version = 2;
length = 160;
}
/* Update header */
rtph->type = forwarder->payload_type;
rtph->ssrc = htonl(forwarder->ssrc ? forwarder->ssrc : stream_id);
forwarder->seq_number++;
rtph->seq_number = htons(forwarder->seq_number);
forwarder->timestamp += (forwarder->codec == JANUS_AUDIOCODEC_OPUS ? OPUS_SAMPLES : G711_SAMPLES);
rtph->timestamp = htonl(forwarder->timestamp);
/* Check if this packet needs to be encrypted */
char *payload = (char *)rtph;
int plen = length+12;
if(forwarder->is_srtp) {
memcpy(sbuf, payload, plen);
int protected = plen;
int res = srtp_protect(forwarder->srtp_ctx, sbuf, &protected);
if(res != srtp_err_status_ok) {
janus_rtp_header *header = (janus_rtp_header *)sbuf;
guint32 timestamp = ntohl(header->timestamp);
guint16 seq = ntohs(header->seq_number);
JANUS_LOG(LOG_ERR, "Error encrypting RTP packet for room %s... %s (len=%d-->%d, ts=%"SCNu32", seq=%"SCNu16")...\n",
audiobridge->room_id_str, janus_srtp_error_str(res), plen, protected, timestamp, seq);
} else {
payload = (char *)&sbuf;
plen = protected;
}
}
/* No encryption, send the RTP packet as it is */
struct sockaddr *address = (forwarder->serv_addr.sin_family == AF_INET ?
(struct sockaddr *)&forwarder->serv_addr : (struct sockaddr *)&forwarder->serv_addr6);
size_t addrlen = (forwarder->serv_addr.sin_family == AF_INET ? sizeof(forwarder->serv_addr) : sizeof(forwarder->serv_addr6));
if(sendto(audiobridge->rtp_udp_sock, payload, plen, 0, address, addrlen) < 0) {
JANUS_LOG(LOG_HUGE, "Error forwarding mixed RTP packet for room %s... %s (len=%d)...\n",
audiobridge->room_id_str, g_strerror(errno), plen);
}
rfm->seq_number++;
rtph->seq_number = htons(rfm->seq_number);
rfm->timestamp += (rfm->codec == JANUS_AUDIOCODEC_OPUS ? OPUS_SAMPLES : G711_SAMPLES);
rtph->timestamp = htonl(rfm->timestamp);
/* Forward the packet */
janus_rtp_forwarder_send_rtp(rf, (char *)rtph, length+12, -1);
}
}
}
Expand Down
Loading