diff --git a/conf/janus.plugin.audiobridge.jcfg.sample b/conf/janus.plugin.audiobridge.jcfg.sample index c5f2359ddb..b4a7929187 100644 --- a/conf/janus.plugin.audiobridge.jcfg.sample +++ b/conf/janus.plugin.audiobridge.jcfg.sample @@ -12,6 +12,8 @@ # default_prebuffering = number of packets to buffer before decoding each particiant (default=6) # record = true|false (whether this room should be recorded, default=false) # record_file = "/path/to/recording.wav" (where to save the recording) +# allow_rtp_participants = true|false (whether participants should be allowed to join +# via plain RTP as well, rather than just WebRTC, default=false) # # The following lines are only needed if you want the mixed audio # to be automatically forwarded via plain RTP to an external component @@ -45,6 +47,22 @@ general: { # By default, integers are used as a unique ID for both rooms and participants. # In case you want to use strings instead (e.g., a UUID), set string_ids to true. #string_ids = true + + # Normally, all AudioBridge participants will join by negotiating a WebRTC + # PeerConnection: the plugin also supports adding participants that will + # use plain RTP, though, be it for supporting legacy users (e.g., SIP + # participants who an orchestrator can add to the bridge) or more simply + # to temporarily inject external audio in a room from a live source. To + # support plain RTP, the plugin needs to have a range of ports it can bind + # to: notice this should be configured so that it doesn't conflict with other + # plugins (e.g., Streaming, SIP, NoSIP) and applications (e.g., Janus itself). + # The default if you don't specify anything is 10000-60000. + #rtp_port_range = "50000-60000" + # In case we need to support plain RTP participants, we'll also need to know + # what local IP address to bind to for media. If no address is set in the + # property below, then one will be automatically guessed from the system. + #local_ip = "1.2.3.4" + } room-1234: { diff --git a/plugins/janus_audiobridge.c b/plugins/janus_audiobridge.c index c57d0e94ae..d628136eff 100644 --- a/plugins/janus_audiobridge.c +++ b/plugins/janus_audiobridge.c @@ -39,7 +39,9 @@ room-: { audio_level_average = 25 (average value of audio level, 127=muted, 0='too loud', default=25) default_prebuffering = number of packets to buffer before decoding each participant (default=DEFAULT_PREBUFFERING) record = true|false (whether this room should be recorded, default=false) - record_file = /path/to/recording.wav (where to save the recording) + record_file = /path/to/recording.wav (where to save the recording) + allow_rtp_participants = true|false (whether participants should be allowed to join + via plain RTP as well, rather than just WebRTC, default=false) [The following lines are only needed if you want the mixed audio to be automatically forwarded via plain RTP to an external component @@ -131,6 +133,7 @@ room-: { "default_prebuffering" : , "record" : , "record_file" : "", + "allow_rtp_participants" : } \endverbatim * @@ -867,8 +870,12 @@ room-: { #ifdef HAVE_LIBOGG #include #endif +#include +#include +#include #include #include +#include #include "../debug.h" #include "../apierror.h" @@ -987,6 +994,7 @@ static struct janus_json_parameter create_parameters[] = { {"sampling", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE}, /* We keep this to be backwards compatible */ {"record", JANUS_JSON_BOOL, 0}, {"record_file", JSON_STRING, 0}, + {"allow_rtp_participants", JANUS_JSON_BOOL, 0}, {"permanent", JANUS_JSON_BOOL, 0}, {"audiolevel_ext", JANUS_JSON_BOOL, 0}, {"audiolevel_event", JANUS_JSON_BOOL, 0}, @@ -1024,8 +1032,16 @@ static struct janus_json_parameter join_parameters[] = { {"audio_level_average", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE}, {"audio_active_packets", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE}, {"generate_offer", JANUS_JSON_BOOL, 0}, + {"rtp", JSON_OBJECT, 0}, {"secret", JSON_STRING, 0} }; +static struct janus_json_parameter rtp_parameters[] = { + {"ip", JSON_STRING, 0}, + {"port", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE}, + {"payload_type", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE}, + {"audiolevel_ext", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE}, + {"fec", JANUS_JSON_BOOL, 0}, +}; static struct janus_json_parameter configure_parameters[] = { {"muted", JANUS_JSON_BOOL, 0}, {"prebuffer", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE}, @@ -1080,6 +1096,15 @@ static void janus_audiobridge_hangup_media_internal(janus_plugin_session *handle /* Extension to add while recording (e.g., "tmp" --> ".wav.tmp") */ static char *rec_tempext = NULL; +/* RTP range, in case we need to support plain RTP participants */ +static char *local_ip = NULL; +#define JANUS_AUDIOBRIDGE_DEFAULT_RTP_RANGE_MIN 10000 +#define JANUS_AUDIOBRIDGE_DEFAULT_RTP_RANGE_MAX 60000 +static uint16_t rtp_range_min = JANUS_AUDIOBRIDGE_DEFAULT_RTP_RANGE_MIN; +static uint16_t rtp_range_max = JANUS_AUDIOBRIDGE_DEFAULT_RTP_RANGE_MAX; +static uint16_t rtp_range_slider = JANUS_AUDIOBRIDGE_DEFAULT_RTP_RANGE_MIN; + +/* Asynchronous API message to handle */ typedef struct janus_audiobridge_message { janus_plugin_session *handle; char *transaction; @@ -1109,6 +1134,7 @@ typedef struct janus_audiobridge_room { gchar *record_file; /* Path of the recording file */ FILE *recording; /* File to record the room into */ gint64 record_lastupdate; /* Time when we last updated the wav header */ + gboolean allow_plainrtp; /* Whether plain RTP participants are allowed*/ gboolean destroy; /* Value to flag the room for destruction */ GHashTable *participants; /* Map of participants */ GHashTable *anncs; /* Map of announcements */ @@ -1297,6 +1323,24 @@ static void janus_audiobridge_file_free(janus_audiobridge_file *ctx) { } #endif +/* In case we need to support plain RTP participants, this struct helps with that */ +typedef struct janus_audiobridge_plainrtp_media { + char *remote_audio_ip; + int ready:1; + int audio_rtp_fd; + int local_audio_rtp_port, remote_audio_rtp_port; + guint32 audio_ssrc, audio_ssrc_peer; + int audio_pt; + gboolean audio_send; + janus_rtp_switching_context context; + int pipefd[2]; + GThread *thread; +} janus_audiobridge_plainrtp_media; +static void janus_audiobridge_plainrtp_media_cleanup(janus_audiobridge_plainrtp_media *media); +static int janus_audiobridge_plainrtp_allocate_port(janus_audiobridge_plainrtp_media *media); +static void *janus_audiobridge_plainrtp_relay_thread(void *data); + +/* AudioBridge participant */ typedef struct janus_audiobridge_participant { janus_audiobridge_session *session; janus_audiobridge_room *room; /* Room */ @@ -1327,6 +1371,10 @@ typedef struct janus_audiobridge_participant { gboolean talking; /* Whether this participant is currently talking (uses audio levels extension) */ janus_rtp_switching_context context; /* Needed in case the participant changes room */ janus_audiocodec codec; /* Codec this participant is using (most often Opus, but G.711 is supported too) */ + /* Plain RTP, in case this is not a WebRTC participant */ + gboolean plainrtp; /* Whether this is a WebRTC participant, or a plain RTP one */ + janus_audiobridge_plainrtp_media plainrtp_media; + janus_mutex pmutex; /* Opus stuff */ OpusEncoder *encoder; /* Opus encoder instance */ OpusDecoder *decoder; /* Opus decoder instance */ @@ -1399,6 +1447,9 @@ static void janus_audiobridge_participant_free(const janus_refcount *participant #ifdef HAVE_LIBOGG janus_audiobridge_file_free(participant->annc); #endif + janus_mutex_lock(&participant->pmutex); + janus_audiobridge_plainrtp_media_cleanup(&participant->plainrtp_media); + janus_mutex_unlock(&participant->pmutex); g_free(participant); } @@ -2074,7 +2125,70 @@ int janus_audiobridge_init(janus_callbacks *callback, const char *config_path) { if(string_ids) { JANUS_LOG(LOG_INFO, "AudioBridge will use alphanumeric IDs, not numeric\n"); } + janus_config_item *lip = janus_config_get(config, config_general, janus_config_type_item, "local_ip"); + if(lip && lip->value) { + /* Verify that the address is valid */ + struct ifaddrs *ifas = NULL; + janus_network_address iface; + janus_network_address_string_buffer ibuf; + if(getifaddrs(&ifas) == -1) { + JANUS_LOG(LOG_ERR, "Unable to acquire list of network devices/interfaces; some configurations may not work as expected... %d (%s)\n", + errno, strerror(errno)); + } else { + if(janus_network_lookup_interface(ifas, lip->value, &iface) != 0) { + JANUS_LOG(LOG_WARN, "Error setting local IP address to %s, falling back to detecting IP address...\n", lip->value); + } else { + if(janus_network_address_to_string_buffer(&iface, &ibuf) != 0 || janus_network_address_string_buffer_is_null(&ibuf)) { + JANUS_LOG(LOG_WARN, "Error getting local IP address from %s, falling back to detecting IP address...\n", lip->value); + } else { + local_ip = g_strdup(janus_network_address_string_from_buffer(&ibuf)); + } + } + freeifaddrs(ifas); + } + } + janus_config_item *rpr = janus_config_get(config, config_general, janus_config_type_item, "rtp_port_range"); + if(rpr && rpr->value) { + /* Split in min and max port */ + char *maxport = strrchr(rpr->value, '-'); + if(maxport != NULL) { + *maxport = '\0'; + maxport++; + if(janus_string_to_uint16(rpr->value, &rtp_range_min) < 0) + JANUS_LOG(LOG_WARN, "Invalid RTP min port value: %s (assuming 0)\n", rpr->value); + if(janus_string_to_uint16(maxport, &rtp_range_max) < 0) + JANUS_LOG(LOG_WARN, "Invalid RTP max port value: %s (assuming 0)\n", maxport); + maxport--; + *maxport = '-'; + } + if(rtp_range_min > rtp_range_max) { + uint16_t temp_port = rtp_range_min; + rtp_range_min = rtp_range_max; + rtp_range_max = temp_port; + } + if(rtp_range_min % 2) + rtp_range_min++; /* Pick an even port for RTP */ + if(rtp_range_min > rtp_range_max) { + JANUS_LOG(LOG_WARN, "Incorrect port range (%u -- %u), switching min and max\n", rtp_range_min, rtp_range_max); + uint16_t range_temp = rtp_range_max; + rtp_range_max = rtp_range_min; + rtp_range_min = range_temp; + } + if(rtp_range_max == 0) + rtp_range_max = 65535; + rtp_range_slider = rtp_range_min; + JANUS_LOG(LOG_VERB, "AudioBridge RTP port range: %u -- %u\n", rtp_range_min, rtp_range_max); + } + } + if(local_ip == NULL) { + local_ip = janus_network_detect_local_ip_as_string(janus_network_query_options_any_ip); + if(local_ip == NULL) { + JANUS_LOG(LOG_WARN, "Couldn't find any address! using 127.0.0.1 as the local IP... (which is NOT going to work out of your machine)\n"); + local_ip = g_strdup("127.0.0.1"); + } } + JANUS_LOG(LOG_VERB, "Local IP set to %s\n", local_ip); + /* Iterate on all rooms */ rooms = g_hash_table_new_full(string_ids ? g_str_hash : g_int64_hash, string_ids ? g_str_equal : g_int64_equal, (GDestroyNotify)g_free, (GDestroyNotify)janus_audiobridge_room_destroy); @@ -2099,6 +2213,7 @@ int janus_audiobridge_init(janus_callbacks *callback, const char *config_path) { janus_config_item *pin = janus_config_get(config, cat, janus_config_type_item, "pin"); janus_config_item *record = janus_config_get(config, cat, janus_config_type_item, "record"); janus_config_item *recfile = janus_config_get(config, cat, janus_config_type_item, "record_file"); + janus_config_item *allowrtp = janus_config_get(config, cat, janus_config_type_item, "allow_rtp_participants"); if(sampling == NULL || sampling->value == NULL) { JANUS_LOG(LOG_ERR, "Can't add the AudioBridge room, missing mandatory information...\n"); cl = cl->next; @@ -2206,6 +2321,9 @@ int janus_audiobridge_init(janus_callbacks *callback, const char *config_path) { if(recfile && recfile->value) audiobridge->record_file = g_strdup(recfile->value); audiobridge->recording = NULL; + audiobridge->allow_plainrtp = FALSE; + if(allowrtp && allowrtp->value) + audiobridge->allow_plainrtp = janus_is_true(allowrtp->value); audiobridge->destroy = 0; audiobridge->participants = g_hash_table_new_full( string_ids ? g_str_hash : g_int64_hash, string_ids ? g_str_equal : g_int64_equal, @@ -2457,6 +2575,22 @@ json_t *janus_audiobridge_query_session(janus_plugin_session *handle) { json_object_set_new(info, "talking", participant->talking ? json_true() : json_false()); } json_object_set_new(info, "fec", participant->fec ? json_true() : json_false()); + if(participant->plainrtp_media.audio_rtp_fd != -1) { + json_t *rtp = json_object(); + if(local_ip) + json_object_set_new(rtp, "local-ip", json_string(local_ip)); + if(participant->plainrtp_media.local_audio_rtp_port) + json_object_set_new(rtp, "local-port", json_integer(participant->plainrtp_media.local_audio_rtp_port)); + if(participant->plainrtp_media.remote_audio_ip) + json_object_set_new(rtp, "remote-ip", json_string(participant->plainrtp_media.remote_audio_ip)); + if(participant->plainrtp_media.remote_audio_rtp_port) + json_object_set_new(rtp, "remote-port", json_integer(participant->plainrtp_media.remote_audio_rtp_port)); + if(participant->plainrtp_media.audio_ssrc) + json_object_set_new(rtp, "local-ssrc", json_integer(participant->plainrtp_media.audio_ssrc)); + if(participant->plainrtp_media.audio_ssrc_peer) + json_object_set_new(rtp, "remote-ssrc", json_integer(participant->plainrtp_media.audio_ssrc_peer)); + json_object_set_new(info, "plain-rtp", rtp); + } } if(session->plugin_offer) json_object_set_new(info, "plugin-offer", json_true()); @@ -2524,6 +2658,7 @@ static json_t *janus_audiobridge_process_synchronous_request(janus_audiobridge_s json_t *default_prebuffering = json_object_get(root, "default_prebuffering"); json_t *record = json_object_get(root, "record"); json_t *recfile = json_object_get(root, "record_file"); + json_t *allowrtp = json_object_get(root, "allow_rtp_participants"); json_t *permanent = json_object_get(root, "permanent"); if(allowed) { /* Make sure the "allowed" array only contains strings */ @@ -2670,6 +2805,9 @@ static json_t *janus_audiobridge_process_synchronous_request(janus_audiobridge_s if(recfile) audiobridge->record_file = g_strdup(json_string_value(recfile)); audiobridge->recording = NULL; + audiobridge->allow_plainrtp = FALSE; + if(allowrtp && json_is_true(allowrtp)) + audiobridge->allow_plainrtp = TRUE; audiobridge->destroy = 0; audiobridge->participants = g_hash_table_new_full( string_ids ? g_str_hash : g_int64_hash, string_ids ? g_str_equal : g_int64_equal, @@ -4487,7 +4625,8 @@ struct janus_plugin_result *janus_audiobridge_handle_message(janus_plugin_sessio /* We got a response, send it back */ goto plugin_response; } else if(!strcasecmp(request_text, "join") || !strcasecmp(request_text, "configure") - || !strcasecmp(request_text, "changeroom") || !strcasecmp(request_text, "leave")) { + || !strcasecmp(request_text, "changeroom") || !strcasecmp(request_text, "leave") + || !strcasecmp(request_text, "hangup")) { /* These messages are handled asynchronously */ janus_audiobridge_message *msg = g_malloc(sizeof(janus_audiobridge_message)); msg->handle = handle; @@ -4645,8 +4784,9 @@ void janus_audiobridge_incoming_rtp(janus_plugin_session *handle, janus_plugin_r return; if(participant->room && participant->room->muted && !participant->admin) return; - char *buf = packet->buffer; uint16_t len = packet->length; /* - Save the frame if we're recording this leg */ + char *buf = packet->buffer; + uint16_t len = packet->length; + /* Save the frame if we're recording this leg */ janus_recorder_save_frame(participant->arc, buf, len); if(g_atomic_int_get(&participant->active) && (participant->codec != JANUS_AUDIOCODEC_OPUS || @@ -4847,6 +4987,8 @@ void janus_audiobridge_incoming_rtp(janus_plugin_session *handle, janus_plugin_r JANUS_LOG(LOG_ERR, "[Opus] Ops! got an error decoding the Opus frame: %d (%s)\n", lost_pkt->length, opus_strerror(lost_pkt->length)); g_free(lost_pkt->data); g_free(lost_pkt); + g_free(pkt->data); + g_free(pkt); return; } /* Enqueue the decoded frame */ @@ -4989,6 +5131,14 @@ static void janus_audiobridge_hangup_media_internal(janus_plugin_session *handle return; /* Get rid of participant */ janus_audiobridge_participant *participant = (janus_audiobridge_participant *)session->participant; + /* If this was a plain RTP participant, notify the thread that it's time to go */ + if(participant->plainrtp_media.pipefd[1] > 0) { + int code = 1; + ssize_t res = 0; + do { + res = write(participant->plainrtp_media.pipefd[1], &code, sizeof(int)); + } while(res == -1 && errno == EINTR); + } janus_mutex_lock(&rooms_mutex); janus_audiobridge_room *audiobridge = participant->room; gboolean removed = FALSE; @@ -5160,6 +5310,19 @@ static void *janus_audiobridge_handler(void *data) { JANUS_AUDIOBRIDGE_ERROR_MISSING_ELEMENT, JANUS_AUDIOBRIDGE_ERROR_INVALID_ELEMENT); if(error_code != 0) goto error; + json_t *rtp = json_object_get(root, "rtp"); + if(rtp != NULL) { + JANUS_VALIDATE_JSON_OBJECT(root, rtp_parameters, + error_code, error_cause, TRUE, + JANUS_AUDIOBRIDGE_ERROR_MISSING_ELEMENT, JANUS_AUDIOBRIDGE_ERROR_INVALID_ELEMENT); + if(error_code != 0) + goto error; + if(msg_sdp != NULL) { + JANUS_LOG(LOG_WARN, "Added plain RTP details but negotiating a WebRTC PeerConnection: plain RTP will be ignored\n"); + rtp = NULL; + json_object_del(root, "rtp"); + } + } if(!string_ids) { JANUS_VALIDATE_JSON_OBJECT(root, room_parameters, error_code, error_cause, TRUE, @@ -5205,6 +5368,15 @@ static void *janus_audiobridge_handler(void *data) { janus_refcount_increase(&audiobridge->ref); janus_mutex_lock(&audiobridge->mutex); janus_mutex_unlock(&rooms_mutex); + if(rtp != NULL && !audiobridge->allow_plainrtp) { + /* Plain RTP participants are not allowed in this room */ + janus_mutex_unlock(&audiobridge->mutex); + janus_refcount_decrease(&audiobridge->ref); + error_code = JANUS_AUDIOBRIDGE_ERROR_UNAUTHORIZED; + JANUS_LOG(LOG_ERR, "Plain RTP participants not allowed in this room\n"); + g_snprintf(error_cause, 512, "Plain RTP participants not allowed in this room"); + goto error; + } /* A pin may be required for this action */ JANUS_CHECK_SECRET(audiobridge->room_pin, root, "pin", error_code, error_cause, JANUS_AUDIOBRIDGE_ERROR_MISSING_ELEMENT, JANUS_AUDIOBRIDGE_ERROR_INVALID_ELEMENT, JANUS_AUDIOBRIDGE_ERROR_UNAUTHORIZED); @@ -5346,6 +5518,8 @@ static void *janus_audiobridge_handler(void *data) { participant->last_timestamp = 0; janus_mutex_init(&participant->qmutex); participant->arc = NULL; + janus_audiobridge_plainrtp_media_cleanup(&participant->plainrtp_media); + janus_mutex_init(&participant->pmutex); janus_mutex_init(&participant->rec_mutex); } participant->session = session; @@ -5435,6 +5609,59 @@ static void *janus_audiobridge_handler(void *data) { } } participant->reset = FALSE; + /* If this is a plain RTP participant, create the socket */ + if(rtp != NULL) { + const char *ip = json_string_value(json_object_get(rtp, "ip")); + uint16_t port = json_integer_value(json_object_get(rtp, "port")); + int pt = json_integer_value(json_object_get(rtp, "payload_type")); + if(pt == 0) + pt = 100; + participant->opus_pt = pt; + int audiolevel_ext_id = json_integer_value(json_object_get(rtp, "audiolevel_ext")); + if(audiolevel_ext_id > 0) + participant->extmap_id = audiolevel_ext_id; + gboolean fec = json_is_true(json_object_get(rtp, "fec")); + if(fec) { + participant->fec = TRUE; + opus_encoder_ctl(participant->encoder, OPUS_SET_INBAND_FEC(participant->fec)); + } + /* Create the socket */ + janus_mutex_lock(&participant->pmutex); + janus_audiobridge_plainrtp_media_cleanup(&participant->plainrtp_media); + if(janus_audiobridge_plainrtp_allocate_port(&participant->plainrtp_media) < 0) { + JANUS_LOG(LOG_ERR, "[AudioBridge-%p] Couldn't bind to local port\n", session); + } else if(ip != NULL && port > 0) { + /* Connect the socket, if there's a remote address */ + g_free(participant->plainrtp_media.remote_audio_ip); + participant->plainrtp_media.remote_audio_ip = g_strdup(ip); + participant->plainrtp_media.remote_audio_rtp_port = port; + struct sockaddr_in audio_server_addr = { 0 }; + memset(&audio_server_addr, 0, sizeof(struct sockaddr_in)); + audio_server_addr.sin_family = AF_INET; + gboolean have_audio_server_ip = TRUE; + if(participant->plainrtp_media.remote_audio_ip && inet_aton(participant->plainrtp_media.remote_audio_ip, &audio_server_addr.sin_addr) == 0) { /* Not a numeric IP... */ + /* Note that gethostbyname() may block waiting for response if it triggers on the wire request.*/ + struct hostent *host = gethostbyname(participant->plainrtp_media.remote_audio_ip); /* ...resolve name */ + if(!host) { + JANUS_LOG(LOG_ERR, "[AudioBridge-%p] Couldn't get host (%s)\n", session, participant->plainrtp_media.remote_audio_ip); + have_audio_server_ip = FALSE; + } else { + audio_server_addr.sin_addr = *(struct in_addr *)host->h_addr_list; + } + } + audio_server_addr.sin_port = htons(participant->plainrtp_media.remote_audio_rtp_port); + if(have_audio_server_ip) { + if(connect(participant->plainrtp_media.audio_rtp_fd, (struct sockaddr *)&audio_server_addr, sizeof(struct sockaddr)) == -1) { + JANUS_LOG(LOG_ERR, "[AudioBridge-%p] Couldn't connect audio RTP? (%s:%d)\n", session, + participant->plainrtp_media.remote_audio_ip, participant->plainrtp_media.remote_audio_rtp_port); + JANUS_LOG(LOG_ERR, "[AudioBridge-%p] -- %d (%s)\n", session, errno, strerror(errno)); + } else { + participant->plainrtp_media.audio_send = TRUE; + } + } + } + janus_mutex_unlock(&participant->pmutex); + } /* Finally, start the encoding thread if it hasn't already */ if(participant->thread == NULL) { GError *error = NULL; @@ -5455,6 +5682,26 @@ static void *janus_audiobridge_handler(void *data) { g_error_free(error); } } + if(participant->plainrtp_media.audio_rtp_fd != -1 && participant->plainrtp_media.thread == NULL) { + /* Spawn a thread for incoming plain RTP traffic too */ + GError *error = NULL; + char roomtrunc[5], parttrunc[5]; + g_snprintf(roomtrunc, sizeof(roomtrunc), "%s", audiobridge->room_id_str); + g_snprintf(parttrunc, sizeof(parttrunc), "%s", participant->user_id_str); + char tname[16]; + g_snprintf(tname, sizeof(tname), "rtp %s %s", roomtrunc, parttrunc); + janus_refcount_increase(&session->ref); + janus_refcount_increase(&participant->ref); + participant->plainrtp_media.thread = g_thread_try_new(tname, &janus_audiobridge_plainrtp_relay_thread, session, &error); + if(error != NULL) { + janus_refcount_decrease(&participant->ref); + janus_refcount_decrease(&session->ref); + /* FIXME We should fail here... */ + JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the plain RTP participant thread...\n", + error->code, error->message ? error->message : "??"); + g_error_free(error); + } + } /* If a PeerConnection exists, make sure to update the RTP headers */ if(g_atomic_int_get(&session->started) == 1) participant->context.a_last_ssrc = 0; @@ -5517,6 +5764,12 @@ static void *janus_audiobridge_handler(void *data) { json_object_set_new(event, "room", string_ids ? json_string(room_id_str) : json_integer(room_id)); json_object_set_new(event, "id", string_ids ? json_string(user_id_str) : json_integer(user_id)); json_object_set_new(event, "participants", list); + if(participant->plainrtp_media.local_audio_rtp_port > 0) { + json_t *details = json_object(); + json_object_set_new(details, "ip", json_string(local_ip)); + json_object_set_new(details, "port", json_integer(participant->plainrtp_media.local_audio_rtp_port)); + json_object_set_new(event, "rtp", details); + } /* Also notify event handlers */ if(notify_events && gateway->events_is_enabled()) { json_t *info = json_object(); @@ -6109,6 +6362,11 @@ static void *janus_audiobridge_handler(void *data) { g_free(user_id_str); janus_mutex_unlock(&audiobridge->mutex); janus_mutex_unlock(&rooms_mutex); + } else if(!strcasecmp(request_text, "hangup")) { + /* Get rid of an ongoing session */ + gateway->close_pc(session->handle); + event = json_object(); + json_object_set_new(event, "audiobridge", json_string("hangingup")); } else if(!strcasecmp(request_text, "leave")) { /* This participant is leaving */ janus_audiobridge_participant *participant = (janus_audiobridge_participant *)session->participant; @@ -7043,7 +7301,16 @@ static void janus_audiobridge_relay_rtp_packet(gpointer data, gpointer user_data packet->data->type = (participant->codec == JANUS_AUDIOCODEC_PCMA ? 8 : 0); /* Fix sequence number and timestamp (room switching may be involved) */ janus_rtp_header_update(packet->data, &participant->context, FALSE, 0); - if(gateway != NULL) { + if(participant->plainrtp_media.audio_rtp_fd > 0) { + if(participant->plainrtp_media.audio_ssrc == 0) + participant->plainrtp_media.audio_ssrc = ntohl(packet->ssrc); + if(participant->plainrtp_media.audio_send) { + int ret = send(participant->plainrtp_media.audio_rtp_fd, (char *)packet->data, packet->length, 0); + if(ret < 0) { + JANUS_LOG(LOG_WARN, "Error sending plain RTP packet: %d (%s)\n", errno, strerror(errno)); + } + } + } else if(gateway != NULL) { janus_plugin_rtp rtp = { .video = FALSE, .buffer = (char *)packet->data, .length = packet->length }; janus_plugin_rtp_extensions_reset(&rtp.extensions); /* FIXME Should we add our own audio level extension? */ @@ -7053,3 +7320,207 @@ static void janus_audiobridge_relay_rtp_packet(gpointer data, gpointer user_data packet->data->timestamp = htonl(packet->timestamp); packet->data->seq_number = htons(packet->seq_number); } + +/* Plain RTP stuff */ +static void janus_audiobridge_plainrtp_media_cleanup(janus_audiobridge_plainrtp_media *media) { + if(media == NULL) + return; + media->ready = FALSE; + media->audio_pt = -1; + media->audio_send = FALSE; + if(media->audio_rtp_fd > 0) + close(media->audio_rtp_fd); + media->audio_rtp_fd = -1; + media->local_audio_rtp_port = 0; + media->remote_audio_rtp_port = 0; + g_free(media->remote_audio_ip); + media->remote_audio_ip = NULL; + media->audio_ssrc = 0; + media->audio_ssrc_peer = 0; + if(media->pipefd[0] > 0) + close(media->pipefd[0]); + media->pipefd[0] = -1; + if(media->pipefd[1] > 0) + close(media->pipefd[1]); + media->pipefd[1] = -1; +} +static int janus_audiobridge_plainrtp_allocate_port(janus_audiobridge_plainrtp_media *media) { + /* Read global slider */ + uint16_t rtp_port_next = rtp_range_slider; + uint16_t rtp_port_start = rtp_port_next; + gboolean rtp_port_wrap = FALSE; + /* Find a port we can use */ + int rtp_fd = -1; + while(1) { + if(rtp_port_wrap && rtp_port_next >= rtp_port_start) { + /* Full range scanned */ + JANUS_LOG(LOG_ERR, "No ports available in range: %u -- %u\n", rtp_range_min, rtp_range_max); + break; + } + if(rtp_fd == -1) { + rtp_fd = socket(AF_INET, SOCK_DGRAM, 0); + } + if(rtp_fd == -1) { + JANUS_LOG(LOG_ERR, "Error creating socket...\n"); + break; + } + int rtp_port = rtp_port_next; + if((uint32_t)(rtp_port_next + 2UL) < rtp_range_max) { + /* Advance to next value */ + rtp_port_next += 2; + } else { + rtp_port_next = rtp_range_min; + rtp_port_wrap = TRUE; + } + struct sockaddr_in rtp_address = { 0 }; + rtp_address.sin_family = AF_INET; + rtp_address.sin_port = htons(rtp_port); + inet_pton(AF_INET, local_ip, &rtp_address.sin_addr.s_addr); + if(bind(rtp_fd, (struct sockaddr *)(&rtp_address), sizeof(struct sockaddr)) < 0) { + /* rtp_fd still unbound, reuse it in the next iteration */ + } else { + media->audio_rtp_fd = rtp_fd; + media->local_audio_rtp_port = rtp_port; + rtp_range_slider = rtp_port_next; /* Update global slider */ + return 0; + } + } + if(rtp_fd != -1) { + close(rtp_fd); + } + return -1; +} +/* Thread to relay RTP/RTCP frames coming from the peer */ +static void *janus_audiobridge_plainrtp_relay_thread(void *data) { + janus_audiobridge_session *session = (janus_audiobridge_session *)data; + JANUS_LOG(LOG_INFO, "[AudioBridge-%p] Starting Plain RTP participant thread\n", session); + if(!session || !session->participant) { + JANUS_LOG(LOG_WARN, "[AudioBridge-%p] Invalid session or participant..?\n", session); + g_thread_unref(g_thread_self()); + return NULL; + } + janus_audiobridge_participant *participant = (janus_audiobridge_participant *)session->participant; + + /* File descriptors */ + socklen_t addrlen; + struct sockaddr_in remote = { 0 }; + int resfd = 0, bytes = 0, pollerrs = 0; + struct pollfd fds[2]; + int pipe_fd = participant->plainrtp_media.pipefd[0]; + char buffer[1500]; + memset(buffer, 0, 1500); + /* Loop */ + int num = 0; + gboolean first = TRUE, goon = TRUE; + + /* Fake RTP packet */ + janus_plugin_rtp packet = { .video = FALSE, .buffer = buffer, .length = 0 }; + janus_plugin_rtp_extensions_reset(&packet.extensions); + + while(goon && session != NULL && !g_atomic_int_get(&session->destroyed) && !g_atomic_int_get(&session->hangingup)) { + /* Prepare poll */ + num = 0; + if(participant->plainrtp_media.audio_rtp_fd != -1) { + fds[num].fd = participant->plainrtp_media.audio_rtp_fd; + fds[num].events = POLLIN; + fds[num].revents = 0; + num++; + } + if(pipe_fd != -1) { + fds[num].fd = pipe_fd; + fds[num].events = POLLIN; + fds[num].revents = 0; + num++; + } + /* Wait for some data */ + resfd = poll(fds, num, 1000); + if(resfd < 0) { + if(errno == EINTR) { + JANUS_LOG(LOG_HUGE, "[AudioBridge-%p] Got an EINTR (%s), ignoring...\n", session, strerror(errno)); + continue; + } + JANUS_LOG(LOG_ERR, "[AudioBridge-%p] Error polling...\n", session); + JANUS_LOG(LOG_ERR, "[AudioBridge-%p] -- %d (%s)\n", session, errno, strerror(errno)); + break; + } else if(resfd == 0) { + /* No data, keep going */ + continue; + } + if(session == NULL || g_atomic_int_get(&session->destroyed)) + break; + int i = 0; + for(i=0; ihandle); + break; + } else if(fds[i].revents & POLLIN) { + if(pipe_fd != -1 && fds[i].fd == pipe_fd) { + /* Poll interrupted for a reason, go on */ + int code = 0; + (void)read(pipe_fd, &code, sizeof(int)); + break; + } + /* Got an RTP packet */ + addrlen = sizeof(remote); + bytes = recvfrom(fds[i].fd, buffer, 1500, 0, (struct sockaddr*)&remote, &addrlen); + if(bytes < 0) { + /* Failed to read? */ + continue; + } + /* Audio RTP */ + if(!janus_is_rtp(buffer, bytes)) { + /* Not an RTP packet? */ + continue; + } + /* If this is the first packet we receive, simulate a setup_media event */ + if(first) { + first = FALSE; + janus_audiobridge_setup_media(session->handle); + } + /* Handle the packet */ + pollerrs = 0; + rtp_header *header = (rtp_header *)buffer; + if(participant->plainrtp_media.audio_ssrc_peer != ntohl(header->ssrc)) { + participant->plainrtp_media.audio_ssrc_peer = ntohl(header->ssrc); + JANUS_LOG(LOG_VERB, "[AudioBridge-%p] Got SIP peer audio SSRC: %"SCNu32"\n", + session, participant->plainrtp_media.audio_ssrc_peer); + } + /* Check if the SSRC changed (e.g., after a re-INVITE or UPDATE) */ + janus_rtp_header_update(header, &participant->plainrtp_media.context, FALSE, 0); + /* Handle as a WebRTC RTP packet */ + packet.length = bytes; + janus_audiobridge_incoming_rtp(session->handle, &packet); + continue; + } + } + } + /* Cleanup the media session */ + participant->plainrtp_media.thread = NULL; + janus_mutex_lock(&participant->pmutex); + janus_audiobridge_plainrtp_media_cleanup(&participant->plainrtp_media); + janus_mutex_unlock(&participant->pmutex); + /* Done */ + JANUS_LOG(LOG_INFO, "[AudioBridge-%p] Leaving Plain RTP participant thread\n", session); + janus_refcount_decrease(&participant->ref); + janus_refcount_decrease(&session->ref); + g_thread_unref(g_thread_self()); + return NULL; +}