From 7d888765a6734e6457e48c0159ed155e9a4564f6 Mon Sep 17 00:00:00 2001 From: Lorenzo Miniero Date: Tue, 15 Sep 2020 17:53:27 +0200 Subject: [PATCH] Add support for helper threads to RTSP mountpoints (fixes #2359) --- plugins/janus_streaming.c | 67 ++++++++++++++++++++++++++++++++++----- 1 file changed, 59 insertions(+), 8 deletions(-) diff --git a/plugins/janus_streaming.c b/plugins/janus_streaming.c index 00cc603f8c..85fd86718b 100644 --- a/plugins/janus_streaming.c +++ b/plugins/janus_streaming.c @@ -862,6 +862,7 @@ static struct janus_json_parameter rtsp_parameters[] = { {"videofmtp", JSON_STRING, 0}, {"videopt", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE}, {"videobufferkf", JANUS_JSON_BOOL, 0}, + {"threads", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE}, {"rtspiface", JSON_STRING, 0}, {"rtsp_failcheck", JANUS_JSON_BOOL, 0} }; @@ -1129,8 +1130,8 @@ typedef struct janus_streaming_mountpoint { janus_streaming_codecs codecs; gboolean audio, video, data; GList *viewers; - int helper_threads; /* Only relevant for RTP mountpoints */ - GList *threads; /* Only relevant for RTP mountpoints */ + int helper_threads; /* Only relevant for RTP/RTSP mountpoints */ + GList *threads; /* Only relevant for RTP/RTSP mountpoints */ volatile gint destroyed; janus_mutex mutex; janus_refcount ref; @@ -1185,7 +1186,7 @@ janus_streaming_mountpoint *janus_streaming_create_rtsp_source( char *url, char *username, char *password, gboolean doaudio, int audiopt, char *artpmap, char *afmtp, gboolean dovideo, int videopt, char *vrtpmap, char *vfmtp, gboolean bufferkf, - const janus_network_address *iface, + const janus_network_address *iface, int threads, gboolean error_on_failure); @@ -2098,6 +2099,7 @@ int janus_streaming_init(janus_callbacks *callback, const char *config_path) { janus_config_item *vkf = janus_config_get(config, cat, janus_config_type_item, "videobufferkf"); janus_config_item *iface = janus_config_get(config, cat, janus_config_type_item, "rtspiface"); janus_config_item *failerr = janus_config_get(config, cat, janus_config_type_item, "rtsp_failcheck"); + janus_config_item *threads = janus_config_get(config, cat, janus_config_type_item, "threads"); janus_network_address iface_value; if(file == NULL || file->value == NULL) { JANUS_LOG(LOG_ERR, "Can't add 'rtsp' mountpoint '%s', missing mandatory information...\n", cat->name); @@ -2111,6 +2113,11 @@ int janus_streaming_init(janus_callbacks *callback, const char *config_path) { gboolean error_on_failure = TRUE; if(failerr && failerr->value) error_on_failure = janus_is_true(failerr->value); + if(threads && threads->value && atoi(threads->value) < 0) { + JANUS_LOG(LOG_ERR, "Can't add 'rtsp' mountpoint '%s', invalid threads configuration...\n", cat->name); + cl = cl->next; + continue; + } if((doaudio || dovideo) && iface && iface->value) { if(!ifas) { @@ -2144,6 +2151,7 @@ int janus_streaming_init(janus_callbacks *callback, const char *config_path) { vfmtp ? (char *)vfmtp->value : NULL, bufferkf, iface && iface->value ? &iface_value : NULL, + (threads && threads->value) ? atoi(threads->value) : 0, error_on_failure)) == NULL) { JANUS_LOG(LOG_ERR, "Error creating 'rtsp' mountpoint '%s'...\n", cat->name); cl = cl->next; @@ -3151,6 +3159,7 @@ static json_t *janus_streaming_process_synchronous_request(janus_streaming_sessi json_t *username = json_object_get(root, "rtsp_user"); json_t *password = json_object_get(root, "rtsp_pwd"); json_t *iface = json_object_get(root, "rtspiface"); + json_t *threads = json_object_get(root, "threads"); json_t *failerr = json_object_get(root, "rtsp_failcheck"); if(failerr == NULL) /* For an old typo, we support the legacy syntax too */ failerr = json_object_get(root, "rtsp_check"); @@ -3194,7 +3203,7 @@ static json_t *janus_streaming_process_synchronous_request(janus_streaming_sessi dovideo, (videopt ? json_integer_value(videopt) : -1), (char *)json_string_value(videortpmap), (char *)json_string_value(videofmtp), videobufferkf ? json_is_true(videobufferkf) : FALSE, - &multicast_iface, + &multicast_iface, (threads ? json_integer_value(threads) : 0), error_on_failure); janus_mutex_lock(&mountpoints_mutex); g_hash_table_remove(mountpoints_temp, string_ids ? (gpointer)mpid_str : (gpointer)&mpid); @@ -3352,6 +3361,10 @@ static json_t *janus_streaming_process_synchronous_request(janus_streaming_sessi json_t *iface = json_object_get(root, "rtspiface"); if(iface) janus_config_add(config, c, janus_config_item_create("rtspiface", json_string_value(iface))); + if(mp->helper_threads > 0) { + g_snprintf(value, BUFSIZ, "%d", mp->helper_threads); + janus_config_add(config, c, janus_config_item_create("threads", value)); + } } /* Some more common values */ if(mp->secret) @@ -3562,6 +3575,10 @@ static json_t *janus_streaming_process_synchronous_request(janus_streaming_sessi json_t *iface = json_object_get(root, "rtspiface"); if(iface) janus_config_add(config, c, janus_config_item_create("rtspiface", json_string_value(iface))); + if(mp->helper_threads > 0) { + g_snprintf(value, BUFSIZ, "%d", mp->helper_threads); + janus_config_add(config, c, janus_config_item_create("threads", value)); + } } else { janus_config_add(config, c, janus_config_item_create("type", "rtp")); janus_config_add(config, c, janus_config_item_create("audio", mp->codecs.audio_pt >= 0 ? "yes" : "no")); @@ -6068,7 +6085,6 @@ janus_streaming_mountpoint *janus_streaming_create_rtp_source( /* This extra unref is for the init */ janus_refcount_decrease(&helper->ref); janus_streaming_mountpoint_destroy(live_rtp); - g_free(helper); return NULL; } janus_refcount_increase(&helper->ref); @@ -6928,7 +6944,7 @@ janus_streaming_mountpoint *janus_streaming_create_rtsp_source( char *url, char *username, char *password, gboolean doaudio, int acodec, char *artpmap, char *afmtp, gboolean dovideo, int vcodec, char *vrtpmap, char *vfmtp, gboolean bufferkf, - const janus_network_address *iface, + const janus_network_address *iface, int threads, gboolean error_on_failure) { char id_num[30]; if(!string_ids) { @@ -7040,9 +7056,44 @@ janus_streaming_mountpoint *janus_streaming_create_rtsp_source( return NULL; } } - /* Start the thread that will receive the media packets */ + /* If we need helper threads, spawn them now */ GError *error = NULL; char tname[16]; + if(threads > 0) { + int i=0; + for(i=0; iid = i+1; + helper->mp = live_rtsp; + helper->queued_packets = g_async_queue_new_full((GDestroyNotify)janus_streaming_rtp_relay_packet_free); + janus_mutex_init(&helper->mutex); + janus_refcount_init(&helper->ref, janus_streaming_helper_free); + live_rtsp->helper_threads++; + /* Spawn a thread and add references */ + g_snprintf(tname, sizeof(tname), "help %u-%"SCNu64, helper->id, live_rtsp->id); + janus_refcount_increase(&live_rtsp->ref); + janus_refcount_increase(&helper->ref); + helper->thread = g_thread_try_new(tname, &janus_streaming_helper_thread, helper, &error); + if(error != NULL) { + JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the helper thread...\n", + error->code, error->message ? error->message : "??"); + g_error_free(error); + janus_refcount_decrease(&live_rtsp->ref); /* This is for the helper thread */ + g_async_queue_unref(helper->queued_packets); + janus_refcount_decrease(&helper->ref); + /* This extra unref is for the init */ + janus_refcount_decrease(&helper->ref); + janus_mutex_lock(&mountpoints_mutex); + g_hash_table_remove(mountpoints_temp, &id); + janus_mutex_unlock(&mountpoints_mutex); + janus_refcount_decrease(&live_rtsp->ref); + return NULL; + } + janus_refcount_increase(&helper->ref); + live_rtsp->threads = g_list_append(live_rtsp->threads, helper); + } + } + /* Finally, start the thread that will receive the media packets */ g_snprintf(tname, sizeof(tname), "mp %s", live_rtsp->id_str); janus_refcount_increase(&live_rtsp->ref); live_rtsp->thread = g_thread_try_new(tname, &janus_streaming_relay_thread, live_rtsp, &error); @@ -7072,7 +7123,7 @@ janus_streaming_mountpoint *janus_streaming_create_rtsp_source( char *url, char *username, char *password, gboolean doaudio, int acodec, char *audiortpmap, char *audiofmtp, gboolean dovideo, int vcodec, char *videortpmap, char *videofmtp, gboolean bufferkf, - const janus_network_address *iface, + const janus_network_address *iface, int threads, gboolean error_on_failure) { JANUS_LOG(LOG_ERR, "RTSP need libcurl\n"); return NULL;