From c3c7ffb61c46bedcd4008ac73de840b3ef415563 Mon Sep 17 00:00:00 2001 From: Lorenzo Miniero Date: Mon, 6 Mar 2023 17:30:58 +0100 Subject: [PATCH] Refactored RTP forwarder internals as a core feature (#3155) --- src/Makefile.am | 2 + src/janus.c | 8 + src/plugins/janus_audiobridge.c | 228 ++++---------- src/plugins/janus_videoroom.c | 543 ++++---------------------------- src/rtpfwd.c | 444 ++++++++++++++++++++++++++ src/rtpfwd.h | 139 ++++++++ 6 files changed, 719 insertions(+), 645 deletions(-) create mode 100644 src/rtpfwd.c create mode 100644 src/rtpfwd.h diff --git a/src/Makefile.am b/src/Makefile.am index baaafde989..9b467a7f51 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -97,6 +97,8 @@ janus_SOURCES = \ rtcp.h \ rtp.c \ rtp.h \ + rtpfwd.c \ + rtpfwd.h \ rtpsrtp.h \ sctp.c \ sctp.h \ diff --git a/src/janus.c b/src/janus.c index 900a63f2bc..e07867231e 100644 --- a/src/janus.c +++ b/src/janus.c @@ -39,6 +39,7 @@ #include "debug.h" #include "ip-utils.h" #include "rtcp.h" +#include "rtpfwd.h" #include "auth.h" #include "record.h" #include "events.h" @@ -5375,6 +5376,12 @@ gint main(int argc, char *argv[]) { JANUS_LOG(LOG_WARN, "Data Channels support not compiled\n"); #endif + /* Initialize the RTP forwarders functionality */ + if(janus_rtp_forwarders_init() < 0) { + janus_options_destroy(); + exit(1); + } + /* Sessions */ sessions = g_hash_table_new_full(g_int64_hash, g_int64_equal, (GDestroyNotify)g_free, NULL); janus_mutex_init(&sessions_mutex); @@ -5923,6 +5930,7 @@ gint main(int argc, char *argv[]) { JANUS_LOG(LOG_INFO, "De-initializing SCTP...\n"); janus_sctp_deinit(); #endif + janus_rtp_forwarders_deinit(); janus_auth_deinit(); JANUS_LOG(LOG_INFO, "Closing plugins:\n"); diff --git a/src/plugins/janus_audiobridge.c b/src/plugins/janus_audiobridge.c index 9ae6030d9b..59926fff32 100644 --- a/src/plugins/janus_audiobridge.c +++ b/src/plugins/janus_audiobridge.c @@ -1048,6 +1048,7 @@ room-: { #include "../rtp.h" #include "../rtpsrtp.h" #include "../rtcp.h" +#include "../rtpfwd.h" #include "../record.h" #include "../sdp-utils.h" #include "../utils.h" @@ -1746,123 +1747,40 @@ static void janus_audiobridge_message_free(janus_audiobridge_message *msg) { static void janus_audiobridge_recorder_create(janus_audiobridge_participant *participant); static void janus_audiobridge_recorder_close(janus_audiobridge_participant *participant); -/* RTP forwarder instance: address to send to, and current RTP header info */ -typedef struct janus_audiobridge_rtp_forwarder { - struct sockaddr_in serv_addr; - struct sockaddr_in6 serv_addr6; - uint32_t ssrc; +/* RTP forwarder metadata */ +typedef struct janus_audiobridge_rtp_forwarder_metadata { janus_audiocodec codec; - int payload_type; - uint16_t seq_number; uint32_t timestamp; + uint16_t seq_number; uint group; gboolean always_on; - /* Only needed for SRTP forwarders */ - gboolean is_srtp; - srtp_t srtp_ctx; - srtp_policy_t srtp_policy; - /* Reference */ - volatile gint destroyed; - janus_refcount ref; -} janus_audiobridge_rtp_forwarder; -static void janus_audiobridge_rtp_forwarder_destroy(janus_audiobridge_rtp_forwarder *rf) { - if(rf && g_atomic_int_compare_and_exchange(&rf->destroyed, 0, 1)) { - janus_refcount_decrease(&rf->ref); - } -} -static void janus_audiobridge_rtp_forwarder_free(const janus_refcount *f_ref) { - janus_audiobridge_rtp_forwarder *rf = janus_refcount_containerof(f_ref, janus_audiobridge_rtp_forwarder, ref); - if(rf->is_srtp) { - srtp_dealloc(rf->srtp_ctx); - g_free(rf->srtp_policy.key); - } - g_free(rf); -} +} janus_audiobridge_rtp_forwarder_metadata; +/* Helper to create a new RTP forwarder with the right metadata */ static guint32 janus_audiobridge_rtp_forwarder_add_helper(janus_audiobridge_room *room, uint group, const gchar *host, uint16_t port, uint32_t ssrc, int pt, janus_audiocodec codec, int srtp_suite, const char *srtp_crypto, gboolean always_on, guint32 stream_id) { if(room == NULL || host == NULL) return 0; - janus_audiobridge_rtp_forwarder *rf = g_malloc0(sizeof(janus_audiobridge_rtp_forwarder)); - /* First of all, let's check if we need to setup an SRTP forwarder */ - if(srtp_suite > 0 && srtp_crypto != NULL) { - /* Base64 decode the crypto string and set it as the SRTP context */ - gsize len = 0; - guchar *decoded = g_base64_decode(srtp_crypto, &len); - if(len < SRTP_MASTER_LENGTH) { - JANUS_LOG(LOG_ERR, "Invalid SRTP crypto (%s)\n", srtp_crypto); - g_free(decoded); - g_free(rf); - return 0; - } - /* Set SRTP policy */ - srtp_policy_t *policy = &rf->srtp_policy; - srtp_crypto_policy_set_rtp_default(&(policy->rtp)); - if(srtp_suite == 32) { - srtp_crypto_policy_set_aes_cm_128_hmac_sha1_32(&(policy->rtp)); - } else if(srtp_suite == 80) { - srtp_crypto_policy_set_aes_cm_128_hmac_sha1_80(&(policy->rtp)); - } - policy->ssrc.type = ssrc_any_outbound; - policy->key = decoded; - policy->next = NULL; - /* Create SRTP context */ - srtp_err_status_t res = srtp_create(&rf->srtp_ctx, policy); - if(res != srtp_err_status_ok) { - /* Something went wrong... */ - JANUS_LOG(LOG_ERR, "Error creating forwarder SRTP session: %d (%s)\n", res, janus_srtp_error_str(res)); - g_free(decoded); - policy->key = NULL; - g_free(rf); - return 0; - } - rf->is_srtp = TRUE; - } - /* Check if the host address is IPv4 or IPv6 */ - if(strstr(host, ":") != NULL) { - rf->serv_addr6.sin6_family = AF_INET6; - inet_pton(AF_INET6, host, &(rf->serv_addr6.sin6_addr)); - rf->serv_addr6.sin6_port = htons(port); - } else { - rf->serv_addr.sin_family = AF_INET; - inet_pton(AF_INET, host, &(rf->serv_addr.sin_addr)); - rf->serv_addr.sin_port = htons(port); - } - /* Setup RTP info (we'll use the stream ID as SSRC) */ - rf->codec = codec; - rf->ssrc = ssrc; - rf->payload_type = pt; - if(codec == JANUS_AUDIOCODEC_PCMA) - rf->payload_type = 8; - else if(codec == JANUS_AUDIOCODEC_PCMU) - rf->payload_type = 0; - rf->seq_number = 0; - rf->timestamp = 0; - rf->group = group; - rf->always_on = always_on; - + /* Create a new RTP forwarder */ + janus_rtp_forwarder *rf = janus_rtp_forwarder_create(JANUS_AUDIOBRIDGE_NAME, stream_id, + room->rtp_udp_sock, host, port, ssrc, pt, srtp_suite, srtp_crypto, FALSE, 0, FALSE, FALSE); + if(rf == NULL) + return 0; + /* Fill in some metadata we'll need */ + janus_audiobridge_rtp_forwarder_metadata *metadata = g_malloc0(sizeof(janus_audiobridge_rtp_forwarder_metadata)); + metadata->codec = codec; + metadata->group = group; + metadata->always_on = always_on; + rf->metadata = metadata; + /* Add the forwarder to the ones we have in the room */ janus_mutex_lock(&room->rtp_mutex); - - guint32 actual_stream_id; - if(stream_id > 0) { - actual_stream_id = stream_id; - } else { - actual_stream_id = janus_random_uint32(); - } - - while(g_hash_table_lookup(room->rtp_forwarders, GUINT_TO_POINTER(actual_stream_id)) != NULL) { - actual_stream_id = janus_random_uint32(); - } - janus_refcount_init(&rf->ref, janus_audiobridge_rtp_forwarder_free); - g_hash_table_insert(room->rtp_forwarders, GUINT_TO_POINTER(actual_stream_id), rf); - + g_hash_table_insert(room->rtp_forwarders, GUINT_TO_POINTER(rf->stream_id), rf); janus_mutex_unlock(&room->rtp_mutex); - + /* Done */ JANUS_LOG(LOG_VERB, "Added RTP forwarder to room %s: %s:%d (ID: %"SCNu32")\n", - room->room_id_str, host, port, actual_stream_id); - - return actual_stream_id; + room->room_id_str, host, port, rf->stream_id); + return rf->stream_id; } @@ -2665,7 +2583,7 @@ int janus_audiobridge_init(janus_callbacks *callback, const char *config_path) { } g_atomic_int_set(&audiobridge->destroyed, 0); janus_mutex_init(&audiobridge->mutex); - audiobridge->rtp_forwarders = g_hash_table_new_full(NULL, NULL, NULL, (GDestroyNotify)janus_audiobridge_rtp_forwarder_destroy); + audiobridge->rtp_forwarders = g_hash_table_new_full(NULL, NULL, NULL, (GDestroyNotify)janus_rtp_forwarder_destroy); audiobridge->rtp_encoder = NULL; audiobridge->rtp_udp_sock = -1; janus_mutex_init(&audiobridge->rtp_mutex); @@ -3316,7 +3234,7 @@ static json_t *janus_audiobridge_process_synchronous_request(janus_audiobridge_s audiobridge->groups_byid = NULL; } } - audiobridge->rtp_forwarders = g_hash_table_new_full(NULL, NULL, NULL, (GDestroyNotify)janus_audiobridge_rtp_forwarder_destroy); + audiobridge->rtp_forwarders = g_hash_table_new_full(NULL, NULL, NULL, (GDestroyNotify)janus_rtp_forwarder_destroy); audiobridge->rtp_encoder = NULL; audiobridge->rtp_udp_sock = -1; janus_mutex_init(&audiobridge->rtp_mutex); @@ -4958,11 +4876,12 @@ static json_t *janus_audiobridge_process_synchronous_request(janus_audiobridge_s g_hash_table_iter_init(&iter, audiobridge->rtp_forwarders); while(g_hash_table_iter_next(&iter, &key, &value)) { guint32 stream_id = GPOINTER_TO_UINT(key); - janus_audiobridge_rtp_forwarder *rf = (janus_audiobridge_rtp_forwarder *)value; + janus_rtp_forwarder *rf = (janus_rtp_forwarder *)value; + janus_audiobridge_rtp_forwarder_metadata *rfm = (janus_audiobridge_rtp_forwarder_metadata *)rf->metadata; json_t *fl = json_object(); json_object_set_new(fl, "stream_id", json_integer(stream_id)); - if(rf->group > 0 && audiobridge->groups_byid != NULL) { - char *name = g_hash_table_lookup(audiobridge->groups_byid, GUINT_TO_POINTER(rf->group)); + if(rfm->group > 0 && audiobridge->groups_byid != NULL) { + char *name = g_hash_table_lookup(audiobridge->groups_byid, GUINT_TO_POINTER(rfm->group)); if(name != NULL) json_object_set_new(fl, "group", json_string(name)); } @@ -4976,11 +4895,11 @@ static json_t *janus_audiobridge_process_synchronous_request(janus_audiobridge_s } json_object_set_new(fl, "port", json_integer(ntohs(rf->serv_addr.sin_port))); json_object_set_new(fl, "ssrc", json_integer(rf->ssrc ? rf->ssrc : stream_id)); - json_object_set_new(fl, "codec", json_string(janus_audiocodec_name(rf->codec))); + json_object_set_new(fl, "codec", json_string(janus_audiocodec_name(rfm->codec))); json_object_set_new(fl, "ptype", json_integer(rf->payload_type)); if(rf->is_srtp) json_object_set_new(fl, "srtp", json_true()); - json_object_set_new(fl, "always_on", rf->always_on ? json_true() : json_false()); + json_object_set_new(fl, "always_on", rfm->always_on ? json_true() : json_false()); json_array_append_new(list, fl); } janus_mutex_unlock(&audiobridge->rtp_mutex); @@ -7961,8 +7880,6 @@ static void *janus_audiobridge_mixer_thread(void *data) { /* RTP */ guint16 seq = 0; guint32 ts = 0; - /* SRTP buffer, if needed */ - char sbuf[1500]; g_atomic_int_set(&audiobridge->wav_header_added, 0); /* Loop */ @@ -8378,8 +8295,9 @@ static void *janus_audiobridge_mixer_thread(void *data) { gpointer value; g_hash_table_iter_init(&iter, audiobridge->rtp_forwarders); while(g_hash_table_iter_next(&iter, NULL, &value)) { - janus_audiobridge_rtp_forwarder *forwarder = (janus_audiobridge_rtp_forwarder *)value; - if(forwarder->always_on) { + janus_rtp_forwarder *rf = (janus_rtp_forwarder *)value; + janus_audiobridge_rtp_forwarder_metadata *rfm = (janus_audiobridge_rtp_forwarder_metadata *)rf->metadata; + if(rfm->always_on) { go_on = TRUE; break; } @@ -8407,43 +8325,43 @@ static void *janus_audiobridge_mixer_thread(void *data) { g_hash_table_iter_init(&iter, audiobridge->rtp_forwarders); opus_int32 length = 0; while(audiobridge->rtp_udp_sock > 0 && g_hash_table_iter_next(&iter, &key, &value)) { - guint32 stream_id = GPOINTER_TO_UINT(key); - janus_audiobridge_rtp_forwarder *forwarder = (janus_audiobridge_rtp_forwarder *)value; - if(count == 0 && pf_count == 0 && !forwarder->always_on) + janus_rtp_forwarder *rf = (janus_rtp_forwarder *)value; + janus_audiobridge_rtp_forwarder_metadata *rfm = (janus_audiobridge_rtp_forwarder_metadata *)rf->metadata; + if(count == 0 && pf_count == 0 && !rfm->always_on) continue; /* Check if we're forwarding the main mix or a specific group */ if(groups_num > 0) { - if(forwarder->group == 0) { + if(rfm->group == 0) { /* We're forwarding the main mix */ for(i=0; igroup-1; + index = rfm->group-1; for(i=0; icodec == JANUS_AUDIOCODEC_OPUS) { + if(rfm->codec == JANUS_AUDIOCODEC_OPUS) { /* This is an Opus forwarder, check if we have a version for that already */ - if(!have_opus[forwarder->group]) { + if(!have_opus[rfm->group]) { /* We don't, encode now */ - OpusEncoder *rtp_encoder = (forwarder->group == 0 ? audiobridge->rtp_encoder : groupEncoders[forwarder->group-1]); + OpusEncoder *rtp_encoder = (rfm->group == 0 ? audiobridge->rtp_encoder : groupEncoders[rfm->group-1]); length = opus_encode(rtp_encoder, outBuffer, audiobridge->spatial_audio ? samples/2 : samples, - rtpbuffer + forwarder->group*1500 + 12, 1500-12); + rtpbuffer + rfm->group*1500 + 12, 1500-12); if(length < 0) { JANUS_LOG(LOG_ERR, "[Opus] Ops! got an error encoding the Opus frame: %d (%s)\n", length, opus_strerror(length)); continue; } - have_opus[forwarder->group] = TRUE; + have_opus[rfm->group] = TRUE; } - rtph = (janus_rtp_header *)(rtpbuffer + forwarder->group*1500); + rtph = (janus_rtp_header *)(rtpbuffer + rfm->group*1500); rtph->version = 2; - } else if(forwarder->codec == JANUS_AUDIOCODEC_PCMA || forwarder->codec == JANUS_AUDIOCODEC_PCMU) { + } else if(rfm->codec == JANUS_AUDIOCODEC_PCMA || rfm->codec == JANUS_AUDIOCODEC_PCMU) { /* This is a G.711 forwarder, check if we have a version for that already */ - if((forwarder->codec == JANUS_AUDIOCODEC_PCMA && !have_alaw[forwarder->group]) || - (forwarder->codec == JANUS_AUDIOCODEC_PCMU && !have_ulaw[forwarder->group])) { + if((rfm->codec == JANUS_AUDIOCODEC_PCMA && !have_alaw[rfm->group]) || + (rfm->codec == JANUS_AUDIOCODEC_PCMU && !have_ulaw[rfm->group])) { /* We don't, encode now */ if(audiobridge->sampling_rate != 8000) { /* Downsample this from whatever the mixer uses */ @@ -8457,56 +8375,30 @@ static void *janus_audiobridge_mixer_thread(void *data) { memcpy(resampled, outBuffer, samples*2); } int i = 0; - if(forwarder->codec == JANUS_AUDIOCODEC_PCMA) { - uint8_t *rtpalaw_buffer = rtpalaw + forwarder->group*G711_SAMPLES + 12; + if(rfm->codec == JANUS_AUDIOCODEC_PCMA) { + uint8_t *rtpalaw_buffer = rtpalaw + rfm->group*G711_SAMPLES + 12; for(i=0; i<160; i++) rtpalaw_buffer[i] = janus_audiobridge_g711_alaw_encode(resampled[i]); - have_alaw[forwarder->group] = TRUE; + have_alaw[rfm->group] = TRUE; } else { - uint8_t *rtpulaw_buffer = rtpulaw + forwarder->group*G711_SAMPLES + 12; + uint8_t *rtpulaw_buffer = rtpulaw + rfm->group*G711_SAMPLES + 12; for(i=0; i<160; i++) rtpulaw_buffer[i] = janus_audiobridge_g711_ulaw_encode(resampled[i]); - have_ulaw[forwarder->group] = TRUE; + have_ulaw[rfm->group] = TRUE; } } - rtph = (janus_rtp_header *)(forwarder->codec == JANUS_AUDIOCODEC_PCMA ? - (rtpalaw + forwarder->group*G711_SAMPLES) : (rtpulaw + forwarder->group*G711_SAMPLES)); + rtph = (janus_rtp_header *)(rfm->codec == JANUS_AUDIOCODEC_PCMA ? + (rtpalaw + rfm->group*G711_SAMPLES) : (rtpulaw + rfm->group*G711_SAMPLES)); rtph->version = 2; length = 160; } /* Update header */ - rtph->type = forwarder->payload_type; - rtph->ssrc = htonl(forwarder->ssrc ? forwarder->ssrc : stream_id); - forwarder->seq_number++; - rtph->seq_number = htons(forwarder->seq_number); - forwarder->timestamp += (forwarder->codec == JANUS_AUDIOCODEC_OPUS ? OPUS_SAMPLES : G711_SAMPLES); - rtph->timestamp = htonl(forwarder->timestamp); - /* Check if this packet needs to be encrypted */ - char *payload = (char *)rtph; - int plen = length+12; - if(forwarder->is_srtp) { - memcpy(sbuf, payload, plen); - int protected = plen; - int res = srtp_protect(forwarder->srtp_ctx, sbuf, &protected); - if(res != srtp_err_status_ok) { - janus_rtp_header *header = (janus_rtp_header *)sbuf; - guint32 timestamp = ntohl(header->timestamp); - guint16 seq = ntohs(header->seq_number); - JANUS_LOG(LOG_ERR, "Error encrypting RTP packet for room %s... %s (len=%d-->%d, ts=%"SCNu32", seq=%"SCNu16")...\n", - audiobridge->room_id_str, janus_srtp_error_str(res), plen, protected, timestamp, seq); - } else { - payload = (char *)&sbuf; - plen = protected; - } - } - /* No encryption, send the RTP packet as it is */ - struct sockaddr *address = (forwarder->serv_addr.sin_family == AF_INET ? - (struct sockaddr *)&forwarder->serv_addr : (struct sockaddr *)&forwarder->serv_addr6); - size_t addrlen = (forwarder->serv_addr.sin_family == AF_INET ? sizeof(forwarder->serv_addr) : sizeof(forwarder->serv_addr6)); - if(sendto(audiobridge->rtp_udp_sock, payload, plen, 0, address, addrlen) < 0) { - JANUS_LOG(LOG_HUGE, "Error forwarding mixed RTP packet for room %s... %s (len=%d)...\n", - audiobridge->room_id_str, g_strerror(errno), plen); - } + rfm->seq_number++; + rtph->seq_number = htons(rfm->seq_number); + rfm->timestamp += (rfm->codec == JANUS_AUDIOCODEC_OPUS ? OPUS_SAMPLES : G711_SAMPLES); + rtph->timestamp = htonl(rfm->timestamp); + /* Forward the packet */ + janus_rtp_forwarder_send_rtp(rf, (char *)rtph, length+12, -1); } } } diff --git a/src/plugins/janus_videoroom.c b/src/plugins/janus_videoroom.c index 6d9d2b9e6d..a7d0cc2a66 100644 --- a/src/plugins/janus_videoroom.c +++ b/src/plugins/janus_videoroom.c @@ -1511,6 +1511,7 @@ room-: { #include "../rtp.h" #include "../rtpsrtp.h" #include "../rtcp.h" +#include "../rtpfwd.h" #include "../record.h" #include "../sdp-utils.h" #include "../utils.h" @@ -2057,91 +2058,6 @@ typedef struct janus_videoroom_session { static GHashTable *sessions; static janus_mutex sessions_mutex = JANUS_MUTEX_INITIALIZER; -/* A host whose ports gets streamed RTP packets of the corresponding type */ -typedef struct janus_videoroom_srtp_context janus_videoroom_srtp_context; -typedef struct janus_videoroom_rtp_forwarder { - void *source; - uint32_t stream_id; - gboolean is_video; - gboolean is_data; - uint32_t ssrc; - int payload_type; - int substream; - struct sockaddr_in serv_addr; - struct sockaddr_in6 serv_addr6; - /* Only needed for RTCP */ - int rtcp_fd; - uint16_t local_rtcp_port, remote_rtcp_port; - GSource *rtcp_recv; - /* Only needed when forwarding simulcasted streams to a single endpoint */ - gboolean simulcast; - janus_rtp_switching_context context; - janus_rtp_simulcasting_context sim_context; - /* Only needed for SRTP forwarders */ - gboolean is_srtp; - janus_videoroom_srtp_context *srtp_ctx; - /* In case this is part of the remotization of publisher */ - char *remote_id; - /* Reference */ - volatile gint destroyed; - janus_refcount ref; -} janus_videoroom_rtp_forwarder; -static void janus_videoroom_rtp_forwarder_destroy(janus_videoroom_rtp_forwarder *forward); -static void janus_videoroom_rtp_forwarder_free(const janus_refcount *f_ref); -/* SRTP encryption may be needed, and potentially shared */ -struct janus_videoroom_srtp_context { - GHashTable *contexts; - char *id; - srtp_t ctx; - srtp_policy_t policy; - char sbuf[1500]; - int slen; - /* Keep track of how many forwarders are using this context */ - uint8_t count; -}; -static void janus_videoroom_srtp_context_free(gpointer data); -/* RTCP support in RTP forwarders */ -typedef struct janus_videoroom_rtcp_receiver { - GSource parent; - janus_videoroom_rtp_forwarder *forward; - GDestroyNotify destroy; -} janus_videoroom_rtcp_receiver; -static void janus_videoroom_rtp_forwarder_rtcp_receive(janus_videoroom_rtp_forwarder *forward); -static gboolean janus_videoroom_rtp_forwarder_rtcp_prepare(GSource *source, gint *timeout) { - *timeout = -1; - return FALSE; -} -static gboolean janus_videoroom_rtp_forwarder_rtcp_dispatch(GSource *source, GSourceFunc callback, gpointer user_data) { - janus_videoroom_rtcp_receiver *r = (janus_videoroom_rtcp_receiver *)source; - /* Receive the packet */ - if(r) - janus_videoroom_rtp_forwarder_rtcp_receive(r->forward); - return G_SOURCE_CONTINUE; -} -static void janus_videoroom_publisher_stream_dereference_void(void *ps); -static void janus_videoroom_rtp_forwarder_rtcp_finalize(GSource *source) { - janus_videoroom_rtcp_receiver *r = (janus_videoroom_rtcp_receiver *)source; - /* Remove the reference to the forwarder */ - if(r && r->forward) { - if(r->forward->source) { - janus_videoroom_publisher_stream_dereference_void(r->forward->source); - r->forward->source = NULL; - } - janus_refcount_decrease(&r->forward->ref); - } -} -static GSourceFuncs janus_videoroom_rtp_forwarder_rtcp_funcs = { - janus_videoroom_rtp_forwarder_rtcp_prepare, - NULL, - janus_videoroom_rtp_forwarder_rtcp_dispatch, - janus_videoroom_rtp_forwarder_rtcp_finalize, - NULL, NULL -}; -static GMainContext *rtcpfwd_ctx = NULL; -static GMainLoop *rtcpfwd_loop = NULL; -static GThread *rtcpfwd_thread = NULL; -static void *janus_videoroom_rtp_forwarder_rtcp_thread(void *data); - typedef struct janus_videoroom_publisher { janus_videoroom_session *session; janus_videoroom *room; /* Room */ @@ -2172,7 +2088,6 @@ typedef struct janus_videoroom_publisher { GSList *subscriptions; /* Subscriptions this publisher has created (who this publisher is watching) */ janus_mutex subscribers_mutex; janus_mutex own_subscriptions_mutex; - GHashTable *srtp_contexts; /* SRTP contexts that we can share among RTP forwarders */ /* In case this local publisher is being forwarder remotely */ GHashTable *remote_recipients; /* In case this is a remote publisher */ @@ -2246,12 +2161,13 @@ typedef struct janus_videoroom_publisher_stream { janus_refcount ref; } janus_videoroom_publisher_stream; /* Helper to add a new RTP forwarder for a specific stream sent by publisher */ -static janus_videoroom_rtp_forwarder *janus_videoroom_rtp_forwarder_add_helper(janus_videoroom_publisher *p, +static janus_rtp_forwarder *janus_videoroom_rtp_forwarder_add_helper(janus_videoroom_publisher *p, janus_videoroom_publisher_stream *ps, const gchar *host, int port, int rtcp_port, int pt, uint32_t ssrc, gboolean simulcast, int srtp_suite, const char *srtp_crypto, int substream, gboolean is_video, gboolean is_data); -static json_t *janus_videoroom_rtp_forwarder_summary(janus_videoroom_rtp_forwarder *f); +static void janus_videoroom_rtp_forwarder_rtcp_receive(janus_rtp_forwarder *rf, char *buffer, int len); +static json_t *janus_videoroom_rtp_forwarder_summary(janus_rtp_forwarder *f); static void janus_videoroom_create_dummy_publisher(janus_videoroom *room, GHashTable *streams); /* We support remote publishers as well, for which we use plain RTP, @@ -2470,7 +2386,7 @@ static void janus_videoroom_publisher_destroy(janus_videoroom_publisher *p) { gpointer key_f, value_f; g_hash_table_iter_init(&iter_f, ps->rtp_forwarders); while(g_hash_table_iter_next(&iter_f, &key_f, &value_f)) { - janus_videoroom_rtp_forwarder *rpv = value_f; + janus_rtp_forwarder *rpv = value_f; if(rpv->rtcp_recv) { GSource *source = rpv->rtcp_recv; rpv->rtcp_recv = NULL; @@ -2502,7 +2418,6 @@ static void janus_videoroom_publisher_free(const janus_refcount *p_ref) { close(p->udp_sock); g_hash_table_destroy(p->remote_recipients); g_hash_table_destroy(p->rtp_forwarders); - g_hash_table_destroy(p->srtp_contexts); g_slist_free(p->subscriptions); if(p->remote_fd > 0) @@ -2706,228 +2621,44 @@ static void janus_videoroom_reqpli(janus_videoroom_publisher_stream *ps, const c /* RTP forwarder helpers */ -static janus_videoroom_rtp_forwarder *janus_videoroom_rtp_forwarder_add_helper(janus_videoroom_publisher *p, +static janus_rtp_forwarder *janus_videoroom_rtp_forwarder_add_helper(janus_videoroom_publisher *p, janus_videoroom_publisher_stream *ps, const gchar *host, int port, int rtcp_port, int pt, uint32_t ssrc, gboolean simulcast, int srtp_suite, const char *srtp_crypto, int substream, gboolean is_video, gboolean is_data) { - if(!p || !ps || !host) { + if(!p || !ps || !host) return NULL; - } - if(ipv6_disabled && strstr(host, ":") != NULL) { - JANUS_LOG(LOG_ERR, "Attempt to create an IPv6 forwarder, but IPv6 networking is not available\n"); - return NULL; - } janus_refcount_increase(&p->ref); janus_refcount_increase(&ps->ref); + /* Create a new RTP forwarder */ + janus_rtp_forwarder *rf = janus_rtp_forwarder_create(JANUS_VIDEOROOM_NAME, 0, + p->udp_sock, host, port, ssrc, pt, srtp_suite, srtp_crypto, simulcast, substream, is_video, is_data); + if(rf == NULL) + return NULL; + rf->source = ps; + if(simulcast && ps->rid_extmap_id > 0) + rf->sim_context.rid_ext_id = ps->rid_extmap_id; + /* Add the forwarder to the ones we have for the publisher stream */ janus_mutex_lock(&ps->rtp_forwarders_mutex); - /* Do we need to bind to a port for RTCP? */ - int fd = -1; - uint16_t local_rtcp_port = 0; - if(!is_data && rtcp_port > 0) { - fd = socket(!ipv6_disabled ? AF_INET6 : AF_INET, SOCK_DGRAM, IPPROTO_UDP); - if(fd < 0) { - janus_mutex_unlock(&ps->rtp_forwarders_mutex); - janus_refcount_decrease(&ps->ref); - janus_refcount_decrease(&p->ref); - JANUS_LOG(LOG_ERR, "Error creating RTCP socket for new RTP forwarder... %d (%s)\n", - errno, g_strerror(errno)); - return NULL; - } - struct sockaddr *address = NULL; - struct sockaddr_in addr4 = { 0 }; - struct sockaddr_in6 addr6 = { 0 }; - socklen_t len = 0; - if(!ipv6_disabled) { - /* Configure the socket so that it can be used both on IPv4 and IPv6 */ - int v6only = 0; - if(setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &v6only, sizeof(v6only)) != 0) { - janus_mutex_unlock(&ps->rtp_forwarders_mutex); - janus_refcount_decrease(&ps->ref); - janus_refcount_decrease(&p->ref); - JANUS_LOG(LOG_ERR, "Error configuring RTCP socket for new RTP forwarder... %d (%s)\n", - errno, g_strerror(errno)); - close(fd); - return NULL; - } - len = sizeof(addr6); - addr6.sin6_family = AF_INET6; - addr6.sin6_port = htons(0); /* The RTCP port we received is the remote one */ - addr6.sin6_addr = in6addr_any; - address = (struct sockaddr *)&addr6; - } else { - /* IPv6 is disabled, only do IPv4 */ - len = sizeof(addr4); - addr4.sin_family = AF_INET; - addr4.sin_port = htons(0); /* The RTCP port we received is the remote one */ - addr4.sin_addr.s_addr = INADDR_ANY; - address = (struct sockaddr *)&addr4; - } - if(bind(fd, (struct sockaddr *)address, len) < 0 || - getsockname(fd, (struct sockaddr *)address, &len) < 0) { - janus_mutex_unlock(&ps->rtp_forwarders_mutex); - janus_refcount_decrease(&ps->ref); - janus_refcount_decrease(&p->ref); - JANUS_LOG(LOG_ERR, "Error binding RTCP socket for new RTP forwarder... %d (%s)\n", - errno, g_strerror(errno)); - close(fd); - return NULL; + g_hash_table_insert(ps->rtp_forwarders, GUINT_TO_POINTER(rf->stream_id), rf); + g_hash_table_insert(p->rtp_forwarders, GUINT_TO_POINTER(rf->stream_id), GUINT_TO_POINTER(rf->stream_id)); + janus_mutex_unlock(&ps->rtp_forwarders_mutex); + /* If we need to add RTCP too, do that now */ + if(rtcp_port > 0) { + int res = janus_rtp_forwarder_add_rtcp(rf, rtcp_port, &janus_videoroom_rtp_forwarder_rtcp_receive); + if(res < 0) { + JANUS_LOG(LOG_WARN, "Error adding RTCP support to new RTP forwarder (%d)...\n", res); } - local_rtcp_port = ntohs(!ipv6_disabled ? addr6.sin6_port : addr4.sin_port); - JANUS_LOG(LOG_VERB, "Bound local %s RTCP port: %"SCNu16"\n", - is_video ? "video" : "audio", local_rtcp_port); - } - janus_videoroom_rtp_forwarder *forward = g_malloc0(sizeof(janus_videoroom_rtp_forwarder)); - forward->source = ps; - forward->rtcp_fd = fd; - forward->local_rtcp_port = local_rtcp_port; - forward->remote_rtcp_port = rtcp_port > 0 ? rtcp_port : 0; - /* First of all, let's check if we need to setup an SRTP forwarder */ - if(!is_data && srtp_suite > 0 && srtp_crypto != NULL) { - /* First of all, let's check if there's already an RTP forwarder with - * the same SRTP context: make sure SSRC and pt are the same too */ - char media[10] = {0}; - if(!is_video) { - g_sprintf(media, "audio"); - } else if(is_video) { - g_sprintf(media, "video%d", substream); - } - char srtp_id[256] = {0}; - g_snprintf(srtp_id, 255, "%s-%s-%"SCNu32"-%d", srtp_crypto, media, ssrc, pt); - JANUS_LOG(LOG_VERB, "SRTP context ID: %s\n", srtp_id); - janus_videoroom_srtp_context *srtp_ctx = g_hash_table_lookup(p->srtp_contexts, srtp_id); - if(srtp_ctx != NULL) { - JANUS_LOG(LOG_VERB, " -- Reusing existing SRTP context\n"); - srtp_ctx->count++; - forward->srtp_ctx = srtp_ctx; - } else { - /* Nope, base64 decode the crypto string and set it as a new SRTP context */ - JANUS_LOG(LOG_VERB, " -- Creating new SRTP context\n"); - srtp_ctx = g_malloc0(sizeof(janus_videoroom_srtp_context)); - gsize len = 0; - guchar *decoded = g_base64_decode(srtp_crypto, &len); - if(len < SRTP_MASTER_LENGTH) { - janus_mutex_unlock(&ps->rtp_forwarders_mutex); - janus_refcount_decrease(&ps->ref); - janus_refcount_decrease(&p->ref); - JANUS_LOG(LOG_ERR, "Invalid SRTP crypto (%s)\n", srtp_crypto); - g_free(decoded); - g_free(srtp_ctx); - if(forward->rtcp_fd > -1) - close(forward->rtcp_fd); - g_free(forward); - return NULL; - } - /* Set SRTP policy */ - srtp_policy_t *policy = &srtp_ctx->policy; - srtp_crypto_policy_set_rtp_default(&(policy->rtp)); - if(srtp_suite == 32) { - srtp_crypto_policy_set_aes_cm_128_hmac_sha1_32(&(policy->rtp)); - } else if(srtp_suite == 80) { - srtp_crypto_policy_set_aes_cm_128_hmac_sha1_80(&(policy->rtp)); - } - policy->ssrc.type = ssrc_any_outbound; - policy->key = decoded; - policy->next = NULL; - /* Create SRTP context */ - srtp_err_status_t res = srtp_create(&srtp_ctx->ctx, policy); - if(res != srtp_err_status_ok) { - /* Something went wrong... */ - janus_mutex_unlock(&ps->rtp_forwarders_mutex); - janus_refcount_decrease(&ps->ref); - janus_refcount_decrease(&p->ref); - JANUS_LOG(LOG_ERR, "Error creating forwarder SRTP session: %d (%s)\n", res, janus_srtp_error_str(res)); - g_free(decoded); - policy->key = NULL; - g_free(srtp_ctx); - if(forward->rtcp_fd > -1) - close(forward->rtcp_fd); - g_free(forward); - return NULL; - } - srtp_ctx->contexts = p->srtp_contexts; - srtp_ctx->id = g_strdup(srtp_id); - srtp_ctx->count = 1; - g_hash_table_insert(p->srtp_contexts, srtp_ctx->id, srtp_ctx); - forward->srtp_ctx = srtp_ctx; - } - forward->is_srtp = TRUE; - } - forward->is_video = is_video; - forward->payload_type = pt; - forward->ssrc = ssrc; - forward->substream = substream; - forward->is_data = is_data; - /* Check if the host address is IPv4 or IPv6 */ - if(strstr(host, ":") != NULL) { - forward->serv_addr6.sin6_family = AF_INET6; - inet_pton(AF_INET6, host, &(forward->serv_addr6.sin6_addr)); - forward->serv_addr6.sin6_port = htons(port); - } else { - forward->serv_addr.sin_family = AF_INET; - inet_pton(AF_INET, host, &(forward->serv_addr.sin_addr)); - forward->serv_addr.sin_port = htons(port); - } - if(is_video && simulcast) { - forward->simulcast = TRUE; - janus_rtp_switching_context_reset(&forward->context); - janus_rtp_simulcasting_context_reset(&forward->sim_context); - forward->sim_context.rid_ext_id = ps->rid_extmap_id; - forward->sim_context.substream_target = 2; - forward->sim_context.templayer_target = 2; } - janus_refcount_init(&forward->ref, janus_videoroom_rtp_forwarder_free); - guint32 stream_id = janus_random_uint32(); - while(g_hash_table_lookup(ps->rtp_forwarders, GUINT_TO_POINTER(stream_id)) != NULL && - g_hash_table_lookup(p->rtp_forwarders, GUINT_TO_POINTER(stream_id)) != NULL) { - stream_id = janus_random_uint32(); - } - forward->stream_id = stream_id; - g_hash_table_insert(ps->rtp_forwarders, GUINT_TO_POINTER(stream_id), forward); - g_hash_table_insert(p->rtp_forwarders, GUINT_TO_POINTER(stream_id), GUINT_TO_POINTER(stream_id)); - if(fd > -1) { - /* We need RTCP: track this file descriptor, and ref the forwarder */ - janus_refcount_increase(&ps->ref); - janus_refcount_increase(&forward->ref); - forward->rtcp_recv = g_source_new(&janus_videoroom_rtp_forwarder_rtcp_funcs, sizeof(janus_videoroom_rtcp_receiver)); - janus_videoroom_rtcp_receiver *rr = (janus_videoroom_rtcp_receiver *)forward->rtcp_recv; - rr->forward = forward; - g_source_set_priority(forward->rtcp_recv, G_PRIORITY_DEFAULT); - g_source_add_unix_fd(forward->rtcp_recv, fd, G_IO_IN | G_IO_ERR); - g_source_attach((GSource *)forward->rtcp_recv, rtcpfwd_ctx); - /* Send a couple of empty RTP packets to the remote port to do latching */ - struct sockaddr *address = NULL; - struct sockaddr_in addr4 = { 0 }; - struct sockaddr_in6 addr6 = { 0 }; - socklen_t addrlen = 0; - if(forward->serv_addr.sin_family == AF_INET) { - addr4.sin_family = AF_INET; - addr4.sin_addr.s_addr = forward->serv_addr.sin_addr.s_addr; - addr4.sin_port = htons(forward->remote_rtcp_port); - address = (struct sockaddr *)&addr4; - addrlen = sizeof(addr4); - } else { - addr6.sin6_family = AF_INET6; - memcpy(&addr6.sin6_addr, &forward->serv_addr6.sin6_addr, sizeof(struct in6_addr)); - addr6.sin6_port = htons(forward->remote_rtcp_port); - address = (struct sockaddr *)&addr6; - addrlen = sizeof(addr6); - } - janus_rtp_header rtp; - memset(&rtp, 0, sizeof(rtp)); - rtp.version = 2; - (void)sendto(fd, &rtp, 12, 0, address, addrlen); - (void)sendto(fd, &rtp, 12, 0, address, addrlen); - } - janus_mutex_unlock(&ps->rtp_forwarders_mutex); + /* Done */ janus_refcount_decrease(&ps->ref); janus_refcount_decrease(&p->ref); JANUS_LOG(LOG_VERB, "Added %s/%d rtp_forward to participant %s host: %s:%d stream_id: %"SCNu32"\n", - is_data ? "data" : (is_video ? "video" : "audio"), substream, p->user_id_str, host, port, stream_id); - return forward; + is_data ? "data" : (is_video ? "video" : "audio"), substream, p->user_id_str, host, port, rf->stream_id); + return rf; } -static json_t *janus_videoroom_rtp_forwarder_summary(janus_videoroom_rtp_forwarder *f) { +static json_t *janus_videoroom_rtp_forwarder_summary(janus_rtp_forwarder *f) { if(f == NULL) return NULL; json_t *json = json_object(); @@ -2971,42 +2702,6 @@ static json_t *janus_videoroom_rtp_forwarder_summary(janus_videoroom_rtp_forward return json; } -static void janus_videoroom_rtp_forwarder_destroy(janus_videoroom_rtp_forwarder *forward) { - if(forward && g_atomic_int_compare_and_exchange(&forward->destroyed, 0, 1)) { - if(forward->rtcp_fd > -1 && forward->rtcp_recv != NULL) { - g_source_destroy(forward->rtcp_recv); - g_source_unref(forward->rtcp_recv); - } - janus_refcount_decrease(&forward->ref); - } -} -static void janus_videoroom_rtp_forwarder_free(const janus_refcount *f_ref) { - janus_videoroom_rtp_forwarder *forward = janus_refcount_containerof(f_ref, janus_videoroom_rtp_forwarder, ref); - if(forward->rtcp_fd > -1) - close(forward->rtcp_fd); - if(forward->is_srtp && forward->srtp_ctx) { - forward->srtp_ctx->count--; - if(forward->srtp_ctx->count == 0 && forward->srtp_ctx->contexts != NULL) - g_hash_table_remove(forward->srtp_ctx->contexts, forward->srtp_ctx->id); - } - g_free(forward->remote_id); - g_free(forward); - forward = NULL; -} - -static void janus_videoroom_srtp_context_free(gpointer data) { - if(data) { - janus_videoroom_srtp_context *srtp_ctx = (janus_videoroom_srtp_context *)data; - if(srtp_ctx) { - g_free(srtp_ctx->id); - srtp_dealloc(srtp_ctx->ctx); - g_free(srtp_ctx->policy.key); - g_free(srtp_ctx); - srtp_ctx = NULL; - } - } -} - /* Helper to create a dummy publisher, with placeholder streams for each supported codec */ static void janus_videoroom_create_dummy_publisher(janus_videoroom *room, GHashTable *streams) { if(room == NULL || !room->dummy_publisher) @@ -3045,7 +2740,6 @@ static void janus_videoroom_create_dummy_publisher(janus_videoroom *room, GHashT publisher->remote_recipients = g_hash_table_new_full(g_str_hash, g_str_equal, (GDestroyNotify)g_free, (GDestroyNotify)janus_videoroom_remote_recipient_free); publisher->rtp_forwarders = g_hash_table_new(NULL, NULL); - publisher->srtp_contexts = g_hash_table_new_full(g_str_hash, g_str_equal, NULL, (GDestroyNotify)janus_videoroom_srtp_context_free); publisher->udp_sock = -1; g_atomic_int_set(&publisher->destroyed, 0); janus_refcount_init(&publisher->ref, janus_videoroom_publisher_free); @@ -3093,7 +2787,7 @@ static void janus_videoroom_create_dummy_publisher(janus_videoroom *room, GHashT janus_mutex_init(&ps->subscribers_mutex); janus_mutex_init(&ps->rtp_forwarders_mutex); janus_mutex_init(&ps->rid_mutex); - ps->rtp_forwarders = g_hash_table_new_full(NULL, NULL, NULL, (GDestroyNotify)janus_videoroom_rtp_forwarder_destroy); + ps->rtp_forwarders = g_hash_table_new_full(NULL, NULL, NULL, (GDestroyNotify)janus_rtp_forwarder_destroy); publisher->streams = g_list_append(publisher->streams, ps); g_hash_table_insert(publisher->streams_byid, GINT_TO_POINTER(ps->mindex), ps); g_hash_table_insert(publisher->streams_bymid, g_strdup(ps->mid), ps); @@ -3140,7 +2834,7 @@ static void janus_videoroom_create_dummy_publisher(janus_videoroom *room, GHashT janus_mutex_init(&ps->subscribers_mutex); janus_mutex_init(&ps->rtp_forwarders_mutex); janus_mutex_init(&ps->rid_mutex); - ps->rtp_forwarders = g_hash_table_new_full(NULL, NULL, NULL, (GDestroyNotify)janus_videoroom_rtp_forwarder_destroy); + ps->rtp_forwarders = g_hash_table_new_full(NULL, NULL, NULL, (GDestroyNotify)janus_rtp_forwarder_destroy); publisher->streams = g_list_append(publisher->streams, ps); g_hash_table_insert(publisher->streams_byid, GINT_TO_POINTER(ps->mindex), ps); g_hash_table_insert(publisher->streams_bymid, g_strdup(ps->mid), ps); @@ -3887,18 +3581,6 @@ int janus_videoroom_init(janus_callbacks *callback, const char *config_path) { } janus_mutex_unlock(&rooms_mutex); - /* Thread for handling incoming RTCP packets from RTP forwarders, if any */ - rtcpfwd_ctx = g_main_context_new(); - rtcpfwd_loop = g_main_loop_new(rtcpfwd_ctx, FALSE); - GError *error = NULL; - rtcpfwd_thread = g_thread_try_new("videoroom rtcpfwd", janus_videoroom_rtp_forwarder_rtcp_thread, NULL, &error); - if(error != NULL) { - /* We show the error but it's not fatal */ - JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the VideoRoom RTCP thread for RTP forwarders...\n", - error->code, error->message ? error->message : "??"); - g_error_free(error); - } - /* Finally, let's check if IPv6 is disabled, as we may need to know for forwarders */ int fd = socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP); if(fd < 0) { @@ -3917,7 +3599,7 @@ int janus_videoroom_init(janus_callbacks *callback, const char *config_path) { g_atomic_int_set(&initialized, 1); /* Launch the thread that will handle incoming messages */ - error = NULL; + GError *error = NULL; handler_thread = g_thread_try_new("videoroom handler", janus_videoroom_handler, NULL, &error); if(error != NULL) { g_atomic_int_set(&initialized, 0); @@ -3941,14 +3623,6 @@ void janus_videoroom_destroy(void) { g_thread_join(handler_thread); handler_thread = NULL; } - if(rtcpfwd_thread != NULL) { - if(g_main_loop_is_running(rtcpfwd_loop)) { - g_main_loop_quit(rtcpfwd_loop); - g_main_context_wakeup(rtcpfwd_ctx); - } - g_thread_join(rtcpfwd_thread); - rtcpfwd_thread = NULL; - } /* FIXME We should destroy the sessions cleanly */ janus_mutex_lock(&sessions_mutex); @@ -5656,7 +5330,7 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi JANUS_LOG(LOG_WARN, "No such stream with mid '%s', skipping forwarder...\n", mid); continue; } - janus_videoroom_rtp_forwarder *f = NULL; + janus_rtp_forwarder *f = NULL; json_t *stream_host = json_object_get(s, "host"); host = json_string_value(stream_host) ? json_string_value(stream_host) : json_string_value(json_host); json_t *stream_port = json_object_get(s, "port"); @@ -5873,7 +5547,7 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi video_port[2] = -1; } /* Create all the forwarders we need */ - janus_videoroom_rtp_forwarder *f = NULL; + janus_rtp_forwarder *f = NULL; guint32 audio_handle = 0; guint32 video_handle[3] = {0, 0, 0}; guint32 data_handle = 0; @@ -6153,9 +5827,9 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi while(temp) { janus_videoroom_publisher_stream *ps = (janus_videoroom_publisher_stream *)temp->data; janus_mutex_lock(&ps->rtp_forwarders_mutex); - janus_videoroom_rtp_forwarder *f = g_hash_table_lookup(ps->rtp_forwarders, GUINT_TO_POINTER(stream_id)); + janus_rtp_forwarder *f = g_hash_table_lookup(ps->rtp_forwarders, GUINT_TO_POINTER(stream_id)); if(f != NULL) { - if(f->remote_id != NULL) { + if(f->metadata != NULL) { /* This belongs to a remotization, ignore */ janus_mutex_unlock(&ps->rtp_forwarders_mutex); found = FALSE; @@ -6725,9 +6399,9 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi gpointer key_f, value_f; g_hash_table_iter_init(&iter_f, ps->rtp_forwarders); while(g_hash_table_iter_next(&iter_f, &key_f, &value_f)) { - janus_videoroom_rtp_forwarder *rpv = value_f; + janus_rtp_forwarder *rpv = value_f; /* If this belongs to a remotization, skip it */ - if(rpv->remote_id != NULL) + if(rpv->metadata != NULL) continue; /* Return a different, media-agnostic, format */ json_t *fl = janus_videoroom_rtp_forwarder_summary(rpv); @@ -6986,7 +6660,7 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi /* Add a new RTP forwarder for each of the publisher streams */ janus_mutex_lock(&publisher->streams_mutex); janus_videoroom_publisher_stream *ps = NULL; - janus_videoroom_rtp_forwarder *f = NULL; + janus_rtp_forwarder *f = NULL; gboolean rtcp_added = FALSE, add_rtcp = FALSE; GList *temp = publisher->streams; while(temp) { @@ -7002,7 +6676,7 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi (REMOTE_PUBLISHER_BASE_SSRC + ps->mindex*REMOTE_PUBLISHER_SSRC_STEP), FALSE, 0, NULL, 0, FALSE, FALSE); if(f != NULL) - f->remote_id = g_strdup(remote_id); + f->metadata = g_strdup(remote_id); } else if(ps->type == JANUS_VIDEOROOM_MEDIA_VIDEO) { /* Video stream */ add_rtcp = (!rtcp_added && rtcp_port > 0); @@ -7011,7 +6685,7 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi (REMOTE_PUBLISHER_BASE_SSRC + ps->mindex*REMOTE_PUBLISHER_SSRC_STEP), FALSE, 0, NULL, 0, TRUE, FALSE); if(f != NULL) - f->remote_id = g_strdup(remote_id); + f->metadata = g_strdup(remote_id); if(add_rtcp) rtcp_added = TRUE; /* Check if there's simulcast substreams we need to relay too */ @@ -7021,7 +6695,7 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi (REMOTE_PUBLISHER_BASE_SSRC + ps->mindex*REMOTE_PUBLISHER_SSRC_STEP + 1), FALSE, 0, NULL, 1, TRUE, FALSE); if(f != NULL) - f->remote_id = g_strdup(remote_id); + f->metadata = g_strdup(remote_id); } if(ps->vssrc[2] || ps->rid[2]) { f = janus_videoroom_rtp_forwarder_add_helper(publisher, ps, @@ -7029,7 +6703,7 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi (REMOTE_PUBLISHER_BASE_SSRC + ps->mindex*REMOTE_PUBLISHER_SSRC_STEP + 2), FALSE, 0, NULL, 2, TRUE, FALSE); if(f != NULL) - f->remote_id = g_strdup(remote_id); + f->metadata = g_strdup(remote_id); } } else { /* Data stream */ @@ -7038,7 +6712,7 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi (REMOTE_PUBLISHER_BASE_SSRC + ps->mindex*REMOTE_PUBLISHER_SSRC_STEP), FALSE, 0, NULL, 0, FALSE, TRUE); if(f != NULL) - f->remote_id = g_strdup(remote_id); + f->metadata = g_strdup(remote_id); } temp = temp->next; } @@ -7142,8 +6816,8 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi gpointer value; g_hash_table_iter_init(&iter, ps->rtp_forwarders); while(g_hash_table_iter_next(&iter, NULL, &value)) { - janus_videoroom_rtp_forwarder *f = (janus_videoroom_rtp_forwarder *)value; - if(f->remote_id != NULL && !strcmp(f->remote_id, remote_id)) { + janus_rtp_forwarder *f = (janus_rtp_forwarder *)value; + if(f->metadata != NULL && !strcmp((char *)f->metadata, remote_id)) { /* We found one, get rid of it */ uint32_t stream_id = f->stream_id; g_hash_table_iter_remove(&iter); @@ -7480,7 +7154,6 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi publisher->remote_recipients = g_hash_table_new_full(g_str_hash, g_str_equal, (GDestroyNotify)g_free, (GDestroyNotify)janus_videoroom_remote_recipient_free); publisher->rtp_forwarders = g_hash_table_new(NULL, NULL); - publisher->srtp_contexts = g_hash_table_new_full(g_str_hash, g_str_equal, NULL, (GDestroyNotify)janus_videoroom_srtp_context_free); publisher->udp_sock = -1; g_atomic_int_set(&publisher->destroyed, 0); janus_refcount_init(&publisher->ref, janus_videoroom_publisher_free); @@ -7587,7 +7260,7 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi janus_refcount_increase(&ps->ref); /* This is for the mid-indexed hashtable */ janus_mutex_init(&ps->subscribers_mutex); janus_mutex_init(&ps->rtp_forwarders_mutex); - ps->rtp_forwarders = g_hash_table_new_full(NULL, NULL, NULL, (GDestroyNotify)janus_videoroom_rtp_forwarder_destroy); + ps->rtp_forwarders = g_hash_table_new_full(NULL, NULL, NULL, (GDestroyNotify)janus_rtp_forwarder_destroy); publisher->streams = g_list_append(publisher->streams, ps); g_hash_table_insert(publisher->streams_byid, GINT_TO_POINTER(ps->mindex), ps); g_hash_table_insert(publisher->streams_bymid, g_strdup(ps->mid), ps); @@ -7863,7 +7536,7 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi janus_refcount_increase(&ps->ref); /* This is for the mid-indexed hashtable */ janus_mutex_init(&ps->subscribers_mutex); janus_mutex_init(&ps->rtp_forwarders_mutex); - ps->rtp_forwarders = g_hash_table_new_full(NULL, NULL, NULL, (GDestroyNotify)janus_videoroom_rtp_forwarder_destroy); + ps->rtp_forwarders = g_hash_table_new_full(NULL, NULL, NULL, (GDestroyNotify)janus_rtp_forwarder_destroy); publisher->streams = g_list_append(publisher->streams, ps); g_hash_table_insert(publisher->streams_byid, GINT_TO_POINTER(ps->mindex), ps); g_hash_table_insert(publisher->streams_bymid, g_strdup(ps->mid), ps); @@ -8303,85 +7976,15 @@ static void janus_videoroom_incoming_rtp_internal(janus_videoroom_session *sessi } /* Forward RTP to the appropriate port for the rtp_forwarders associated with this publisher, if there are any */ janus_mutex_lock(&ps->rtp_forwarders_mutex); - if(participant->srtp_contexts && g_hash_table_size(participant->srtp_contexts) > 0) { - GHashTableIter iter; - gpointer value; - g_hash_table_iter_init(&iter, participant->srtp_contexts); - while(g_hash_table_iter_next(&iter, NULL, &value)) { - janus_videoroom_srtp_context *srtp_ctx = (janus_videoroom_srtp_context *)value; - srtp_ctx->slen = 0; - } - } GHashTableIter iter; gpointer value; g_hash_table_iter_init(&iter, ps->rtp_forwarders); while(participant->udp_sock > 0 && g_hash_table_iter_next(&iter, NULL, &value)) { - janus_videoroom_rtp_forwarder *rtp_forward = (janus_videoroom_rtp_forwarder *)value; + janus_rtp_forwarder *rtp_forward = (janus_rtp_forwarder *)value; if(rtp_forward->is_data || (video && !rtp_forward->is_video) || (!video && rtp_forward->is_video)) continue; - /* Backup the RTP header info, as we may rewrite part of it */ - uint32_t seq_number = ntohs(rtp->seq_number); - uint32_t timestamp = ntohl(rtp->timestamp); - int pt = rtp->type; - uint32_t ssrc = ntohl(rtp->ssrc); - /* First of all, check if we're simulcasting and if we need to forward or ignore this frame */ - if(video && !rtp_forward->simulcast && rtp_forward->substream != sc) { - continue; - } else if(video && rtp_forward->simulcast) { - /* This is video and we're simulcasting, check if we need to forward this frame */ - if(!janus_rtp_simulcasting_context_process_rtp(&rtp_forward->sim_context, - buf, len, ps->vssrc, ps->rid, ps->vcodec, &rtp_forward->context, &ps->rid_mutex)) - continue; - janus_rtp_header_update(rtp, &rtp_forward->context, TRUE, 0); - /* By default we use a fixed SSRC (it may be overwritten later) */ - rtp->ssrc = htonl(participant->user_id & 0xffffffff); - } - /* Check if payload type and/or SSRC need to be overwritten for this forwarder */ - if(rtp_forward->payload_type > 0) - rtp->type = rtp_forward->payload_type; - if(rtp_forward->ssrc > 0) - rtp->ssrc = htonl(rtp_forward->ssrc); - /* Check if this is an RTP or SRTP forwarder */ - if(!rtp_forward->is_srtp) { - /* Plain RTP */ - struct sockaddr *address = (rtp_forward->serv_addr.sin_family == AF_INET ? - (struct sockaddr *)&rtp_forward->serv_addr : (struct sockaddr *)&rtp_forward->serv_addr6); - size_t addrlen = (rtp_forward->serv_addr.sin_family == AF_INET ? sizeof(rtp_forward->serv_addr) : sizeof(rtp_forward->serv_addr6)); - if(sendto(participant->udp_sock, buf, len, 0, address, addrlen) < 0) { - JANUS_LOG(LOG_HUGE, "Error forwarding RTP %s packet for %s... %s (len=%d)...\n", - (video ? "video" : "audio"), participant->display, g_strerror(errno), len); - } - } else { - /* SRTP: check if we already encrypted the packet before */ - if(rtp_forward->srtp_ctx->slen == 0) { - memcpy(&rtp_forward->srtp_ctx->sbuf, buf, len); - int protected = len; - int res = srtp_protect(rtp_forward->srtp_ctx->ctx, &rtp_forward->srtp_ctx->sbuf, &protected); - if(res != srtp_err_status_ok) { - janus_rtp_header *header = (janus_rtp_header *)&rtp_forward->srtp_ctx->sbuf; - guint32 timestamp = ntohl(header->timestamp); - guint16 seq = ntohs(header->seq_number); - JANUS_LOG(LOG_ERR, "Error encrypting %s packet for %s... %s (len=%d-->%d, ts=%"SCNu32", seq=%"SCNu16")...\n", - (video ? "Video" : "Audio"), participant->display, janus_srtp_error_str(res), len, protected, timestamp, seq); - } else { - rtp_forward->srtp_ctx->slen = protected; - } - } - if(rtp_forward->srtp_ctx->slen > 0) { - struct sockaddr *address = (rtp_forward->serv_addr.sin_family == AF_INET ? - (struct sockaddr *)&rtp_forward->serv_addr : (struct sockaddr *)&rtp_forward->serv_addr6); - size_t addrlen = (rtp_forward->serv_addr.sin_family == AF_INET ? sizeof(rtp_forward->serv_addr) : sizeof(rtp_forward->serv_addr6)); - if(sendto(participant->udp_sock, rtp_forward->srtp_ctx->sbuf, rtp_forward->srtp_ctx->slen, 0, address, addrlen) < 0) { - JANUS_LOG(LOG_HUGE, "Error forwarding SRTP %s packet for %s... %s (len=%d)...\n", - (video ? "video" : "audio"), participant->display, g_strerror(errno), rtp_forward->srtp_ctx->slen); - } - } - } - /* Restore original values of payload type and SSRC before going on */ - rtp->type = pt; - rtp->ssrc = htonl(ssrc); - rtp->timestamp = htonl(timestamp); - rtp->seq_number = htons(seq_number); + janus_rtp_forwarder_send_rtp_full(rtp_forward, buf, len, sc, + ps->vssrc, ps->rid, ps->vcodec, &ps->rid_mutex); } janus_mutex_unlock(&ps->rtp_forwarders_mutex); /* Set the payload type of the publisher */ @@ -8608,13 +8211,13 @@ static void janus_videoroom_incoming_data_internal(janus_videoroom_session *sess gpointer value; g_hash_table_iter_init(&iter, ps->rtp_forwarders); while(participant->udp_sock > 0 && g_hash_table_iter_next(&iter, NULL, &value)) { - janus_videoroom_rtp_forwarder *rtp_forward = (janus_videoroom_rtp_forwarder *)value; + janus_rtp_forwarder *rtp_forward = (janus_rtp_forwarder *)value; if(rtp_forward->is_data) { struct sockaddr *address = (rtp_forward->serv_addr.sin_family == AF_INET ? (struct sockaddr *)&rtp_forward->serv_addr : (struct sockaddr *)&rtp_forward->serv_addr6); size_t addrlen = (rtp_forward->serv_addr.sin_family == AF_INET ? sizeof(rtp_forward->serv_addr) : sizeof(rtp_forward->serv_addr6)); /* Check if this is a regular RTP forwarder, or a publisher remotization */ - if(rtp_forward->remote_id == NULL) { + if(rtp_forward->metadata == NULL) { /* Regular forwarder, send the payload as it is */ if(sendto(participant->udp_sock, buf, len, 0, address, addrlen) < 0) { JANUS_LOG(LOG_HUGE, "Error forwarding data packet for %s... %s (len=%d)...\n", @@ -9262,7 +8865,6 @@ static void *janus_videoroom_handler(void *data) { publisher->remote_recipients = g_hash_table_new_full(g_str_hash, g_str_equal, (GDestroyNotify)g_free, (GDestroyNotify)janus_videoroom_remote_recipient_free); publisher->rtp_forwarders = g_hash_table_new(NULL, NULL); - publisher->srtp_contexts = g_hash_table_new_full(g_str_hash, g_str_equal, NULL, (GDestroyNotify)janus_videoroom_srtp_context_free); publisher->udp_sock = -1; /* Finally, generate a private ID: this is only needed in case the participant * wants to allow the plugin to know which subscriptions belong to them */ @@ -11953,7 +11555,7 @@ static void *janus_videoroom_handler(void *data) { janus_mutex_init(&ps->subscribers_mutex); janus_mutex_init(&ps->rtp_forwarders_mutex); janus_mutex_init(&ps->rid_mutex); - ps->rtp_forwarders = g_hash_table_new_full(NULL, NULL, NULL, (GDestroyNotify)janus_videoroom_rtp_forwarder_destroy); + ps->rtp_forwarders = g_hash_table_new_full(NULL, NULL, NULL, (GDestroyNotify)janus_rtp_forwarder_destroy); } if(m->type == JANUS_SDP_AUDIO || m->type == JANUS_SDP_VIDEO) { /* Are the extmaps we care about there? */ @@ -12251,7 +11853,7 @@ static void *janus_videoroom_handler(void *data) { g_hash_table_iter_init(&iter, participant->remote_recipients); while(g_hash_table_iter_next(&iter, NULL, &value)) { janus_videoroom_remote_recipient *r = (janus_videoroom_remote_recipient *)value; - janus_videoroom_rtp_forwarder *f = NULL; + janus_rtp_forwarder *f = NULL; if(r) { if(ps->type == JANUS_VIDEOROOM_MEDIA_AUDIO) { /* Audio stream */ @@ -12260,7 +11862,7 @@ static void *janus_videoroom_handler(void *data) { (REMOTE_PUBLISHER_BASE_SSRC + ps->mindex*REMOTE_PUBLISHER_SSRC_STEP), FALSE, 0, NULL, 0, FALSE, FALSE); if(f != NULL) - f->remote_id = g_strdup(r->remote_id); + f->metadata = g_strdup(r->remote_id); } else if(ps->type == JANUS_VIDEOROOM_MEDIA_VIDEO) { /* Video stream */ gboolean add_rtcp = (!r->rtcp_added && r->rtcp_port > 0); @@ -12269,7 +11871,7 @@ static void *janus_videoroom_handler(void *data) { (REMOTE_PUBLISHER_BASE_SSRC + ps->mindex*REMOTE_PUBLISHER_SSRC_STEP), FALSE, 0, NULL, 0, TRUE, FALSE); if(f != NULL) - f->remote_id = g_strdup(r->remote_id); + f->metadata = g_strdup(r->remote_id); if(add_rtcp) r->rtcp_added = TRUE; /* Check if there's simulcast substreams we need to relay too */ @@ -12279,7 +11881,7 @@ static void *janus_videoroom_handler(void *data) { (REMOTE_PUBLISHER_BASE_SSRC + ps->mindex*REMOTE_PUBLISHER_SSRC_STEP + 1), FALSE, 0, NULL, 1, TRUE, FALSE); if(f != NULL) - f->remote_id = g_strdup(r->remote_id); + f->metadata = g_strdup(r->remote_id); } if(ps->vssrc[2] || ps->rid[2]) { f = janus_videoroom_rtp_forwarder_add_helper(participant, ps, @@ -12287,7 +11889,7 @@ static void *janus_videoroom_handler(void *data) { (REMOTE_PUBLISHER_BASE_SSRC + ps->mindex*REMOTE_PUBLISHER_SSRC_STEP + 2), FALSE, 0, NULL, 2, TRUE, FALSE); if(f != NULL) - f->remote_id = g_strdup(r->remote_id); + f->metadata = g_strdup(r->remote_id); } } else { /* Data stream */ @@ -12636,20 +12238,16 @@ static void janus_videoroom_relay_data_packet(gpointer data, gpointer user_data) } /* The following methods are only relevant if RTCP is used for RTP forwarders */ -static void janus_videoroom_rtp_forwarder_rtcp_receive(janus_videoroom_rtp_forwarder *forward) { - char buffer[1500]; - struct sockaddr_storage remote_addr; - socklen_t addrlen = sizeof(remote_addr); - int len = recvfrom(forward->rtcp_fd, buffer, sizeof(buffer), 0, (struct sockaddr *)&remote_addr, &addrlen); +static void janus_videoroom_rtp_forwarder_rtcp_receive(janus_rtp_forwarder *rf, char *buffer, int len) { if(len > 0 && janus_is_rtcp(buffer, len)) { - JANUS_LOG(LOG_HUGE, "Got %s RTCP packet: %d bytes\n", forward->is_video ? "video" : "audio", len); + JANUS_LOG(LOG_HUGE, "Got %s RTCP packet: %d bytes\n", rf->is_video ? "video" : "audio", len); /* We only handle incoming video PLIs or FIR at the moment */ if(!janus_rtcp_has_fir(buffer, len) && !janus_rtcp_has_pli(buffer, len)) return; /* Check if this is a regular RTP forwarder, or a publisher remotization */ - if(forward->remote_id == NULL) { + if(rf->metadata == NULL) { /* Regular forwarder, send the PLI to the stream associated with it */ - janus_videoroom_reqpli((janus_videoroom_publisher_stream *)forward->source, "RTCP from forwarder"); + janus_videoroom_reqpli((janus_videoroom_publisher_stream *)rf->source, "RTCP from forwarder"); } else { /* Remotization, check the SSRC in the request so that we know * which publisher video stream we should send the PLI to */ @@ -12688,8 +12286,8 @@ static void janus_videoroom_rtp_forwarder_rtcp_receive(janus_videoroom_rtp_forwa } if(ssrc > 0) { /* Look for the right publisher stream instance */ - char *remote_id = forward->remote_id; - janus_videoroom_publisher_stream *ps = (janus_videoroom_publisher_stream *)forward->source; + char *remote_id = (char *)rf->metadata; + janus_videoroom_publisher_stream *ps = (janus_videoroom_publisher_stream *)rf->source; if(ps == NULL) return; janus_videoroom_publisher *p = ps->publisher; @@ -12714,9 +12312,9 @@ static void janus_videoroom_rtp_forwarder_rtcp_receive(janus_videoroom_rtp_forwa gpointer key_f, value_f; g_hash_table_iter_init(&iter_f, ps->rtp_forwarders); while(g_hash_table_iter_next(&iter_f, &key_f, &value_f)) { - janus_videoroom_rtp_forwarder *rpv = value_f; + janus_rtp_forwarder *rpv = value_f; /* We only care about video forwarders used for the same remotization */ - if(!rpv->is_video || rpv->remote_id == NULL || strcasecmp(rpv->remote_id, remote_id)) + if(!rpv->is_video || rpv->metadata == NULL || strcasecmp((char *)rpv->metadata, remote_id)) continue; /* Check the SSRC */ if(rpv->ssrc == ssrc) { @@ -12735,15 +12333,6 @@ static void janus_videoroom_rtp_forwarder_rtcp_receive(janus_videoroom_rtp_forwa } } -static void *janus_videoroom_rtp_forwarder_rtcp_thread(void *data) { - JANUS_LOG(LOG_VERB, "Joining RTCP thread for RTP forwarders...\n"); - /* Run the main loop */ - g_main_loop_run(rtcpfwd_loop); - /* When the loop ends, we're done */ - JANUS_LOG(LOG_VERB, "Leaving RTCP thread for RTP forwarders...\n"); - return NULL; -} - /* Helpers to create a listener filedescriptor */ static int janus_videoroom_create_fd(int port, in_addr_t mcast, const janus_network_address *iface, char *host, size_t hostlen) { janus_mutex_lock(&fd_mutex); diff --git a/src/rtpfwd.c b/src/rtpfwd.c new file mode 100644 index 0000000000..b5819e174a --- /dev/null +++ b/src/rtpfwd.c @@ -0,0 +1,444 @@ +/*! \file rtpfwd.c + * \author Lorenzo Miniero + * \copyright GNU General Public License v3 + * \brief RTP forwarders + * \details Implementation of the so called RTP forwarders, that is an + * helper mechanism that core and/or plugins can make use of to quickly + * and simply forward RTP streams to a separate UDP address out of the + * context of any signalling. Such a mechanism can be used, for instance, + * for scalabiloty purposes, monitoring, or feeding external applications + * with media traffic handled by Janus.. + * + * \ingroup protocols + * \ref protocols + */ + +#include "rtpfwd.h" +#include "rtcp.h" +#include "utils.h" + +/* Local resources */ +static janus_mutex rtpfwds_mutex; +static GHashTable *rtpfwds = NULL; +static gboolean ipv6_disabled = FALSE; +/* RTCP stuff */ +static GMainContext *rtcpfwd_ctx = NULL; +static GMainLoop *rtcpfwd_loop = NULL; +static GThread *rtcpfwd_thread = NULL; +static void *janus_rtp_forwarder_rtcp_thread(void *data) { + JANUS_LOG(LOG_VERB, "Joining RTCP thread for RTP forwarders...\n"); + /* Run the main loop */ + g_main_loop_run(rtcpfwd_loop); + /* When the loop ends, we're done */ + JANUS_LOG(LOG_VERB, "Leaving RTCP thread for RTP forwarders...\n"); + return NULL; +} + +/* Static helper to quickly unref an RTP forwarder instance */ +static void janus_rtp_forwarder_unref(janus_rtp_forwarder *rf); +/* Static helper to free an RTP forwarder instance when the reference goes to 0 */ +static void janus_rtp_forwarder_free(const janus_refcount *f_ref); + +/* \brief RTP forwarders code initialization + * @returns 0 in case of success, a negative integer on errors */ +int janus_rtp_forwarders_init(void) { + /* Initialize the forwarders table and muted */ + rtpfwds = g_hash_table_new_full(g_str_hash, g_str_equal, + (GDestroyNotify)g_free, (GDestroyNotify)janus_rtp_forwarder_unref); + janus_mutex_init(&rtpfwds_mutex); + /* Let's check if IPv6 is disabled, as we may need to know for forwarders */ + int fd = socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP); + if(fd < 0) { + ipv6_disabled = TRUE; + } else { + int v6only = 0; + if(setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &v6only, sizeof(v6only)) != 0) + ipv6_disabled = TRUE; + } + if(fd >= 0) + close(fd); + if(ipv6_disabled) { + JANUS_LOG(LOG_WARN, "IPv6 disabled, will only create RTP forwarders to IPv4 addresses\n"); + } + /* Spawn the thread for handling incoming RTCP packets from RTP forwarders, if any */ + rtcpfwd_ctx = g_main_context_new(); + rtcpfwd_loop = g_main_loop_new(rtcpfwd_ctx, FALSE); + GError *error = NULL; + rtcpfwd_thread = g_thread_try_new("rtcpfwd", janus_rtp_forwarder_rtcp_thread, NULL, &error); + if(error != NULL) { + /* We show the error but it's not fatal */ + JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the RTCP thread for RTP forwarders...\n", + error->code, error->message ? error->message : "??"); + g_error_free(error); + return -1; + } + /* Donw */ + return 0; +} + +/* \brief RTP forwarders code de-initialization */ +void janus_rtp_forwarders_deinit(void) { + /* Stop the RTCP receiver thread */ + if(rtcpfwd_thread != NULL) { + if(g_main_loop_is_running(rtcpfwd_loop)) { + g_main_loop_quit(rtcpfwd_loop); + g_main_context_wakeup(rtcpfwd_ctx); + } + g_thread_join(rtcpfwd_thread); + rtcpfwd_thread = NULL; + } + /* Get rid of the table */ + janus_mutex_lock(&rtpfwds_mutex); + g_hash_table_destroy(rtpfwds); + rtpfwds = NULL; + janus_mutex_unlock(&rtpfwds_mutex); +} + +/* RTCP support in RTP forwarders */ +typedef struct janus_rtcp_receiver { + GSource parent; + janus_rtp_forwarder *rf; + GDestroyNotify destroy; +} janus_rtcp_receiver; +static void janus_rtp_forwarder_rtcp_receive(janus_rtp_forwarder *rf) { + char buffer[1500]; + struct sockaddr_storage remote_addr; + socklen_t addrlen = sizeof(remote_addr); + int len = recvfrom(rf->rtcp_fd, buffer, sizeof(buffer), 0, (struct sockaddr *)&remote_addr, &addrlen); + if(len > 0 && janus_is_rtcp(buffer, len)) { + JANUS_LOG(LOG_HUGE, "Got %s RTCP packet: %d bytes\n", rf->is_video ? "video" : "audio", len); + /* Invoke the callback function for RTCP feedback, if any */ + if(rf->rtcp_callback) + rf->rtcp_callback(rf, buffer, len); + } +} +static gboolean janus_rtp_forwarder_rtcp_prepare(GSource *source, gint *timeout) { + *timeout = -1; + return FALSE; +} +static gboolean janus_rtp_forwarder_rtcp_dispatch(GSource *source, GSourceFunc callback, gpointer user_data) { + janus_rtcp_receiver *rr = (janus_rtcp_receiver *)source; + /* Receive the packet */ + if(rr) + janus_rtp_forwarder_rtcp_receive(rr->rf); + return G_SOURCE_CONTINUE; +} +static void janus_rtp_forwarder_rtcp_finalize(GSource *source) { + janus_rtcp_receiver *rr = (janus_rtcp_receiver *)source; + /* Remove the reference to the forwarder */ + if(rr && rr->rf) { + if(rr->rf->source) { + //~ janus_publisher_stream_dereference_void(r->forward->source); + rr->rf->source = NULL; + } + janus_rtp_forwarder_unref(rr->rf); + } +} +static GSourceFuncs janus_rtp_forwarder_rtcp_funcs = { + janus_rtp_forwarder_rtcp_prepare, + NULL, + janus_rtp_forwarder_rtcp_dispatch, + janus_rtp_forwarder_rtcp_finalize, + NULL, NULL +}; + +/* Create a new forwarder */ +janus_rtp_forwarder *janus_rtp_forwarder_create(const char *ctx, + uint32_t stream_id, int udp_fd, const char *host, int port, + uint32_t ssrc, int pt, int srtp_suite, const char *srtp_crypto, + gboolean simulcast, int substream, gboolean is_video, gboolean is_data) { + janus_mutex_lock(&rtpfwds_mutex); + if(ctx == NULL) + ctx = "default"; + char id[1024]; + if(stream_id > 0) { + /* Make sure the provided ID isn't already in use */ + g_snprintf(id, sizeof(id), "%s-%"SCNu32, ctx, stream_id); + if(g_hash_table_lookup(rtpfwds, id) != NULL) { + janus_mutex_unlock(&rtpfwds_mutex); + JANUS_LOG(LOG_ERR, "RTP forwarder with ID %"SCNu32" already exists in context '%s'\n", + stream_id, ctx); + return NULL; + } + } else { + /* Autogenerate an ID within the provided context */ + stream_id = janus_random_uint32(); + g_snprintf(id, sizeof(id), "%s-%"SCNu32, ctx, stream_id); + while(g_hash_table_lookup(rtpfwds, id)) { + stream_id = janus_random_uint32(); + g_snprintf(id, sizeof(id), "%s-%"SCNu32, ctx, stream_id); + } + } + janus_rtp_forwarder *rf = g_malloc0(sizeof(janus_rtp_forwarder)); + rf->udp_fd = udp_fd; /* FIXME Should we create one ourselves, if not provided? */ + /* RTCP may be added later */ + rf->rtcp_fd = -1; + rf->local_rtcp_port = 0; + rf->remote_rtcp_port = 0; + /* First of all, let's check if we need to setup an SRTP forwarder */ + if(!is_data && srtp_suite > 0 && srtp_crypto != NULL) { + /* Base64 decode the crypto string and set it as the SRTP context */ + gsize len = 0; + guchar *decoded = g_base64_decode(srtp_crypto, &len); + if(len < SRTP_MASTER_LENGTH) { + janus_mutex_unlock(&rtpfwds_mutex); + JANUS_LOG(LOG_ERR, "Invalid SRTP crypto (%s)\n", srtp_crypto); + g_free(decoded); + g_free(rf); + return NULL; + } + /* Set SRTP policy */ + srtp_policy_t *policy = &rf->srtp_policy; + srtp_crypto_policy_set_rtp_default(&(policy->rtp)); + if(srtp_suite == 32) { + srtp_crypto_policy_set_aes_cm_128_hmac_sha1_32(&(policy->rtp)); + } else if(srtp_suite == 80) { + srtp_crypto_policy_set_aes_cm_128_hmac_sha1_80(&(policy->rtp)); + } + policy->ssrc.type = ssrc_any_outbound; + policy->key = decoded; + policy->next = NULL; + /* Create SRTP context */ + srtp_err_status_t res = srtp_create(&rf->srtp_ctx, policy); + if(res != srtp_err_status_ok) { + /* Something went wrong... */ + janus_mutex_unlock(&rtpfwds_mutex); + JANUS_LOG(LOG_ERR, "Error creating forwarder SRTP session: %d (%s)\n", res, janus_srtp_error_str(res)); + g_free(decoded); + policy->key = NULL; + g_free(rf); + return NULL; + } + rf->is_srtp = TRUE; + } + rf->is_video = is_video; + rf->payload_type = pt; + rf->ssrc = ssrc; + rf->substream = substream; + rf->is_data = is_data; + /* Check if the host address is IPv4 or IPv6 */ + if(strstr(host, ":") != NULL) { + rf->serv_addr6.sin6_family = AF_INET6; + inet_pton(AF_INET6, host, &(rf->serv_addr6.sin6_addr)); + rf->serv_addr6.sin6_port = htons(port); + } else { + rf->serv_addr.sin_family = AF_INET; + inet_pton(AF_INET, host, &(rf->serv_addr.sin_addr)); + rf->serv_addr.sin_port = htons(port); + } + if(is_video && simulcast) { + rf->simulcast = TRUE; + janus_rtp_switching_context_reset(&rf->rtp_context); + janus_rtp_simulcasting_context_reset(&rf->sim_context); + rf->sim_context.substream_target = 2; + rf->sim_context.templayer_target = 2; + } + janus_refcount_init(&rf->ref, janus_rtp_forwarder_free); + rf->context = g_strdup(ctx); + rf->stream_id = stream_id; + janus_refcount_increase(&rf->ref); + g_hash_table_insert(rtpfwds, g_strdup(id), rf); + janus_mutex_unlock(&rtpfwds_mutex); + /* Done */ + return rf; +} + +/* Add RTCP support to an existing RTP forwarder */ +int janus_rtp_forwarder_add_rtcp(janus_rtp_forwarder *rf, int rtcp_port, + void (*rtcp_callback)(janus_rtp_forwarder *rf, char *buffer, int len)) { + if(rf == NULL || g_atomic_int_get(&rf->destroyed) || rf->rtcp_fd > 0 || rtcp_port < 1 || rf->is_data) + return -1; + /* Bind to a port for RTCP */ + uint16_t local_rtcp_port = 0; + int fd = socket(!ipv6_disabled ? AF_INET6 : AF_INET, SOCK_DGRAM, IPPROTO_UDP); + if(fd < 0) { + JANUS_LOG(LOG_ERR, "Error creating RTCP socket for new RTP forwarder... %d (%s)\n", + errno, g_strerror(errno)); + return -4; + } + struct sockaddr *address = NULL; + struct sockaddr_in addr4 = { 0 }; + struct sockaddr_in6 addr6 = { 0 }; + socklen_t len = 0; + if(!ipv6_disabled) { + /* Configure the socket so that it can be used both on IPv4 and IPv6 */ + int v6only = 0; + if(setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &v6only, sizeof(v6only)) != 0) { + JANUS_LOG(LOG_ERR, "Error configuring RTCP socket for new RTP forwarder... %d (%s)\n", + errno, g_strerror(errno)); + close(fd); + return -5; + } + len = sizeof(addr6); + addr6.sin6_family = AF_INET6; + addr6.sin6_port = htons(0); /* The RTCP port we received is the remote one */ + addr6.sin6_addr = in6addr_any; + address = (struct sockaddr *)&addr6; + } else { + /* IPv6 is disabled, only do IPv4 */ + len = sizeof(addr4); + addr4.sin_family = AF_INET; + addr4.sin_port = htons(0); /* The RTCP port we received is the remote one */ + addr4.sin_addr.s_addr = INADDR_ANY; + address = (struct sockaddr *)&addr4; + } + if(bind(fd, (struct sockaddr *)address, len) < 0 || + getsockname(fd, (struct sockaddr *)address, &len) < 0) { + JANUS_LOG(LOG_ERR, "Error binding RTCP socket for new RTP forwarder... %d (%s)\n", + errno, g_strerror(errno)); + close(fd); + return -6; + } + local_rtcp_port = ntohs(!ipv6_disabled ? addr6.sin6_port : addr4.sin_port); + JANUS_LOG(LOG_HUGE, "Bound RTP forwarder's local %s RTCP port: %"SCNu16"\n", + rf->is_video ? "video" : "audio", local_rtcp_port); + /* Update the forwarder, and create a source for the loop */ + rf->rtcp_fd = fd; + rf->remote_rtcp_port = rtcp_port; + rf->local_rtcp_port = local_rtcp_port; + rf->rtcp_callback = rtcp_callback; + rf->rtcp_recv = g_source_new(&janus_rtp_forwarder_rtcp_funcs, sizeof(janus_rtcp_receiver)); + janus_rtcp_receiver *rr = (janus_rtcp_receiver *)rf->rtcp_recv; + janus_refcount_increase(&rf->ref); + rr->rf = rf; + g_source_set_priority(rf->rtcp_recv, G_PRIORITY_DEFAULT); + g_source_add_unix_fd(rf->rtcp_recv, fd, G_IO_IN | G_IO_ERR); + g_source_attach((GSource *)rf->rtcp_recv, rtcpfwd_ctx); + /* Send a couple of empty RTP packets to the remote port to do latching */ + JANUS_LOG(LOG_HUGE, "Latching to remote %s RTCP port: %"SCNu16"\n", + rf->is_video ? "video" : "audio", local_rtcp_port); + socklen_t addrlen = 0; + if(rf->serv_addr.sin_family == AF_INET) { + addr4.sin_family = AF_INET; + addr4.sin_addr.s_addr = rf->serv_addr.sin_addr.s_addr; + addr4.sin_port = htons(rf->remote_rtcp_port); + address = (struct sockaddr *)&addr4; + addrlen = sizeof(addr4); + } else { + addr6.sin6_family = AF_INET6; + memcpy(&addr6.sin6_addr, &rf->serv_addr6.sin6_addr, sizeof(struct in6_addr)); + addr6.sin6_port = htons(rf->remote_rtcp_port); + address = (struct sockaddr *)&addr6; + addrlen = sizeof(addr6); + } + janus_rtp_header rtp = { 0 }; + rtp.version = 2; + (void)sendto(fd, &rtp, 12, 0, address, addrlen); + (void)sendto(fd, &rtp, 12, 0, address, addrlen); + /* Done */ + return 0; +} + +/* Simplified frontend to the forwarder function */ +void janus_rtp_forwarder_send_rtp(janus_rtp_forwarder *rf, char *buffer, int len, int substream) { + janus_rtp_forwarder_send_rtp_full(rf, buffer, len, substream, NULL, NULL, JANUS_VIDEOCODEC_NONE, NULL); +} + +/* Helper function to forward an RTP packet within the context of a forwarder */ +void janus_rtp_forwarder_send_rtp_full(janus_rtp_forwarder *rf, char *buffer, int len, int substream, + uint32_t *ssrcs, char **rids, janus_videocodec vcodec, janus_mutex *rid_mutex) { + if(!rf || g_atomic_int_get(&rf->destroyed) || !buffer || !janus_is_rtp(buffer, len)) + return; + /* Access the RTP header */ + janus_rtp_header *rtp = (janus_rtp_header *)buffer; + /* Backup the RTP header info, as we may rewrite part of it */ + uint32_t seq_number = ntohs(rtp->seq_number); + uint32_t timestamp = ntohl(rtp->timestamp); + int pt = rtp->type; + uint32_t ssrc = ntohl(rtp->ssrc); + /* First of all, check if we're simulcasting and if we need to forward or ignore this frame */ + if(rf->is_video && !rf->simulcast && rf->substream != substream) { + /* We're being asked to forward a specific substream, and it's not it */ + return; + } + if(rf->is_video && rf->simulcast) { + /* This is video and we're simulcasting, check if we need to forward this frame */ + if(!janus_rtp_simulcasting_context_process_rtp(&rf->sim_context, + buffer, len, ssrcs, rids, vcodec, &rf->rtp_context, rid_mutex)) { + /* There was an error processing simulcasting for this packet */ + return; + } + janus_rtp_header_update(rtp, &rf->rtp_context, TRUE, 0); + /* By default we use a fixed SSRC (it may be overwritten later) */ + rtp->ssrc = htonl(rf->stream_id); + } + /* Check if payload type and/or SSRC need to be overwritten for this forwarder */ + if(rf->payload_type > 0) + rtp->type = rf->payload_type; + if(rf->ssrc > 0) + rtp->ssrc = htonl(rf->ssrc); + /* Check if this is an RTP or SRTP forwarder */ + if(!rf->is_srtp) { + /* Plain RTP */ + struct sockaddr *address = (rf->serv_addr.sin_family == AF_INET ? + (struct sockaddr *)&rf->serv_addr : (struct sockaddr *)&rf->serv_addr6); + size_t addrlen = (rf->serv_addr.sin_family == AF_INET ? sizeof(rf->serv_addr) : sizeof(rf->serv_addr6)); + if(sendto(rf->udp_fd, buffer, len, 0, address, addrlen) < 0) { + JANUS_LOG(LOG_HUGE, "Error forwarding RTP %s packet... %s (len=%d)...\n", + (rf->is_video ? "video" : "audio"), g_strerror(errno), len); + } + } else { + /* SRTP: encrypt the packet before sending it */ + char sbuf[1500]; + memcpy(sbuf, buffer, len); + int protected = len; + int res = srtp_protect(rf->srtp_ctx, sbuf, &protected); + if(res != srtp_err_status_ok) { + janus_rtp_header *header = (janus_rtp_header *)sbuf; + guint32 timestamp = ntohl(header->timestamp); + guint16 seq = ntohs(header->seq_number); + JANUS_LOG(LOG_ERR, "Error encrypting %s packet... %s (len=%d-->%d, ts=%"SCNu32", seq=%"SCNu16")...\n", + (rf->is_video ? "Video" : "Audio"), janus_srtp_error_str(res), len, protected, timestamp, seq); + } else { + struct sockaddr *address = (rf->serv_addr.sin_family == AF_INET ? + (struct sockaddr *)&rf->serv_addr : (struct sockaddr *)&rf->serv_addr6); + size_t addrlen = (rf->serv_addr.sin_family == AF_INET ? sizeof(rf->serv_addr) : sizeof(rf->serv_addr6)); + if(sendto(rf->udp_fd, sbuf, protected, 0, address, addrlen) < 0) { + JANUS_LOG(LOG_HUGE, "Error forwarding SRTP %s packet... %s (len=%d)...\n", + (rf->is_video ? "video" : "audio"), g_strerror(errno), protected); + } + } + } + /* Restore original values of the RTP payload before returning */ + rtp->type = pt; + rtp->ssrc = htonl(ssrc); + rtp->timestamp = htonl(timestamp); + rtp->seq_number = htons(seq_number); +} + +/* Mark an RTP forwarder instance as destroyed */ +void janus_rtp_forwarder_destroy(janus_rtp_forwarder *rf) { + if(rf && g_atomic_int_compare_and_exchange(&rf->destroyed, 0, 1)) { + if(rf->rtcp_fd > -1 && rf->rtcp_recv != NULL) { + g_source_destroy(rf->rtcp_recv); + g_source_unref(rf->rtcp_recv); + } + char id[1024]; + g_snprintf(id, sizeof(id), "%s-%"SCNu32, rf->context, rf->stream_id); + janus_mutex_lock(&rtpfwds_mutex); + if(rtpfwds != NULL) + g_hash_table_remove(rtpfwds, id); + janus_mutex_unlock(&rtpfwds_mutex); + janus_refcount_decrease(&rf->ref); + } +} + +/* Static helper to quickly unref an RTP forwarder instance */ +static void janus_rtp_forwarder_unref(janus_rtp_forwarder *rf) { + if(rf) + janus_refcount_decrease(&rf->ref); +} + +/* Static helper to free an RTP forwarder instance when the reference goes to 0 */ +static void janus_rtp_forwarder_free(const janus_refcount *f_ref) { + janus_rtp_forwarder *rf = janus_refcount_containerof(f_ref, janus_rtp_forwarder, ref); + if(rf->rtcp_fd > -1) + close(rf->rtcp_fd); + if(rf->is_srtp) { + srtp_dealloc(rf->srtp_ctx); + g_free(rf->srtp_policy.key); + } + g_free(rf->context); + g_free(rf->metadata); + g_free(rf); +} diff --git a/src/rtpfwd.h b/src/rtpfwd.h new file mode 100644 index 0000000000..32752bca5b --- /dev/null +++ b/src/rtpfwd.h @@ -0,0 +1,139 @@ +/*! \file rtpfwd.h + * \author Lorenzo Miniero + * \copyright GNU General Public License v3 + * \brief RTP forwarders (headers) + * \details Implementation of the so called RTP forwarders, that is an + * helper mechanism that core and/or plugins can make use of to quickly + * and simply forward RTP streams to a separate UDP address out of the + * context of any signalling. Such a mechanism can be used, for instance, + * for scalabiloty purposes, monitoring, or feeding external applications + * with media traffic handled by Janus.. + * + * \ingroup protocols + * \ref protocols + */ + +#ifndef JANUS_RTPFWD_H +#define JANUS_RTPFWD_H + +#include "rtp.h" +#include "rtpsrtp.h" + + +/* \brief RTP forwarders code initialization + * @returns 0 in case of success, a negative integer on errors */ +int janus_rtp_forwarders_init(void); +/* \brief RTP forwarders code de-initialization */ +void janus_rtp_forwarders_deinit(void); + +/*! \brief Helper struct for implementing RTP forwarders */ +typedef struct janus_rtp_forwarder { + /* \brief Opaque pointer to the owner of this forwarder */ + void *source; + /* \brief Context of the forwarder */ + char *context; + /* \brief Unique ID (within the context) of the forwarder */ + uint32_t stream_id; + /* \brief Socket used for sending RTP packets */ + int udp_fd; + /* \brief Whether this is a video forwarder */ + gboolean is_video; + /* \brief Whether this is an audio forwarder */ + gboolean is_data; + /* \brief SSRC to put in forwarded RTP packets */ + uint32_t ssrc; + /* \brief Payload type to put in forwarded RTP packets */ + int payload_type; + /* \brief Substream to forward, in case this is part of a simulcast stream */ + int substream; + /* \brief Recipient address (IPv4) */ + struct sockaddr_in serv_addr; + /* \brief Recipient address (IPv6) */ + struct sockaddr_in6 serv_addr6; + /* \brief RTCP socket, if needed */ + int rtcp_fd; + /* \brief RTCP local and remote ports, if needed */ + uint16_t local_rtcp_port, remote_rtcp_port; + /* \brief Callback to invoke when receiving RTCP messages, if any */ + void (*rtcp_callback)(struct janus_rtp_forwarder *rf, char *buffer, int len); + /* \brief RTCP GSource, if needed */ + GSource *rtcp_recv; + /* \brief Whether simulcast automatic selection is enabled for this forwarder */ + gboolean simulcast; + /* \brief RTP swtiching context, if needed */ + janus_rtp_switching_context rtp_context; + /* \brief Simulcast context, if needed */ + janus_rtp_simulcasting_context sim_context; + /* \brief Whether SRTP is enabled for this forwarder */ + gboolean is_srtp; + /* \brief The SRTP context, in case SRTP is enabled */ + srtp_t srtp_ctx; + /* \brief The SRTP policy, in case SRTP is enabled */ + srtp_policy_t srtp_policy; + /* \brief Opaque metadata property, in case it's useful to the owner + * \note This can be anything (e.g., a string, an allocated struct, etc.), + * as long as it can be freed with a single call to g_free(), as + * that's all that will be done when getting rid of the forwarder */ + void *metadata; + /*! \brief Atomic flag to check if this instance has been destroyed */ + volatile gint destroyed; + /*! \brief Reference counter for this instance */ + janus_refcount ref; +} janus_rtp_forwarder; +/*! \brief Helper method to create a new janus_rtp_forwarder instance + * @param[in] ctx The context of this forwarder (e.g., the plugin name) + * @param[in] id The unique forwarder ID to assign as part of the context (0=autogenerate) + * @param[in] udp_fd The socket to use for sending RTP packets + * @param[in] host The address to forward the RTP packets to + * @param[in] port The port to forward the RTP packets to + * @param[in] ssrc The SSRC to put in outgoing RTP packets + * @param[in] pt The payload type to put in outgoing RTP packets + * @param[in] srtp_suite In case SRTP must be enabled, the SRTP suite to use + * @param[in] srtp_crypto In case SRTP must be enabled, the base64-encoded SRTP crypto material to use + * @param[in] simulcast Whether the RTP forwarder should act as a simulcast viewer + * (meaning it will only forward the highest quality available substream) + * @param[in] substream In case we want to forward a specific simulcast substream, which substream it is + * \note Do NOT mix the simulcast and substream properties, as they implement different behaviours + * @param[in] is_video Whether this a video forwarder + * @param[in] is_data Whether this a data channel forwarder + * @returns A pointer to a valid janus_rtp_forwarder instance, if successfull, NULL otherwise */ +janus_rtp_forwarder *janus_rtp_forwarder_create(const char *ctx, + uint32_t stream_id, int udp_fd, const char *host, int port, + uint32_t ssrc, int pt, int srtp_suite, const char *srtp_crypto, + gboolean simulcast, int substream, gboolean is_video, gboolean is_data); +/*! \brief Helper method to add RTCP support to an existing forwarder + * @note Notice that only a single RTCP handler can be added to a forwarder, + * and once added it cannot be removed until the forwarder is destroyed + * @param[in] rf The janus_rtp_forwarder instance to add RTCP to + * @param[in] rtcp_port The port to latch to for RTCP purposes + * @param[in] rtcp_callback The function to invoke when RTCP feedback is received + * @returns 0 if successful, a negative integer otherwise */ +int janus_rtp_forwarder_add_rtcp(janus_rtp_forwarder *rf, int rtcp_port, + void (*rtcp_callback)(janus_rtp_forwarder *rf, char *buffer, int len)); +/*! \brief Helper method to forward an RTP packet within the context of a forwarder + * @note This is equivalent to calling janus_rtp_forwarder_send_rtp_full + * with all the extra arguments that are usually not required set to NULL + * @param[in] rf The janus_rtp_forwarder instance to use + * @param[in] buffer The RTP packet buffer + * @param[in] len The length of the RTP packet buffer + * @param[in] substream In case the forwarder is relaying a single simulcast + * substream, the substream the packet belongs to (pass -1 to ignore) */ +void janus_rtp_forwarder_send_rtp(janus_rtp_forwarder *rf, char *buffer, int len, int substream); +/*! \brief Extended version of janus_rtp_forwarder_send_rtp, to be used when the forwarder + * is configured to act as a simulcast receiver, and so will call janus_rtp_simulcasting_context_process_rtp + * @param[in] rf The janus_rtp_forwarder instance to use + * @param[in] buffer The RTP packet buffer + * @param[in] len The length of the RTP packet buffer + * @param[in] substream In case the forwarder is relaying a single simulcast + * substream, the substream the packet belongs to (pass -1 to ignore) + * @param[in] ssrcs The simulcast SSRCs to refer to (may be updated if rids are involved) + * @param[in] rids The simulcast rids to refer to, if any + * @param[in] vcodec Video codec of the RTP payload + * @param[in] rid_mutex A mutex that must be acquired before reading the rids array, if any */ +void janus_rtp_forwarder_send_rtp_full(janus_rtp_forwarder *rf, char *buffer, int len, int substream, + uint32_t *ssrcs, char **rids, janus_videocodec vcodec, janus_mutex *rid_mutex); +/*! \brief Helper method to free a janus_rtp_forwarder instance + * @param[in] rf The janus_rtp_forwarder instance to free */ +void janus_rtp_forwarder_destroy(janus_rtp_forwarder *rf); + +#endif