From c690ea96734c617582520dde71df4d053db1bd07 Mon Sep 17 00:00:00 2001 From: lucylu-star Date: Tue, 2 Feb 2021 15:55:21 +0800 Subject: [PATCH 01/10] test --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 3c86cac1a6..faff20b3ec 100644 --- a/README.md +++ b/README.md @@ -344,3 +344,4 @@ Janus is thoroughly documented. You can find the current documentation, automati Any thought, feedback or (hopefully not!) insult is welcome! Developed by [@meetecho](https://github.com/meetecho) + From 6517f2092f03d915b6add8f7bb030b2d51beb51d Mon Sep 17 00:00:00 2001 From: lucylu-star Date: Tue, 2 Feb 2021 15:59:57 +0800 Subject: [PATCH 02/10] fix deadlock when rtsp reconnect fail --- plugins/janus_streaming.c | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/plugins/janus_streaming.c b/plugins/janus_streaming.c index 07b8b3ce44..b7e3d80b1d 100644 --- a/plugins/janus_streaming.c +++ b/plugins/janus_streaming.c @@ -6474,7 +6474,11 @@ static int janus_streaming_rtsp_connect_to_server(janus_streaming_mountpoint *mp int asport = 0, asport_rtcp = 0; multiple_fds audio_fds = {-1, -1}; - janus_mutex_lock(&mountpoints_mutex); + if(g_atomic_int_get(&mp->destroyed)) + return -8; + janus_mutex_lock(&mp_mutex); + if(g_atomic_int_get(&mp->destroyed)) + return -8; /* Parse both video and audio first before proceed to setup as curldata will be reused */ int vresult = -1; if(dovideo) { @@ -6486,7 +6490,7 @@ static int janus_streaming_rtsp_connect_to_server(janus_streaming_mountpoint *mp aresult = janus_streaming_rtsp_parse_sdp(curldata->buffer, name, "audio", abase, &apt, atransport, ahost, artpmap, afmtp, acontrol, &source->audio_iface, &audio_fds); } - janus_mutex_unlock(&mountpoints_mutex); + janus_mutex_unlock(&mp_mutex); if(vresult == -1 && aresult == -1) { /* Both audio and video failed? Give up... */ @@ -7629,6 +7633,9 @@ static void *janus_streaming_relay_thread(void *data) { close(source->video_rtcp_fd); } source->video_rtcp_fd = -1; + + if(g_atomic_int_get(&mountpoint->destroyed)) + break; /* Now let's try to reconnect */ if(janus_streaming_rtsp_connect_to_server(mountpoint) < 0) { /* Reconnection failed? Let's try again later */ From 9064c253e3cd238a43a9ce26f2336c595b5b2057 Mon Sep 17 00:00:00 2001 From: lucylu-star Date: Tue, 2 Feb 2021 16:23:14 +0800 Subject: [PATCH 03/10] fix deadlock when destroying the mountpoint and rtsp reconnect always fail --- plugins/janus_streaming.c | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/plugins/janus_streaming.c b/plugins/janus_streaming.c index b7e3d80b1d..a2f2aadab6 100644 --- a/plugins/janus_streaming.c +++ b/plugins/janus_streaming.c @@ -6474,11 +6474,16 @@ static int janus_streaming_rtsp_connect_to_server(janus_streaming_mountpoint *mp int asport = 0, asport_rtcp = 0; multiple_fds audio_fds = {-1, -1}; - if(g_atomic_int_get(&mp->destroyed)) - return -8; + if(g_atomic_int_get(&mp->destroyed)){ + return -8; + } + janus_mutex_lock(&mp_mutex); - if(g_atomic_int_get(&mp->destroyed)) - return -8; + + if(g_atomic_int_get(&mp->destroyed)){ + janus_mutex_unlock(&mp_mutex); + return -8; + } /* Parse both video and audio first before proceed to setup as curldata will be reused */ int vresult = -1; if(dovideo) { From 1045081c53a1862325a9cb3356881c4f73f31bd1 Mon Sep 17 00:00:00 2001 From: lucylu-star Date: Tue, 2 Feb 2021 16:27:30 +0800 Subject: [PATCH 04/10] replace space with tab --- plugins/janus_streaming.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/plugins/janus_streaming.c b/plugins/janus_streaming.c index a2f2aadab6..a7fa0e67d7 100644 --- a/plugins/janus_streaming.c +++ b/plugins/janus_streaming.c @@ -7639,8 +7639,10 @@ static void *janus_streaming_relay_thread(void *data) { } source->video_rtcp_fd = -1; - if(g_atomic_int_get(&mountpoint->destroyed)) - break; + if(g_atomic_int_get(&mountpoint->destroyed)){ + break; + } + /* Now let's try to reconnect */ if(janus_streaming_rtsp_connect_to_server(mountpoint) < 0) { /* Reconnection failed? Let's try again later */ From d9303cb0d207ee65ebda68cb221a82fe753038d4 Mon Sep 17 00:00:00 2001 From: lucylu-star Date: Tue, 2 Feb 2021 16:38:44 +0800 Subject: [PATCH 05/10] adjust since typo --- plugins/janus_streaming.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/plugins/janus_streaming.c b/plugins/janus_streaming.c index a7fa0e67d7..dfb507f4cc 100644 --- a/plugins/janus_streaming.c +++ b/plugins/janus_streaming.c @@ -6478,10 +6478,10 @@ static int janus_streaming_rtsp_connect_to_server(janus_streaming_mountpoint *mp return -8; } - janus_mutex_lock(&mp_mutex); + janus_mutex_lock(&mp->mutex); if(g_atomic_int_get(&mp->destroyed)){ - janus_mutex_unlock(&mp_mutex); + janus_mutex_unlock(&mp->mutex); return -8; } /* Parse both video and audio first before proceed to setup as curldata will be reused */ @@ -6495,7 +6495,7 @@ static int janus_streaming_rtsp_connect_to_server(janus_streaming_mountpoint *mp aresult = janus_streaming_rtsp_parse_sdp(curldata->buffer, name, "audio", abase, &apt, atransport, ahost, artpmap, afmtp, acontrol, &source->audio_iface, &audio_fds); } - janus_mutex_unlock(&mp_mutex); + janus_mutex_unlock(&mp->mutex); if(vresult == -1 && aresult == -1) { /* Both audio and video failed? Give up... */ From 65200da8adbbbffbf9fe4e307d4331a7eaba06c2 Mon Sep 17 00:00:00 2001 From: lucylu-star Date: Sun, 7 Feb 2021 10:35:48 +0800 Subject: [PATCH 06/10] del newline and brackets --- README.md | 1 - plugins/janus_streaming.c | 16 +++++----------- 2 files changed, 5 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index faff20b3ec..3c86cac1a6 100644 --- a/README.md +++ b/README.md @@ -344,4 +344,3 @@ Janus is thoroughly documented. You can find the current documentation, automati Any thought, feedback or (hopefully not!) insult is welcome! Developed by [@meetecho](https://github.com/meetecho) - diff --git a/plugins/janus_streaming.c b/plugins/janus_streaming.c index dfb507f4cc..965289a8a4 100644 --- a/plugins/janus_streaming.c +++ b/plugins/janus_streaming.c @@ -6474,12 +6474,9 @@ static int janus_streaming_rtsp_connect_to_server(janus_streaming_mountpoint *mp int asport = 0, asport_rtcp = 0; multiple_fds audio_fds = {-1, -1}; - if(g_atomic_int_get(&mp->destroyed)){ - return -8; - } - - janus_mutex_lock(&mp->mutex); - + if(g_atomic_int_get(&mp->destroyed)) + return -8; + janus_mutex_lock(&mp->mutex); if(g_atomic_int_get(&mp->destroyed)){ janus_mutex_unlock(&mp->mutex); return -8; @@ -7638,11 +7635,8 @@ static void *janus_streaming_relay_thread(void *data) { close(source->video_rtcp_fd); } source->video_rtcp_fd = -1; - - if(g_atomic_int_get(&mountpoint->destroyed)){ - break; - } - + if(g_atomic_int_get(&mountpoint->destroyed)) + break; /* Now let's try to reconnect */ if(janus_streaming_rtsp_connect_to_server(mountpoint) < 0) { /* Reconnection failed? Let's try again later */ From 8df22534ab8e8754e5c3e559c4294afb7a8e8769 Mon Sep 17 00:00:00 2001 From: lucylu-star Date: Wed, 24 Feb 2021 10:47:49 +0800 Subject: [PATCH 07/10] remove the first check,only keep the second --- plugins/janus_streaming.c | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/plugins/janus_streaming.c b/plugins/janus_streaming.c index 965289a8a4..13aea76c5c 100644 --- a/plugins/janus_streaming.c +++ b/plugins/janus_streaming.c @@ -6473,14 +6473,13 @@ static int janus_streaming_rtsp_connect_to_server(janus_streaming_mountpoint *mp abase[0] = '\0'; int asport = 0, asport_rtcp = 0; multiple_fds audio_fds = {-1, -1}; - - if(g_atomic_int_get(&mp->destroyed)) - return -8; - janus_mutex_lock(&mp->mutex); + + janus_mutex_lock(&mp->mutex); if(g_atomic_int_get(&mp->destroyed)){ janus_mutex_unlock(&mp->mutex); return -8; } + /* Parse both video and audio first before proceed to setup as curldata will be reused */ int vresult = -1; if(dovideo) { From ef8075bdd0cdba968d6d08db8169a16c0e04c3f5 Mon Sep 17 00:00:00 2001 From: lucylu-star Date: Wed, 7 Apr 2021 19:43:14 +0800 Subject: [PATCH 08/10] merge --- plugins/janus_streaming.c | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/plugins/janus_streaming.c b/plugins/janus_streaming.c index 13aea76c5c..84e089a24e 100644 --- a/plugins/janus_streaming.c +++ b/plugins/janus_streaming.c @@ -6474,11 +6474,10 @@ static int janus_streaming_rtsp_connect_to_server(janus_streaming_mountpoint *mp int asport = 0, asport_rtcp = 0; multiple_fds audio_fds = {-1, -1}; - janus_mutex_lock(&mp->mutex); if(g_atomic_int_get(&mp->destroyed)){ - janus_mutex_unlock(&mp->mutex); return -8; } + janus_mutex_lock(&mountpoints_mutex); /* Parse both video and audio first before proceed to setup as curldata will be reused */ int vresult = -1; @@ -6491,7 +6490,7 @@ static int janus_streaming_rtsp_connect_to_server(janus_streaming_mountpoint *mp aresult = janus_streaming_rtsp_parse_sdp(curldata->buffer, name, "audio", abase, &apt, atransport, ahost, artpmap, afmtp, acontrol, &source->audio_iface, &audio_fds); } - janus_mutex_unlock(&mp->mutex); + janus_mutex_unlock(&mountpoints_mutex); if(vresult == -1 && aresult == -1) { /* Both audio and video failed? Give up... */ @@ -7635,7 +7634,7 @@ static void *janus_streaming_relay_thread(void *data) { } source->video_rtcp_fd = -1; if(g_atomic_int_get(&mountpoint->destroyed)) - break; + break; /* Now let's try to reconnect */ if(janus_streaming_rtsp_connect_to_server(mountpoint) < 0) { /* Reconnection failed? Let's try again later */ From b2296a9805db086702f6f8ee26aa5a54a000764f Mon Sep 17 00:00:00 2001 From: lucylu-star Date: Thu, 8 Apr 2021 18:10:56 +0800 Subject: [PATCH 09/10] just fix code --- plugins/janus_streaming.c | 235 ++++++++------------------------------ 1 file changed, 46 insertions(+), 189 deletions(-) diff --git a/plugins/janus_streaming.c b/plugins/janus_streaming.c index 84e089a24e..9ebda94558 100644 --- a/plugins/janus_streaming.c +++ b/plugins/janus_streaming.c @@ -21,7 +21,7 @@ * * For what concerns types 1. and 2., considering the proof of concept * nature of the implementation the only pre-recorded media files - * that the plugins supports right now are Opus, raw mu-Law and a-Law files: + * that the plugins supports right now are raw mu-Law and a-Law files: * support is of course planned for other additional widespread formats * as well. * @@ -134,20 +134,12 @@ so neither Janus nor the Streaming plugin have access to anything. DO NOT SET THIS PROPERTY IF YOU DON'T KNOW WHAT YOU'RE DOING! e2ee = true -The following options are only valid for the 'rtsp' type: +The following options are only valid for the 'rstp' type: url = RTSP stream URL rtsp_user = RTSP authorization username, if needed rtsp_pwd = RTSP authorization password, if needed rtsp_failcheck = whether an error should be returned if connecting to the RTSP server fails (default=true) rtspiface = network interface IP address or device name to listen on when receiving RTSP streams -rtsp_reconnect_delay = after n seconds passed and no media assumed, the RTSP server has gone and schedule a reconnect (default=5s) -rtsp_session_timeout = by default the streaming plugin will check the RTSP connection with an OPTIONS query, - the value of the timeout comes from the RTSP session initializer and by default - this session timeout is the half of this value In some cases this value can be too high (for example more than one minute) - because of the media server. In that case this plugin will calculate the timeout with this - formula: timeout = min(session_timeout, rtsp_session_timeout / 2). (default=0s) -rtsp_timeout = communication timeout (CURLOPT_TIMEOUT) for cURL call gathering the RTSP information (default=10s) -rtsp_conn_timeout = connection timeout for cURL (CURLOPT_CONNECTTIMEOUT) call gathering the RTSP information (default=5s) \endverbatim * * \section streamapi Streaming API @@ -340,7 +332,7 @@ rtsp_conn_timeout = connection timeout for cURL (CURLOPT_CONNECTTIMEOUT) call ga * * Once you created a mountpoint, you can modify some (not all) of its * properties via an \c edit request. Namely, you can only modify generic - * properties like the mountpoint description, the secret, the PIN and + * properties like the mountoint description, the secret, the PIN and * whether or not the mountpoint should be listable. All other properties * are considered to be immutable. Again, you can choose whether the changes * should be permanent, e.g., saved to configuration file, or not. Notice @@ -729,11 +721,6 @@ rtsp_conn_timeout = connection timeout for cURL (CURLOPT_CONNECTTIMEOUT) call ga #include "../utils.h" #include "../ip-utils.h" -/* Default settings */ -#define JANUS_STREAMING_DEFAULT_SESSION_TIMEOUT 0 /* Overwrite the RTSP session timeout. If set to zero, the RTSP timeout is derived from a session. */ -#define JANUS_STREAMING_DEFAULT_RECONNECT_DELAY 5 /* Reconnecting delay in seconds. */ -#define JANUS_STREAMING_DEFAULT_CURL_TIMEOUT 10L /* Communication timeout for cURL. */ -#define JANUS_STREAMING_DEFAULT_CURL_CONNECT_TIMEOUT 5L /* Connection timeout for cURL. */ /* Plugin information */ #define JANUS_STREAMING_VERSION 8 @@ -868,10 +855,6 @@ static struct janus_json_parameter rtsp_parameters[] = { {"url", JSON_STRING, 0}, {"rtsp_user", JSON_STRING, 0}, {"rtsp_pwd", JSON_STRING, 0}, - {"rtsp_reconnect_delay", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE}, - {"rtsp_session_timeout", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE}, - {"rtsp_timeout", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE}, - {"rtsp_conn_timeout", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE}, {"audiortpmap", JSON_STRING, 0}, {"audiofmtp", JSON_STRING, 0}, {"audiopt", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE}, @@ -879,7 +862,6 @@ static struct janus_json_parameter rtsp_parameters[] = { {"videofmtp", JSON_STRING, 0}, {"videopt", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE}, {"videobufferkf", JANUS_JSON_BOOL, 0}, - {"threads", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE}, {"rtspiface", JSON_STRING, 0}, {"rtsp_failcheck", JANUS_JSON_BOOL, 0} }; @@ -1010,7 +992,6 @@ typedef struct janus_streaming_rtp_relay_packet { janus_rtp_header *data; gint length; gboolean is_rtp; /* This may be a data packet and not RTP */ - gboolean is_data; gboolean is_video; gboolean is_keyframe; gboolean simulcast; @@ -1083,14 +1064,10 @@ typedef struct janus_streaming_rtp_source { janus_streaming_buffer *curldata; char *rtsp_url; char *rtsp_username, *rtsp_password; - gint64 ka_timeout; + int ka_timeout; char *rtsp_ahost, *rtsp_vhost; gboolean reconnecting; gint64 reconnect_timer; - gint64 reconnect_delay; - gint64 session_timeout; - int rtsp_timeout; - int rtsp_conn_timeout; janus_mutex rtsp_mutex; #endif janus_streaming_rtp_keyframe keyframe; @@ -1152,14 +1129,14 @@ typedef struct janus_streaming_mountpoint { janus_streaming_codecs codecs; gboolean audio, video, data; GList *viewers; - int helper_threads; /* Only relevant for RTP/RTSP mountpoints */ - GList *threads; /* Only relevant for RTP/RTSP mountpoints */ + int helper_threads; /* Only relevant for RTP mountpoints */ + GList *threads; /* Only relevant for RTP mountpoints */ volatile gint destroyed; janus_mutex mutex; janus_refcount ref; } janus_streaming_mountpoint; GHashTable *mountpoints = NULL, *mountpoints_temp = NULL; -janus_mutex mountpoints_mutex = JANUS_MUTEX_INITIALIZER; +janus_mutex mountpoints_mutex; static char *admin_key = NULL; typedef struct janus_streaming_helper { @@ -1208,10 +1185,10 @@ janus_streaming_mountpoint *janus_streaming_create_rtsp_source( char *url, char *username, char *password, gboolean doaudio, int audiopt, char *artpmap, char *afmtp, gboolean dovideo, int videopt, char *vrtpmap, char *vfmtp, gboolean bufferkf, - const janus_network_address *iface, int threads, - gint64 reconnect_delay, gint64 session_timeout, int rtsp_timeout, int rtsp_conn_timeout, + const janus_network_address *iface, gboolean error_on_failure); + typedef struct janus_streaming_message { janus_plugin_session *handle; char *transaction; @@ -1528,7 +1505,7 @@ static void janus_streaming_rtcp_pli_send(janus_streaming_rtp_source *source) { int sent = 0; if((sent = sendto(source->video_rtcp_fd, rtcp_buf, rtcp_len, 0, (struct sockaddr *)&source->video_rtcp_addr, sizeof(source->video_rtcp_addr))) < 0) { - JANUS_LOG(LOG_ERR, "Error in sendto... %d (%s)\n", errno, g_strerror(errno)); + JANUS_LOG(LOG_ERR, "Error in sendto... %d (%s)\n", errno, strerror(errno)); } else { JANUS_LOG(LOG_HUGE, "Sent %d/%d bytes\n", sent, rtcp_len); } @@ -1553,7 +1530,7 @@ static void janus_streaming_rtcp_remb_send(janus_streaming_rtp_source *source) { int sent = 0; if((sent = sendto(source->video_rtcp_fd, rtcp_buf, rtcp_len, 0, (struct sockaddr *)&source->video_rtcp_addr, sizeof(source->video_rtcp_addr))) < 0) { - JANUS_LOG(LOG_ERR, "Error in sendto... %d (%s)\n", errno, g_strerror(errno)); + JANUS_LOG(LOG_ERR, "Error in sendto... %d (%s)\n", errno, strerror(errno)); } else { JANUS_LOG(LOG_HUGE, "Sent %d/%d bytes\n", sent, rtcp_len); } @@ -1597,7 +1574,7 @@ int janus_streaming_init(janus_callbacks *callback, const char *config_path) { struct ifaddrs *ifas = NULL; if(getifaddrs(&ifas) == -1) { JANUS_LOG(LOG_ERR, "Unable to acquire list of network devices/interfaces; some configurations may not work as expected... %d (%s)\n", - errno, g_strerror(errno)); + errno, strerror(errno)); } /* Read configuration */ @@ -2121,11 +2098,6 @@ int janus_streaming_init(janus_callbacks *callback, const char *config_path) { janus_config_item *vkf = janus_config_get(config, cat, janus_config_type_item, "videobufferkf"); janus_config_item *iface = janus_config_get(config, cat, janus_config_type_item, "rtspiface"); janus_config_item *failerr = janus_config_get(config, cat, janus_config_type_item, "rtsp_failcheck"); - janus_config_item *threads = janus_config_get(config, cat, janus_config_type_item, "threads"); - janus_config_item *reconnect_delay = janus_config_get(config, cat, janus_config_type_item, "rtsp_reconnect_delay"); - janus_config_item *session_timeout = janus_config_get(config, cat, janus_config_type_item, "rtsp_session_timeout"); - janus_config_item *rtsp_timeout = janus_config_get(config, cat, janus_config_type_item, "rtsp_timeout"); - janus_config_item *rtsp_conn_timeout = janus_config_get(config, cat, janus_config_type_item, "rtsp_conn_timeout"); janus_network_address iface_value; if(file == NULL || file->value == NULL) { JANUS_LOG(LOG_ERR, "Can't add 'rtsp' mountpoint '%s', missing mandatory information...\n", cat->name); @@ -2139,11 +2111,6 @@ int janus_streaming_init(janus_callbacks *callback, const char *config_path) { gboolean error_on_failure = TRUE; if(failerr && failerr->value) error_on_failure = janus_is_true(failerr->value); - if(threads && threads->value && atoi(threads->value) < 0) { - JANUS_LOG(LOG_ERR, "Can't add 'rtsp' mountpoint '%s', invalid threads configuration...\n", cat->name); - cl = cl->next; - continue; - } if((doaudio || dovideo) && iface && iface->value) { if(!ifas) { @@ -2177,11 +2144,6 @@ int janus_streaming_init(janus_callbacks *callback, const char *config_path) { vfmtp ? (char *)vfmtp->value : NULL, bufferkf, iface && iface->value ? &iface_value : NULL, - (threads && threads->value) ? atoi(threads->value) : 0, - ((reconnect_delay && reconnect_delay->value) ? atoi(reconnect_delay->value) : JANUS_STREAMING_DEFAULT_RECONNECT_DELAY) * G_USEC_PER_SEC, - ((session_timeout && session_timeout->value) ? atoi(session_timeout->value) : JANUS_STREAMING_DEFAULT_SESSION_TIMEOUT) * G_USEC_PER_SEC, - ((rtsp_timeout && rtsp_timeout->value) ? atoi(rtsp_timeout->value) : JANUS_STREAMING_DEFAULT_CURL_TIMEOUT), - ((rtsp_conn_timeout && rtsp_conn_timeout->value) ? atoi(rtsp_conn_timeout->value) : JANUS_STREAMING_DEFAULT_CURL_CONNECT_TIMEOUT), error_on_failure)) == NULL) { JANUS_LOG(LOG_ERR, "Error creating 'rtsp' mountpoint '%s'...\n", cat->name); cl = cl->next; @@ -2690,7 +2652,7 @@ static json_t *janus_streaming_process_synchronous_request(janus_streaming_sessi if(getifaddrs(&ifas) == -1) { JANUS_LOG(LOG_ERR, "Unable to acquire list of network devices/interfaces; some configurations may not work as expected... %d (%s)\n", - errno, g_strerror(errno)); + errno, strerror(errno)); } json_t *type = json_object_get(root, "type"); @@ -3189,12 +3151,7 @@ static json_t *janus_streaming_process_synchronous_request(janus_streaming_sessi json_t *username = json_object_get(root, "rtsp_user"); json_t *password = json_object_get(root, "rtsp_pwd"); json_t *iface = json_object_get(root, "rtspiface"); - json_t *threads = json_object_get(root, "threads"); json_t *failerr = json_object_get(root, "rtsp_failcheck"); - json_t *reconnect_delay = json_object_get(root, "rtsp_reconnect_delay"); - json_t *session_timeout = json_object_get(root, "rtsp_session_timeout"); - json_t *rtsp_timeout = json_object_get(root, "rtsp_timeout"); - json_t *rtsp_conn_timeout = json_object_get(root, "rtsp_conn_timeout"); if(failerr == NULL) /* For an old typo, we support the legacy syntax too */ failerr = json_object_get(root, "rtsp_check"); gboolean doaudio = audio ? json_is_true(audio) : FALSE; @@ -3237,11 +3194,7 @@ static json_t *janus_streaming_process_synchronous_request(janus_streaming_sessi dovideo, (videopt ? json_integer_value(videopt) : -1), (char *)json_string_value(videortpmap), (char *)json_string_value(videofmtp), videobufferkf ? json_is_true(videobufferkf) : FALSE, - &multicast_iface, (threads ? json_integer_value(threads) : 0), - ((reconnect_delay ? json_integer_value(reconnect_delay) : JANUS_STREAMING_DEFAULT_RECONNECT_DELAY) * G_USEC_PER_SEC), - ((session_timeout ? json_integer_value(session_timeout) : JANUS_STREAMING_DEFAULT_SESSION_TIMEOUT) * G_USEC_PER_SEC), - (rtsp_timeout ? json_integer_value(rtsp_timeout) : JANUS_STREAMING_DEFAULT_CURL_TIMEOUT), - (rtsp_conn_timeout ? json_integer_value(rtsp_conn_timeout) : JANUS_STREAMING_DEFAULT_CURL_CONNECT_TIMEOUT), + &multicast_iface, error_on_failure); janus_mutex_lock(&mountpoints_mutex); g_hash_table_remove(mountpoints_temp, string_ids ? (gpointer)mpid_str : (gpointer)&mpid); @@ -3399,10 +3352,6 @@ static json_t *janus_streaming_process_synchronous_request(janus_streaming_sessi json_t *iface = json_object_get(root, "rtspiface"); if(iface) janus_config_add(config, c, janus_config_item_create("rtspiface", json_string_value(iface))); - if(mp->helper_threads > 0) { - g_snprintf(value, BUFSIZ, "%d", mp->helper_threads); - janus_config_add(config, c, janus_config_item_create("threads", value)); - } } /* Some more common values */ if(mp->secret) @@ -3613,10 +3562,6 @@ static json_t *janus_streaming_process_synchronous_request(janus_streaming_sessi json_t *iface = json_object_get(root, "rtspiface"); if(iface) janus_config_add(config, c, janus_config_item_create("rtspiface", json_string_value(iface))); - if(mp->helper_threads > 0) { - g_snprintf(value, BUFSIZ, "%d", mp->helper_threads); - janus_config_add(config, c, janus_config_item_create("threads", value)); - } } else { janus_config_add(config, c, janus_config_item_create("type", "rtp")); janus_config_add(config, c, janus_config_item_create("audio", mp->codecs.audio_pt >= 0 ? "yes" : "no")); @@ -4996,11 +4941,6 @@ static void *janus_streaming_handler(void *data) { goto error; } JANUS_LOG(LOG_VERB, "Starting the streaming\n"); - if(g_atomic_int_get(&session->paused) == 1) { - /* We were paused: reset the sequence number in RTP packets */ - session->context.a_seq_reset = TRUE; - session->context.v_seq_reset = TRUE; - } g_atomic_int_set(&session->paused, 0); result = json_object(); /* We wait for the setup_media event to start: on the other hand, it may have already arrived */ @@ -5338,8 +5278,6 @@ static void *janus_streaming_handler(void *data) { janus_mutex_unlock(&helper->mutex); } session->mountpoint = mp; - /* Send a PLI too, in case the mountpoint supports video and RTCP */ - janus_streaming_rtcp_pli_send(mp->source); g_atomic_int_set(&session->paused, 0); janus_mutex_unlock(&session->mutex); janus_mutex_unlock(&mp->mutex); @@ -5453,14 +5391,14 @@ static int janus_streaming_create_fd(int port, in_addr_t mcast, const janus_netw fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); if(fd < 0) { JANUS_LOG(LOG_ERR, "[%s] Cannot create socket for %s... %d (%s)\n", - mountpointname, medianame, errno, g_strerror(errno)); + mountpointname, medianame, errno, strerror(errno)); break; } #ifdef IP_MULTICAST_ALL int mc_all = 0; if((setsockopt(fd, IPPROTO_IP, IP_MULTICAST_ALL, (void*) &mc_all, sizeof(mc_all))) < 0) { JANUS_LOG(LOG_ERR, "[%s] %s listener setsockopt IP_MULTICAST_ALL failed... %d (%s)\n", - mountpointname, listenername, errno, g_strerror(errno)); + mountpointname, listenername, errno, strerror(errno)); close(fd); janus_mutex_unlock(&fd_mutex); return -1; @@ -5490,7 +5428,7 @@ static int janus_streaming_create_fd(int port, in_addr_t mcast, const janus_netw } if(setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)) == -1) { JANUS_LOG(LOG_ERR, "[%s] %s listener IP_ADD_MEMBERSHIP failed... %d (%s)\n", - mountpointname, listenername, errno, g_strerror(errno)); + mountpointname, listenername, errno, strerror(errno)); close(fd); janus_mutex_unlock(&fd_mutex); return -1; @@ -5518,7 +5456,7 @@ static int janus_streaming_create_fd(int port, in_addr_t mcast, const janus_netw int reuse = 1; if(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) == -1) { JANUS_LOG(LOG_ERR, "[%s] %s listener setsockopt SO_REUSEADDR failed... %d (%s)\n", - mountpointname, listenername, errno, g_strerror(errno)); + mountpointname, listenername, errno, strerror(errno)); close(fd); janus_mutex_unlock(&fd_mutex); return -1; @@ -5555,12 +5493,12 @@ static int janus_streaming_create_fd(int port, in_addr_t mcast, const janus_netw int v6only = 0; if(fd < 0) { JANUS_LOG(LOG_ERR, "[%s] Cannot create socket for %s... %d (%s)\n", - mountpointname, medianame, errno, g_strerror(errno)); + mountpointname, medianame, errno, strerror(errno)); break; } if(family != AF_INET && setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &v6only, sizeof(v6only)) != 0) { JANUS_LOG(LOG_ERR, "[%s] setsockopt on socket failed for %s... %d (%s)\n", - mountpointname, medianame, errno, g_strerror(errno)); + mountpointname, medianame, errno, strerror(errno)); break; } } @@ -5570,7 +5508,7 @@ static int janus_streaming_create_fd(int port, in_addr_t mcast, const janus_netw fd = -1; if(!quiet) { JANUS_LOG(LOG_ERR, "[%s] Bind failed for %s (port %d)... %d (%s)\n", - mountpointname, medianame, port, errno, g_strerror(errno)); + mountpointname, medianame, port, errno, strerror(errno)); } if(!use_range) /* Asked for a specific port but it's not available, give up */ break; @@ -5635,7 +5573,7 @@ static int janus_streaming_allocate_port_pair(const char *name, const char *medi static int janus_streaming_get_fd_port(int fd) { struct sockaddr_in6 server = { 0 }; socklen_t len = sizeof(server); - if(getsockname(fd, (struct sockaddr *)&server, &len) == -1) { + if(getsockname(fd, &server, &len) == -1) { return -1; } @@ -6128,6 +6066,7 @@ janus_streaming_mountpoint *janus_streaming_create_rtp_source( /* This extra unref is for the init */ janus_refcount_decrease(&helper->ref); janus_streaming_mountpoint_destroy(live_rtp); + g_free(helper); return NULL; } janus_refcount_increase(&helper->ref); @@ -6365,11 +6304,6 @@ static int janus_streaming_rtsp_parse_sdp(const char *buffer, const char *name, return 0; } -/* Helper function to calculating the minimum value if 'a' is bigger than zero */ -static inline gint64 janus_streaming_min_if(gint64 a, gint64 b) { - return a > 0 ? (a > b ? b : a) : b; -} - /* Static helper to connect to an RTSP server, considering we might do this either * when creating a new mountpoint, or when reconnecting after some failure */ static int janus_streaming_rtsp_connect_to_server(janus_streaming_mountpoint *mp) { @@ -6392,8 +6326,8 @@ static int janus_streaming_rtsp_connect_to_server(janus_streaming_mountpoint *mp curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L); curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 1L); curl_easy_setopt(curl, CURLOPT_URL, source->rtsp_url); - curl_easy_setopt(curl, CURLOPT_TIMEOUT, source->rtsp_timeout); - curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, source->rtsp_conn_timeout); + curl_easy_setopt(curl, CURLOPT_TIMEOUT, 10L); + curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, 5L); curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 0L); curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L); #if CURL_AT_LEAST_VERSION(7, 66, 0) @@ -6445,6 +6379,7 @@ static int janus_streaming_rtsp_connect_to_server(janus_streaming_mountpoint *mp } JANUS_LOG(LOG_VERB, "DESCRIBE answer:%s\n", curldata->buffer); /* Parse the SDP we just got to figure out the negotiated media */ + int ka_timeout = 0; int vpt = -1; char vrtpmap[2048]; vrtpmap[0] = '\0'; @@ -6478,7 +6413,6 @@ static int janus_streaming_rtsp_connect_to_server(janus_streaming_mountpoint *mp return -8; } janus_mutex_lock(&mountpoints_mutex); - /* Parse both video and audio first before proceed to setup as curldata will be reused */ int vresult = -1; if(dovideo) { @@ -6619,8 +6553,8 @@ static int janus_streaming_rtsp_connect_to_server(janus_streaming_mountpoint *mp } else if(is_session) { if(!strcasecmp(name, "timeout")) { /* Take note of the timeout, for keep-alives */ - source->ka_timeout = janus_streaming_min_if(source->session_timeout, atoi(value) / 2 * G_USEC_PER_SEC); - JANUS_LOG(LOG_VERB, " -- RTSP session timeout (video): %"SCNi64" ms\n", source->ka_timeout / 1000); + ka_timeout = atoi(value); + JANUS_LOG(LOG_VERB, " -- RTSP session timeout (video): %d\n", ka_timeout); } } } @@ -6792,8 +6726,8 @@ static int janus_streaming_rtsp_connect_to_server(janus_streaming_mountpoint *mp } else if(is_session) { if(!strcasecmp(name, "timeout")) { /* Take note of the timeout, for keep-alives */ - source->ka_timeout = janus_streaming_min_if(source->session_timeout, atoi(value) / 2 * G_USEC_PER_SEC); - JANUS_LOG(LOG_VERB, " -- RTSP session timeout (audio): %"SCNi64" ms\n", source->ka_timeout / 1000); + ka_timeout = atoi(value); + JANUS_LOG(LOG_VERB, " -- RTSP session timeout (audio): %d\n", ka_timeout); } } } @@ -6895,6 +6829,7 @@ static int janus_streaming_rtsp_connect_to_server(janus_streaming_mountpoint *mp source->rtsp_vhost = g_strdup(vhost); source->curl = curl; source->curldata = curldata; + source->ka_timeout = ka_timeout; return 0; } @@ -6994,8 +6929,7 @@ janus_streaming_mountpoint *janus_streaming_create_rtsp_source( char *url, char *username, char *password, gboolean doaudio, int acodec, char *artpmap, char *afmtp, gboolean dovideo, int vcodec, char *vrtpmap, char *vfmtp, gboolean bufferkf, - const janus_network_address *iface, int threads, - gint64 reconnect_delay, gint64 session_timeout, int rtsp_timeout, int rtsp_conn_timeout, + const janus_network_address *iface, gboolean error_on_failure) { char id_num[30]; if(!string_ids) { @@ -7006,23 +6940,6 @@ janus_streaming_mountpoint *janus_streaming_create_rtsp_source( JANUS_LOG(LOG_ERR, "Can't add 'rtsp' stream, missing url...\n"); return NULL; } - if(reconnect_delay < 0) { - JANUS_LOG(LOG_ERR, "rtsp_reconnect_delay can't be smaller than zero.\n"); - return NULL; - } - if(session_timeout < 0) { - JANUS_LOG(LOG_ERR, "rtsp_session_timeout can't be smaller than zero.\n"); - return NULL; - } - if(rtsp_timeout < 0) { - JANUS_LOG(LOG_ERR, "rtsp_timeout can't be smaller than zero.\n"); - return NULL; - } - if(rtsp_conn_timeout < 0) { - JANUS_LOG(LOG_ERR, "rtsp_conn_timeout can't be smaller than zero.\n"); - return NULL; - } - JANUS_LOG(LOG_VERB, "Audio %s, Video %s\n", doaudio ? "enabled" : "NOT enabled", dovideo ? "enabled" : "NOT enabled"); /* Create an RTP source for the media we'll get */ @@ -7087,11 +7004,6 @@ janus_streaming_mountpoint *janus_streaming_create_rtsp_source( live_rtsp_source->keyframe.latest_keyframe = NULL; live_rtsp_source->keyframe.temp_keyframe = NULL; live_rtsp_source->keyframe.temp_ts = 0; - live_rtsp_source->ka_timeout = session_timeout; - live_rtsp_source->reconnect_delay = reconnect_delay; - live_rtsp_source->session_timeout = session_timeout; - live_rtsp_source->rtsp_timeout = rtsp_timeout; - live_rtsp_source->rtsp_conn_timeout = rtsp_conn_timeout; janus_mutex_init(&live_rtsp_source->keyframe.mutex); live_rtsp_source->reconnect_timer = 0; janus_mutex_init(&live_rtsp_source->rtsp_mutex); @@ -7129,44 +7041,9 @@ janus_streaming_mountpoint *janus_streaming_create_rtsp_source( return NULL; } } - /* If we need helper threads, spawn them now */ + /* Start the thread that will receive the media packets */ GError *error = NULL; char tname[16]; - if(threads > 0) { - int i=0; - for(i=0; iid = i+1; - helper->mp = live_rtsp; - helper->queued_packets = g_async_queue_new_full((GDestroyNotify)janus_streaming_rtp_relay_packet_free); - janus_mutex_init(&helper->mutex); - janus_refcount_init(&helper->ref, janus_streaming_helper_free); - live_rtsp->helper_threads++; - /* Spawn a thread and add references */ - g_snprintf(tname, sizeof(tname), "help %u-%"SCNu64, helper->id, live_rtsp->id); - janus_refcount_increase(&live_rtsp->ref); - janus_refcount_increase(&helper->ref); - helper->thread = g_thread_try_new(tname, &janus_streaming_helper_thread, helper, &error); - if(error != NULL) { - JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the helper thread...\n", - error->code, error->message ? error->message : "??"); - g_error_free(error); - janus_refcount_decrease(&live_rtsp->ref); /* This is for the helper thread */ - g_async_queue_unref(helper->queued_packets); - janus_refcount_decrease(&helper->ref); - /* This extra unref is for the init */ - janus_refcount_decrease(&helper->ref); - janus_mutex_lock(&mountpoints_mutex); - g_hash_table_remove(mountpoints_temp, &id); - janus_mutex_unlock(&mountpoints_mutex); - janus_refcount_decrease(&live_rtsp->ref); - return NULL; - } - janus_refcount_increase(&helper->ref); - live_rtsp->threads = g_list_append(live_rtsp->threads, helper); - } - } - /* Finally, start the thread that will receive the media packets */ g_snprintf(tname, sizeof(tname), "mp %s", live_rtsp->id_str); janus_refcount_increase(&live_rtsp->ref); live_rtsp->thread = g_thread_try_new(tname, &janus_streaming_relay_thread, live_rtsp, &error); @@ -7196,8 +7073,7 @@ janus_streaming_mountpoint *janus_streaming_create_rtsp_source( char *url, char *username, char *password, gboolean doaudio, int acodec, char *audiortpmap, char *audiofmtp, gboolean dovideo, int vcodec, char *videortpmap, char *videofmtp, gboolean bufferkf, - const janus_network_address *iface, int threads, - gint64 reconnect_delay, gint64 session_timeout, int rtsp_timeout, int rtsp_conn_timeout, + const janus_network_address *iface, gboolean error_on_failure) { JANUS_LOG(LOG_ERR, "RTSP need libcurl\n"); return NULL; @@ -7292,10 +7168,7 @@ static void *janus_streaming_ondemand_thread(void *data) { now.tv_usec = before.tv_usec; time_t passed, d_s, d_us; /* Loop */ - gint read = 0; -#ifdef HAVE_LIBOGG - const gint plen = (sizeof(buf)-RTP_HEADER_SIZE); -#endif + gint read = 0, plen = (sizeof(buf)-RTP_HEADER_SIZE); janus_streaming_rtp_relay_packet packet; while(!g_atomic_int_get(&stopping) && !g_atomic_int_get(&mountpoint->destroyed) && !g_atomic_int_get(&session->stopping) && !g_atomic_int_get(&session->destroyed)) { @@ -7442,10 +7315,7 @@ static void *janus_streaming_filesource_thread(void *data) { now.tv_usec = before.tv_usec; time_t passed, d_s, d_us; /* Loop */ - gint read = 0; -#ifdef HAVE_LIBOGG - const gint plen = (sizeof(buf)-RTP_HEADER_SIZE); -#endif + gint read = 0, plen = (sizeof(buf)-RTP_HEADER_SIZE); janus_streaming_rtp_relay_packet packet; while(!g_atomic_int_get(&stopping) && !g_atomic_int_get(&mountpoint->destroyed)) { /* See if it's time to prepare a frame */ @@ -7571,7 +7441,7 @@ static void *janus_streaming_relay_thread(void *data) { gint64 now = janus_get_monotonic_time(), before = now, ka_timeout = 0; if(source->rtsp) { source->reconnect_timer = now; - ka_timeout = source->ka_timeout; + ka_timeout = ((gint64)source->ka_timeout*G_USEC_PER_SEC)/2; } #endif /* Loop */ @@ -7587,8 +7457,8 @@ static void *janus_streaming_relay_thread(void *data) { continue; } now = janus_get_monotonic_time(); - if(!source->reconnecting && (now - source->reconnect_timer > source->reconnect_delay)) { - /* Assume the RTSP server has gone and schedule a reconnect */ + if(!source->reconnecting && (now - source->reconnect_timer > 5*G_USEC_PER_SEC)) { + /* 5 seconds passed and no media? Assume the RTSP server has gone and schedule a reconnect */ JANUS_LOG(LOG_WARN, "[%s] %"SCNi64"s passed with no media, trying to reconnect the RTSP stream\n", name, (now - source->reconnect_timer)/G_USEC_PER_SEC); audio_fd = -1; @@ -7652,7 +7522,7 @@ static void *janus_streaming_relay_thread(void *data) { data_fd = source->data_fd; audio_rtcp_fd = source->audio_rtcp_fd; video_rtcp_fd = source->video_rtcp_fd; - ka_timeout = source->ka_timeout; + ka_timeout = ((gint64)source->ka_timeout*G_USEC_PER_SEC)/2; } } source->reconnect_timer = janus_get_monotonic_time(); @@ -7662,7 +7532,7 @@ static void *janus_streaming_relay_thread(void *data) { } if(audio_fd < 0 && video_fd[0] < 0 && video_fd[1] < 0 && video_fd[2] < 0 && data_fd < 0) { /* No socket, we may be in the process of reconnecting, or waiting to reconnect */ - g_usleep(source->reconnect_delay); + g_usleep(5000000); continue; } /* We may also need to occasionally send a OPTIONS request as a keep-alive */ @@ -7751,10 +7621,10 @@ static void *janus_streaming_relay_thread(void *data) { resfd = poll(fds, num, 1000); if(resfd < 0) { if(errno == EINTR) { - JANUS_LOG(LOG_HUGE, "[%s] Got an EINTR (%s), ignoring...\n", name, g_strerror(errno)); + JANUS_LOG(LOG_HUGE, "[%s] Got an EINTR (%s), ignoring...\n", name, strerror(errno)); continue; } - JANUS_LOG(LOG_ERR, "[%s] Error polling... %d (%s)\n", name, errno, g_strerror(errno)); + JANUS_LOG(LOG_ERR, "[%s] Error polling... %d (%s)\n", name, errno, strerror(errno)); mountpoint->enabled = FALSE; janus_mutex_lock(&source->rec_mutex); if(source->arc) { @@ -7789,7 +7659,7 @@ static void *janus_streaming_relay_thread(void *data) { if(fds[i].revents & (POLLERR | POLLHUP)) { /* Socket error? */ JANUS_LOG(LOG_ERR, "[%s] Error polling: %s... %d (%s)\n", name, - fds[i].revents & POLLERR ? "POLLERR" : "POLLHUP", errno, g_strerror(errno)); + fds[i].revents & POLLERR ? "POLLERR" : "POLLHUP", errno, strerror(errno)); mountpoint->enabled = FALSE; janus_mutex_lock(&source->rec_mutex); if(source->arc) { @@ -8132,7 +8002,6 @@ static void *janus_streaming_relay_thread(void *data) { packet.data = (janus_rtp_header *)data; packet.length = bytes; packet.is_rtp = FALSE; - packet.is_data = TRUE; packet.textdata = source->textdata; /* Is there a recorder? */ janus_recorder_save_frame(source->drc, data, bytes); @@ -8144,8 +8013,6 @@ static void *janus_streaming_relay_thread(void *data) { pkt->data = g_malloc(bytes); memcpy(pkt->data, data, bytes); packet.is_rtp = FALSE; - packet.is_data = TRUE; - packet.textdata = source->textdata; pkt->length = bytes; janus_mutex_unlock(&source->buffermsg_mutex); } @@ -8176,7 +8043,6 @@ static void *janus_streaming_relay_thread(void *data) { JANUS_LOG(LOG_HUGE, "[%s] Got audio RTCP feedback: SSRC %"SCNu32"\n", name, janus_rtcp_get_sender_ssrc(buffer, bytes)); /* Relay on all sessions */ - packet.is_rtp = FALSE; packet.is_video = FALSE; packet.data = (janus_rtp_header *)buffer; packet.length = bytes; @@ -8203,7 +8069,6 @@ static void *janus_streaming_relay_thread(void *data) { JANUS_LOG(LOG_HUGE, "[%s] Got video RTCP feedback: SSRC %"SCNu32"\n", name, janus_rtcp_get_sender_ssrc(buffer, bytes)); /* Relay on all sessions */ - packet.is_rtp = FALSE; packet.is_video = TRUE; packet.data = (janus_rtp_header *)buffer; packet.length = bytes; @@ -8451,13 +8316,6 @@ static void janus_streaming_relay_rtp_packet(gpointer data, gpointer user_data) /* Process this packet: don't relay if it's not the SSRC/layer we wanted to handle */ gboolean relay = janus_rtp_simulcasting_context_process_rtp(&session->sim_context, (char *)packet->data, packet->length, packet->ssrc, NULL, packet->codec, &session->context); - if(!relay) { - /* Did a lot of time pass before we could relay a packet? */ - gint64 now = janus_get_monotonic_time(); - if((now - session->sim_context.last_relayed) >= G_USEC_PER_SEC) { - g_atomic_int_set(&session->sim_context.need_pli, 1); - } - } if(session->sim_context.need_pli) { /* Schedule a PLI */ JANUS_LOG(LOG_VERB, "We need a PLI for the simulcast context\n"); @@ -8555,6 +8413,7 @@ static void janus_streaming_relay_rtp_packet(gpointer data, gpointer user_data) return; } + static void janus_streaming_relay_rtcp_packet(gpointer data, gpointer user_data) { janus_streaming_rtp_relay_packet *packet = (janus_streaming_rtp_relay_packet *)user_data; if(!packet || !packet->data || packet->length < 1) { @@ -8595,8 +8454,6 @@ static void janus_streaming_helper_rtprtcp_packet(gpointer data, gpointer user_d memcpy(copy->data, packet->data, packet->length); copy->length = packet->length; copy->is_rtp = packet->is_rtp; - copy->is_data = packet->is_data; - copy->textdata = packet->textdata; copy->is_video = packet->is_video; copy->is_keyframe = packet->is_keyframe; copy->simulcast = packet->simulcast; @@ -8621,7 +8478,7 @@ static void *janus_streaming_helper_thread(void *data) { break; janus_mutex_lock(&helper->mutex); g_list_foreach(helper->viewers, - pkt->is_rtp || pkt->is_data ? janus_streaming_relay_rtp_packet : janus_streaming_relay_rtcp_packet, + pkt->is_rtp ? janus_streaming_relay_rtp_packet : janus_streaming_relay_rtcp_packet, pkt); janus_mutex_unlock(&helper->mutex); janus_streaming_rtp_relay_packet_free(pkt); From b30a52d22cdd9f5e572ca7d738f5d768d23e73c1 Mon Sep 17 00:00:00 2001 From: lucylu-star Date: Thu, 8 Apr 2021 21:45:00 +0800 Subject: [PATCH 10/10] revert some removed by mistake --- plugins/janus_streaming.c | 234 ++++++++++++++++++++++++++++++-------- 1 file changed, 188 insertions(+), 46 deletions(-) diff --git a/plugins/janus_streaming.c b/plugins/janus_streaming.c index 9ebda94558..e473f2b581 100644 --- a/plugins/janus_streaming.c +++ b/plugins/janus_streaming.c @@ -21,7 +21,7 @@ * * For what concerns types 1. and 2., considering the proof of concept * nature of the implementation the only pre-recorded media files - * that the plugins supports right now are raw mu-Law and a-Law files: + * that the plugins supports right now are Opus, raw mu-Law and a-Law files: * support is of course planned for other additional widespread formats * as well. * @@ -134,12 +134,20 @@ so neither Janus nor the Streaming plugin have access to anything. DO NOT SET THIS PROPERTY IF YOU DON'T KNOW WHAT YOU'RE DOING! e2ee = true -The following options are only valid for the 'rstp' type: +The following options are only valid for the 'rtsp' type: url = RTSP stream URL rtsp_user = RTSP authorization username, if needed rtsp_pwd = RTSP authorization password, if needed rtsp_failcheck = whether an error should be returned if connecting to the RTSP server fails (default=true) rtspiface = network interface IP address or device name to listen on when receiving RTSP streams +rtsp_reconnect_delay = after n seconds passed and no media assumed, the RTSP server has gone and schedule a reconnect (default=5s) +rtsp_session_timeout = by default the streaming plugin will check the RTSP connection with an OPTIONS query, + the value of the timeout comes from the RTSP session initializer and by default + this session timeout is the half of this value In some cases this value can be too high (for example more than one minute) + because of the media server. In that case this plugin will calculate the timeout with this + formula: timeout = min(session_timeout, rtsp_session_timeout / 2). (default=0s) +rtsp_timeout = communication timeout (CURLOPT_TIMEOUT) for cURL call gathering the RTSP information (default=10s) +rtsp_conn_timeout = connection timeout for cURL (CURLOPT_CONNECTTIMEOUT) call gathering the RTSP information (default=5s) \endverbatim * * \section streamapi Streaming API @@ -332,7 +340,7 @@ rtspiface = network interface IP address or device name to listen on when receiv * * Once you created a mountpoint, you can modify some (not all) of its * properties via an \c edit request. Namely, you can only modify generic - * properties like the mountoint description, the secret, the PIN and + * properties like the mountpoint description, the secret, the PIN and * whether or not the mountpoint should be listable. All other properties * are considered to be immutable. Again, you can choose whether the changes * should be permanent, e.g., saved to configuration file, or not. Notice @@ -721,6 +729,11 @@ rtspiface = network interface IP address or device name to listen on when receiv #include "../utils.h" #include "../ip-utils.h" +/* Default settings */ +#define JANUS_STREAMING_DEFAULT_SESSION_TIMEOUT 0 /* Overwrite the RTSP session timeout. If set to zero, the RTSP timeout is derived from a session. */ +#define JANUS_STREAMING_DEFAULT_RECONNECT_DELAY 5 /* Reconnecting delay in seconds. */ +#define JANUS_STREAMING_DEFAULT_CURL_TIMEOUT 10L /* Communication timeout for cURL. */ +#define JANUS_STREAMING_DEFAULT_CURL_CONNECT_TIMEOUT 5L /* Connection timeout for cURL. */ /* Plugin information */ #define JANUS_STREAMING_VERSION 8 @@ -855,6 +868,10 @@ static struct janus_json_parameter rtsp_parameters[] = { {"url", JSON_STRING, 0}, {"rtsp_user", JSON_STRING, 0}, {"rtsp_pwd", JSON_STRING, 0}, + {"rtsp_reconnect_delay", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE}, + {"rtsp_session_timeout", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE}, + {"rtsp_timeout", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE}, + {"rtsp_conn_timeout", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE}, {"audiortpmap", JSON_STRING, 0}, {"audiofmtp", JSON_STRING, 0}, {"audiopt", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE}, @@ -862,6 +879,7 @@ static struct janus_json_parameter rtsp_parameters[] = { {"videofmtp", JSON_STRING, 0}, {"videopt", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE}, {"videobufferkf", JANUS_JSON_BOOL, 0}, + {"threads", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE}, {"rtspiface", JSON_STRING, 0}, {"rtsp_failcheck", JANUS_JSON_BOOL, 0} }; @@ -992,6 +1010,7 @@ typedef struct janus_streaming_rtp_relay_packet { janus_rtp_header *data; gint length; gboolean is_rtp; /* This may be a data packet and not RTP */ + gboolean is_data; gboolean is_video; gboolean is_keyframe; gboolean simulcast; @@ -1064,10 +1083,14 @@ typedef struct janus_streaming_rtp_source { janus_streaming_buffer *curldata; char *rtsp_url; char *rtsp_username, *rtsp_password; - int ka_timeout; + gint64 ka_timeout; char *rtsp_ahost, *rtsp_vhost; gboolean reconnecting; gint64 reconnect_timer; + gint64 reconnect_delay; + gint64 session_timeout; + int rtsp_timeout; + int rtsp_conn_timeout; janus_mutex rtsp_mutex; #endif janus_streaming_rtp_keyframe keyframe; @@ -1129,14 +1152,14 @@ typedef struct janus_streaming_mountpoint { janus_streaming_codecs codecs; gboolean audio, video, data; GList *viewers; - int helper_threads; /* Only relevant for RTP mountpoints */ - GList *threads; /* Only relevant for RTP mountpoints */ + int helper_threads; /* Only relevant for RTP/RTSP mountpoints */ + GList *threads; /* Only relevant for RTP/RTSP mountpoints */ volatile gint destroyed; janus_mutex mutex; janus_refcount ref; } janus_streaming_mountpoint; GHashTable *mountpoints = NULL, *mountpoints_temp = NULL; -janus_mutex mountpoints_mutex; +janus_mutex mountpoints_mutex = JANUS_MUTEX_INITIALIZER; static char *admin_key = NULL; typedef struct janus_streaming_helper { @@ -1185,10 +1208,10 @@ janus_streaming_mountpoint *janus_streaming_create_rtsp_source( char *url, char *username, char *password, gboolean doaudio, int audiopt, char *artpmap, char *afmtp, gboolean dovideo, int videopt, char *vrtpmap, char *vfmtp, gboolean bufferkf, - const janus_network_address *iface, + const janus_network_address *iface, int threads, + gint64 reconnect_delay, gint64 session_timeout, int rtsp_timeout, int rtsp_conn_timeout, gboolean error_on_failure); - typedef struct janus_streaming_message { janus_plugin_session *handle; char *transaction; @@ -1505,7 +1528,7 @@ static void janus_streaming_rtcp_pli_send(janus_streaming_rtp_source *source) { int sent = 0; if((sent = sendto(source->video_rtcp_fd, rtcp_buf, rtcp_len, 0, (struct sockaddr *)&source->video_rtcp_addr, sizeof(source->video_rtcp_addr))) < 0) { - JANUS_LOG(LOG_ERR, "Error in sendto... %d (%s)\n", errno, strerror(errno)); + JANUS_LOG(LOG_ERR, "Error in sendto... %d (%s)\n", errno, g_strerror(errno)); } else { JANUS_LOG(LOG_HUGE, "Sent %d/%d bytes\n", sent, rtcp_len); } @@ -1530,7 +1553,7 @@ static void janus_streaming_rtcp_remb_send(janus_streaming_rtp_source *source) { int sent = 0; if((sent = sendto(source->video_rtcp_fd, rtcp_buf, rtcp_len, 0, (struct sockaddr *)&source->video_rtcp_addr, sizeof(source->video_rtcp_addr))) < 0) { - JANUS_LOG(LOG_ERR, "Error in sendto... %d (%s)\n", errno, strerror(errno)); + JANUS_LOG(LOG_ERR, "Error in sendto... %d (%s)\n", errno, g_strerror(errno)); } else { JANUS_LOG(LOG_HUGE, "Sent %d/%d bytes\n", sent, rtcp_len); } @@ -1574,7 +1597,7 @@ int janus_streaming_init(janus_callbacks *callback, const char *config_path) { struct ifaddrs *ifas = NULL; if(getifaddrs(&ifas) == -1) { JANUS_LOG(LOG_ERR, "Unable to acquire list of network devices/interfaces; some configurations may not work as expected... %d (%s)\n", - errno, strerror(errno)); + errno, g_strerror(errno)); } /* Read configuration */ @@ -2098,6 +2121,11 @@ int janus_streaming_init(janus_callbacks *callback, const char *config_path) { janus_config_item *vkf = janus_config_get(config, cat, janus_config_type_item, "videobufferkf"); janus_config_item *iface = janus_config_get(config, cat, janus_config_type_item, "rtspiface"); janus_config_item *failerr = janus_config_get(config, cat, janus_config_type_item, "rtsp_failcheck"); + janus_config_item *threads = janus_config_get(config, cat, janus_config_type_item, "threads"); + janus_config_item *reconnect_delay = janus_config_get(config, cat, janus_config_type_item, "rtsp_reconnect_delay"); + janus_config_item *session_timeout = janus_config_get(config, cat, janus_config_type_item, "rtsp_session_timeout"); + janus_config_item *rtsp_timeout = janus_config_get(config, cat, janus_config_type_item, "rtsp_timeout"); + janus_config_item *rtsp_conn_timeout = janus_config_get(config, cat, janus_config_type_item, "rtsp_conn_timeout"); janus_network_address iface_value; if(file == NULL || file->value == NULL) { JANUS_LOG(LOG_ERR, "Can't add 'rtsp' mountpoint '%s', missing mandatory information...\n", cat->name); @@ -2111,6 +2139,11 @@ int janus_streaming_init(janus_callbacks *callback, const char *config_path) { gboolean error_on_failure = TRUE; if(failerr && failerr->value) error_on_failure = janus_is_true(failerr->value); + if(threads && threads->value && atoi(threads->value) < 0) { + JANUS_LOG(LOG_ERR, "Can't add 'rtsp' mountpoint '%s', invalid threads configuration...\n", cat->name); + cl = cl->next; + continue; + } if((doaudio || dovideo) && iface && iface->value) { if(!ifas) { @@ -2144,6 +2177,11 @@ int janus_streaming_init(janus_callbacks *callback, const char *config_path) { vfmtp ? (char *)vfmtp->value : NULL, bufferkf, iface && iface->value ? &iface_value : NULL, + (threads && threads->value) ? atoi(threads->value) : 0, + ((reconnect_delay && reconnect_delay->value) ? atoi(reconnect_delay->value) : JANUS_STREAMING_DEFAULT_RECONNECT_DELAY) * G_USEC_PER_SEC, + ((session_timeout && session_timeout->value) ? atoi(session_timeout->value) : JANUS_STREAMING_DEFAULT_SESSION_TIMEOUT) * G_USEC_PER_SEC, + ((rtsp_timeout && rtsp_timeout->value) ? atoi(rtsp_timeout->value) : JANUS_STREAMING_DEFAULT_CURL_TIMEOUT), + ((rtsp_conn_timeout && rtsp_conn_timeout->value) ? atoi(rtsp_conn_timeout->value) : JANUS_STREAMING_DEFAULT_CURL_CONNECT_TIMEOUT), error_on_failure)) == NULL) { JANUS_LOG(LOG_ERR, "Error creating 'rtsp' mountpoint '%s'...\n", cat->name); cl = cl->next; @@ -2652,7 +2690,7 @@ static json_t *janus_streaming_process_synchronous_request(janus_streaming_sessi if(getifaddrs(&ifas) == -1) { JANUS_LOG(LOG_ERR, "Unable to acquire list of network devices/interfaces; some configurations may not work as expected... %d (%s)\n", - errno, strerror(errno)); + errno, g_strerror(errno)); } json_t *type = json_object_get(root, "type"); @@ -3151,7 +3189,12 @@ static json_t *janus_streaming_process_synchronous_request(janus_streaming_sessi json_t *username = json_object_get(root, "rtsp_user"); json_t *password = json_object_get(root, "rtsp_pwd"); json_t *iface = json_object_get(root, "rtspiface"); + json_t *threads = json_object_get(root, "threads"); json_t *failerr = json_object_get(root, "rtsp_failcheck"); + json_t *reconnect_delay = json_object_get(root, "rtsp_reconnect_delay"); + json_t *session_timeout = json_object_get(root, "rtsp_session_timeout"); + json_t *rtsp_timeout = json_object_get(root, "rtsp_timeout"); + json_t *rtsp_conn_timeout = json_object_get(root, "rtsp_conn_timeout"); if(failerr == NULL) /* For an old typo, we support the legacy syntax too */ failerr = json_object_get(root, "rtsp_check"); gboolean doaudio = audio ? json_is_true(audio) : FALSE; @@ -3194,7 +3237,11 @@ static json_t *janus_streaming_process_synchronous_request(janus_streaming_sessi dovideo, (videopt ? json_integer_value(videopt) : -1), (char *)json_string_value(videortpmap), (char *)json_string_value(videofmtp), videobufferkf ? json_is_true(videobufferkf) : FALSE, - &multicast_iface, + &multicast_iface, (threads ? json_integer_value(threads) : 0), + ((reconnect_delay ? json_integer_value(reconnect_delay) : JANUS_STREAMING_DEFAULT_RECONNECT_DELAY) * G_USEC_PER_SEC), + ((session_timeout ? json_integer_value(session_timeout) : JANUS_STREAMING_DEFAULT_SESSION_TIMEOUT) * G_USEC_PER_SEC), + (rtsp_timeout ? json_integer_value(rtsp_timeout) : JANUS_STREAMING_DEFAULT_CURL_TIMEOUT), + (rtsp_conn_timeout ? json_integer_value(rtsp_conn_timeout) : JANUS_STREAMING_DEFAULT_CURL_CONNECT_TIMEOUT), error_on_failure); janus_mutex_lock(&mountpoints_mutex); g_hash_table_remove(mountpoints_temp, string_ids ? (gpointer)mpid_str : (gpointer)&mpid); @@ -3352,6 +3399,10 @@ static json_t *janus_streaming_process_synchronous_request(janus_streaming_sessi json_t *iface = json_object_get(root, "rtspiface"); if(iface) janus_config_add(config, c, janus_config_item_create("rtspiface", json_string_value(iface))); + if(mp->helper_threads > 0) { + g_snprintf(value, BUFSIZ, "%d", mp->helper_threads); + janus_config_add(config, c, janus_config_item_create("threads", value)); + } } /* Some more common values */ if(mp->secret) @@ -3562,6 +3613,10 @@ static json_t *janus_streaming_process_synchronous_request(janus_streaming_sessi json_t *iface = json_object_get(root, "rtspiface"); if(iface) janus_config_add(config, c, janus_config_item_create("rtspiface", json_string_value(iface))); + if(mp->helper_threads > 0) { + g_snprintf(value, BUFSIZ, "%d", mp->helper_threads); + janus_config_add(config, c, janus_config_item_create("threads", value)); + } } else { janus_config_add(config, c, janus_config_item_create("type", "rtp")); janus_config_add(config, c, janus_config_item_create("audio", mp->codecs.audio_pt >= 0 ? "yes" : "no")); @@ -4941,6 +4996,11 @@ static void *janus_streaming_handler(void *data) { goto error; } JANUS_LOG(LOG_VERB, "Starting the streaming\n"); + if(g_atomic_int_get(&session->paused) == 1) { + /* We were paused: reset the sequence number in RTP packets */ + session->context.a_seq_reset = TRUE; + session->context.v_seq_reset = TRUE; + } g_atomic_int_set(&session->paused, 0); result = json_object(); /* We wait for the setup_media event to start: on the other hand, it may have already arrived */ @@ -5278,6 +5338,8 @@ static void *janus_streaming_handler(void *data) { janus_mutex_unlock(&helper->mutex); } session->mountpoint = mp; + /* Send a PLI too, in case the mountpoint supports video and RTCP */ + janus_streaming_rtcp_pli_send(mp->source); g_atomic_int_set(&session->paused, 0); janus_mutex_unlock(&session->mutex); janus_mutex_unlock(&mp->mutex); @@ -5391,14 +5453,14 @@ static int janus_streaming_create_fd(int port, in_addr_t mcast, const janus_netw fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); if(fd < 0) { JANUS_LOG(LOG_ERR, "[%s] Cannot create socket for %s... %d (%s)\n", - mountpointname, medianame, errno, strerror(errno)); + mountpointname, medianame, errno, g_strerror(errno)); break; } #ifdef IP_MULTICAST_ALL int mc_all = 0; if((setsockopt(fd, IPPROTO_IP, IP_MULTICAST_ALL, (void*) &mc_all, sizeof(mc_all))) < 0) { JANUS_LOG(LOG_ERR, "[%s] %s listener setsockopt IP_MULTICAST_ALL failed... %d (%s)\n", - mountpointname, listenername, errno, strerror(errno)); + mountpointname, listenername, errno, g_strerror(errno)); close(fd); janus_mutex_unlock(&fd_mutex); return -1; @@ -5428,7 +5490,7 @@ static int janus_streaming_create_fd(int port, in_addr_t mcast, const janus_netw } if(setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)) == -1) { JANUS_LOG(LOG_ERR, "[%s] %s listener IP_ADD_MEMBERSHIP failed... %d (%s)\n", - mountpointname, listenername, errno, strerror(errno)); + mountpointname, listenername, errno, g_strerror(errno)); close(fd); janus_mutex_unlock(&fd_mutex); return -1; @@ -5456,7 +5518,7 @@ static int janus_streaming_create_fd(int port, in_addr_t mcast, const janus_netw int reuse = 1; if(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) == -1) { JANUS_LOG(LOG_ERR, "[%s] %s listener setsockopt SO_REUSEADDR failed... %d (%s)\n", - mountpointname, listenername, errno, strerror(errno)); + mountpointname, listenername, errno, g_strerror(errno)); close(fd); janus_mutex_unlock(&fd_mutex); return -1; @@ -5493,12 +5555,12 @@ static int janus_streaming_create_fd(int port, in_addr_t mcast, const janus_netw int v6only = 0; if(fd < 0) { JANUS_LOG(LOG_ERR, "[%s] Cannot create socket for %s... %d (%s)\n", - mountpointname, medianame, errno, strerror(errno)); + mountpointname, medianame, errno, g_strerror(errno)); break; } if(family != AF_INET && setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &v6only, sizeof(v6only)) != 0) { JANUS_LOG(LOG_ERR, "[%s] setsockopt on socket failed for %s... %d (%s)\n", - mountpointname, medianame, errno, strerror(errno)); + mountpointname, medianame, errno, g_strerror(errno)); break; } } @@ -5508,7 +5570,7 @@ static int janus_streaming_create_fd(int port, in_addr_t mcast, const janus_netw fd = -1; if(!quiet) { JANUS_LOG(LOG_ERR, "[%s] Bind failed for %s (port %d)... %d (%s)\n", - mountpointname, medianame, port, errno, strerror(errno)); + mountpointname, medianame, port, errno, g_strerror(errno)); } if(!use_range) /* Asked for a specific port but it's not available, give up */ break; @@ -5573,7 +5635,7 @@ static int janus_streaming_allocate_port_pair(const char *name, const char *medi static int janus_streaming_get_fd_port(int fd) { struct sockaddr_in6 server = { 0 }; socklen_t len = sizeof(server); - if(getsockname(fd, &server, &len) == -1) { + if(getsockname(fd, (struct sockaddr *)&server, &len) == -1) { return -1; } @@ -6066,7 +6128,6 @@ janus_streaming_mountpoint *janus_streaming_create_rtp_source( /* This extra unref is for the init */ janus_refcount_decrease(&helper->ref); janus_streaming_mountpoint_destroy(live_rtp); - g_free(helper); return NULL; } janus_refcount_increase(&helper->ref); @@ -6304,6 +6365,11 @@ static int janus_streaming_rtsp_parse_sdp(const char *buffer, const char *name, return 0; } +/* Helper function to calculating the minimum value if 'a' is bigger than zero */ +static inline gint64 janus_streaming_min_if(gint64 a, gint64 b) { + return a > 0 ? (a > b ? b : a) : b; +} + /* Static helper to connect to an RTSP server, considering we might do this either * when creating a new mountpoint, or when reconnecting after some failure */ static int janus_streaming_rtsp_connect_to_server(janus_streaming_mountpoint *mp) { @@ -6326,8 +6392,8 @@ static int janus_streaming_rtsp_connect_to_server(janus_streaming_mountpoint *mp curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L); curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 1L); curl_easy_setopt(curl, CURLOPT_URL, source->rtsp_url); - curl_easy_setopt(curl, CURLOPT_TIMEOUT, 10L); - curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, 5L); + curl_easy_setopt(curl, CURLOPT_TIMEOUT, source->rtsp_timeout); + curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, source->rtsp_conn_timeout); curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 0L); curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L); #if CURL_AT_LEAST_VERSION(7, 66, 0) @@ -6379,7 +6445,6 @@ static int janus_streaming_rtsp_connect_to_server(janus_streaming_mountpoint *mp } JANUS_LOG(LOG_VERB, "DESCRIBE answer:%s\n", curldata->buffer); /* Parse the SDP we just got to figure out the negotiated media */ - int ka_timeout = 0; int vpt = -1; char vrtpmap[2048]; vrtpmap[0] = '\0'; @@ -6553,8 +6618,8 @@ static int janus_streaming_rtsp_connect_to_server(janus_streaming_mountpoint *mp } else if(is_session) { if(!strcasecmp(name, "timeout")) { /* Take note of the timeout, for keep-alives */ - ka_timeout = atoi(value); - JANUS_LOG(LOG_VERB, " -- RTSP session timeout (video): %d\n", ka_timeout); + source->ka_timeout = janus_streaming_min_if(source->session_timeout, atoi(value) / 2 * G_USEC_PER_SEC); + JANUS_LOG(LOG_VERB, " -- RTSP session timeout (video): %"SCNi64" ms\n", source->ka_timeout / 1000); } } } @@ -6726,8 +6791,8 @@ static int janus_streaming_rtsp_connect_to_server(janus_streaming_mountpoint *mp } else if(is_session) { if(!strcasecmp(name, "timeout")) { /* Take note of the timeout, for keep-alives */ - ka_timeout = atoi(value); - JANUS_LOG(LOG_VERB, " -- RTSP session timeout (audio): %d\n", ka_timeout); + source->ka_timeout = janus_streaming_min_if(source->session_timeout, atoi(value) / 2 * G_USEC_PER_SEC); + JANUS_LOG(LOG_VERB, " -- RTSP session timeout (audio): %"SCNi64" ms\n", source->ka_timeout / 1000); } } } @@ -6829,7 +6894,6 @@ static int janus_streaming_rtsp_connect_to_server(janus_streaming_mountpoint *mp source->rtsp_vhost = g_strdup(vhost); source->curl = curl; source->curldata = curldata; - source->ka_timeout = ka_timeout; return 0; } @@ -6929,7 +6993,8 @@ janus_streaming_mountpoint *janus_streaming_create_rtsp_source( char *url, char *username, char *password, gboolean doaudio, int acodec, char *artpmap, char *afmtp, gboolean dovideo, int vcodec, char *vrtpmap, char *vfmtp, gboolean bufferkf, - const janus_network_address *iface, + const janus_network_address *iface, int threads, + gint64 reconnect_delay, gint64 session_timeout, int rtsp_timeout, int rtsp_conn_timeout, gboolean error_on_failure) { char id_num[30]; if(!string_ids) { @@ -6940,6 +7005,23 @@ janus_streaming_mountpoint *janus_streaming_create_rtsp_source( JANUS_LOG(LOG_ERR, "Can't add 'rtsp' stream, missing url...\n"); return NULL; } + if(reconnect_delay < 0) { + JANUS_LOG(LOG_ERR, "rtsp_reconnect_delay can't be smaller than zero.\n"); + return NULL; + } + if(session_timeout < 0) { + JANUS_LOG(LOG_ERR, "rtsp_session_timeout can't be smaller than zero.\n"); + return NULL; + } + if(rtsp_timeout < 0) { + JANUS_LOG(LOG_ERR, "rtsp_timeout can't be smaller than zero.\n"); + return NULL; + } + if(rtsp_conn_timeout < 0) { + JANUS_LOG(LOG_ERR, "rtsp_conn_timeout can't be smaller than zero.\n"); + return NULL; + } + JANUS_LOG(LOG_VERB, "Audio %s, Video %s\n", doaudio ? "enabled" : "NOT enabled", dovideo ? "enabled" : "NOT enabled"); /* Create an RTP source for the media we'll get */ @@ -7004,6 +7086,11 @@ janus_streaming_mountpoint *janus_streaming_create_rtsp_source( live_rtsp_source->keyframe.latest_keyframe = NULL; live_rtsp_source->keyframe.temp_keyframe = NULL; live_rtsp_source->keyframe.temp_ts = 0; + live_rtsp_source->ka_timeout = session_timeout; + live_rtsp_source->reconnect_delay = reconnect_delay; + live_rtsp_source->session_timeout = session_timeout; + live_rtsp_source->rtsp_timeout = rtsp_timeout; + live_rtsp_source->rtsp_conn_timeout = rtsp_conn_timeout; janus_mutex_init(&live_rtsp_source->keyframe.mutex); live_rtsp_source->reconnect_timer = 0; janus_mutex_init(&live_rtsp_source->rtsp_mutex); @@ -7041,9 +7128,44 @@ janus_streaming_mountpoint *janus_streaming_create_rtsp_source( return NULL; } } - /* Start the thread that will receive the media packets */ + /* If we need helper threads, spawn them now */ GError *error = NULL; char tname[16]; + if(threads > 0) { + int i=0; + for(i=0; iid = i+1; + helper->mp = live_rtsp; + helper->queued_packets = g_async_queue_new_full((GDestroyNotify)janus_streaming_rtp_relay_packet_free); + janus_mutex_init(&helper->mutex); + janus_refcount_init(&helper->ref, janus_streaming_helper_free); + live_rtsp->helper_threads++; + /* Spawn a thread and add references */ + g_snprintf(tname, sizeof(tname), "help %u-%"SCNu64, helper->id, live_rtsp->id); + janus_refcount_increase(&live_rtsp->ref); + janus_refcount_increase(&helper->ref); + helper->thread = g_thread_try_new(tname, &janus_streaming_helper_thread, helper, &error); + if(error != NULL) { + JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the helper thread...\n", + error->code, error->message ? error->message : "??"); + g_error_free(error); + janus_refcount_decrease(&live_rtsp->ref); /* This is for the helper thread */ + g_async_queue_unref(helper->queued_packets); + janus_refcount_decrease(&helper->ref); + /* This extra unref is for the init */ + janus_refcount_decrease(&helper->ref); + janus_mutex_lock(&mountpoints_mutex); + g_hash_table_remove(mountpoints_temp, &id); + janus_mutex_unlock(&mountpoints_mutex); + janus_refcount_decrease(&live_rtsp->ref); + return NULL; + } + janus_refcount_increase(&helper->ref); + live_rtsp->threads = g_list_append(live_rtsp->threads, helper); + } + } + /* Finally, start the thread that will receive the media packets */ g_snprintf(tname, sizeof(tname), "mp %s", live_rtsp->id_str); janus_refcount_increase(&live_rtsp->ref); live_rtsp->thread = g_thread_try_new(tname, &janus_streaming_relay_thread, live_rtsp, &error); @@ -7073,7 +7195,8 @@ janus_streaming_mountpoint *janus_streaming_create_rtsp_source( char *url, char *username, char *password, gboolean doaudio, int acodec, char *audiortpmap, char *audiofmtp, gboolean dovideo, int vcodec, char *videortpmap, char *videofmtp, gboolean bufferkf, - const janus_network_address *iface, + const janus_network_address *iface, int threads, + gint64 reconnect_delay, gint64 session_timeout, int rtsp_timeout, int rtsp_conn_timeout, gboolean error_on_failure) { JANUS_LOG(LOG_ERR, "RTSP need libcurl\n"); return NULL; @@ -7168,7 +7291,10 @@ static void *janus_streaming_ondemand_thread(void *data) { now.tv_usec = before.tv_usec; time_t passed, d_s, d_us; /* Loop */ - gint read = 0, plen = (sizeof(buf)-RTP_HEADER_SIZE); + gint read = 0; +#ifdef HAVE_LIBOGG + const gint plen = (sizeof(buf)-RTP_HEADER_SIZE); +#endif janus_streaming_rtp_relay_packet packet; while(!g_atomic_int_get(&stopping) && !g_atomic_int_get(&mountpoint->destroyed) && !g_atomic_int_get(&session->stopping) && !g_atomic_int_get(&session->destroyed)) { @@ -7315,7 +7441,10 @@ static void *janus_streaming_filesource_thread(void *data) { now.tv_usec = before.tv_usec; time_t passed, d_s, d_us; /* Loop */ - gint read = 0, plen = (sizeof(buf)-RTP_HEADER_SIZE); + gint read = 0; +#ifdef HAVE_LIBOGG + const gint plen = (sizeof(buf)-RTP_HEADER_SIZE); +#endif janus_streaming_rtp_relay_packet packet; while(!g_atomic_int_get(&stopping) && !g_atomic_int_get(&mountpoint->destroyed)) { /* See if it's time to prepare a frame */ @@ -7441,7 +7570,7 @@ static void *janus_streaming_relay_thread(void *data) { gint64 now = janus_get_monotonic_time(), before = now, ka_timeout = 0; if(source->rtsp) { source->reconnect_timer = now; - ka_timeout = ((gint64)source->ka_timeout*G_USEC_PER_SEC)/2; + ka_timeout = source->ka_timeout; } #endif /* Loop */ @@ -7457,8 +7586,8 @@ static void *janus_streaming_relay_thread(void *data) { continue; } now = janus_get_monotonic_time(); - if(!source->reconnecting && (now - source->reconnect_timer > 5*G_USEC_PER_SEC)) { - /* 5 seconds passed and no media? Assume the RTSP server has gone and schedule a reconnect */ + if(!source->reconnecting && (now - source->reconnect_timer > source->reconnect_delay)) { + /* Assume the RTSP server has gone and schedule a reconnect */ JANUS_LOG(LOG_WARN, "[%s] %"SCNi64"s passed with no media, trying to reconnect the RTSP stream\n", name, (now - source->reconnect_timer)/G_USEC_PER_SEC); audio_fd = -1; @@ -7522,7 +7651,7 @@ static void *janus_streaming_relay_thread(void *data) { data_fd = source->data_fd; audio_rtcp_fd = source->audio_rtcp_fd; video_rtcp_fd = source->video_rtcp_fd; - ka_timeout = ((gint64)source->ka_timeout*G_USEC_PER_SEC)/2; + ka_timeout = source->ka_timeout; } } source->reconnect_timer = janus_get_monotonic_time(); @@ -7532,7 +7661,7 @@ static void *janus_streaming_relay_thread(void *data) { } if(audio_fd < 0 && video_fd[0] < 0 && video_fd[1] < 0 && video_fd[2] < 0 && data_fd < 0) { /* No socket, we may be in the process of reconnecting, or waiting to reconnect */ - g_usleep(5000000); + g_usleep(source->reconnect_delay); continue; } /* We may also need to occasionally send a OPTIONS request as a keep-alive */ @@ -7621,10 +7750,10 @@ static void *janus_streaming_relay_thread(void *data) { resfd = poll(fds, num, 1000); if(resfd < 0) { if(errno == EINTR) { - JANUS_LOG(LOG_HUGE, "[%s] Got an EINTR (%s), ignoring...\n", name, strerror(errno)); + JANUS_LOG(LOG_HUGE, "[%s] Got an EINTR (%s), ignoring...\n", name, g_strerror(errno)); continue; } - JANUS_LOG(LOG_ERR, "[%s] Error polling... %d (%s)\n", name, errno, strerror(errno)); + JANUS_LOG(LOG_ERR, "[%s] Error polling... %d (%s)\n", name, errno, g_strerror(errno)); mountpoint->enabled = FALSE; janus_mutex_lock(&source->rec_mutex); if(source->arc) { @@ -7659,7 +7788,7 @@ static void *janus_streaming_relay_thread(void *data) { if(fds[i].revents & (POLLERR | POLLHUP)) { /* Socket error? */ JANUS_LOG(LOG_ERR, "[%s] Error polling: %s... %d (%s)\n", name, - fds[i].revents & POLLERR ? "POLLERR" : "POLLHUP", errno, strerror(errno)); + fds[i].revents & POLLERR ? "POLLERR" : "POLLHUP", errno, g_strerror(errno)); mountpoint->enabled = FALSE; janus_mutex_lock(&source->rec_mutex); if(source->arc) { @@ -8002,6 +8131,7 @@ static void *janus_streaming_relay_thread(void *data) { packet.data = (janus_rtp_header *)data; packet.length = bytes; packet.is_rtp = FALSE; + packet.is_data = TRUE; packet.textdata = source->textdata; /* Is there a recorder? */ janus_recorder_save_frame(source->drc, data, bytes); @@ -8013,6 +8143,8 @@ static void *janus_streaming_relay_thread(void *data) { pkt->data = g_malloc(bytes); memcpy(pkt->data, data, bytes); packet.is_rtp = FALSE; + packet.is_data = TRUE; + packet.textdata = source->textdata; pkt->length = bytes; janus_mutex_unlock(&source->buffermsg_mutex); } @@ -8043,6 +8175,7 @@ static void *janus_streaming_relay_thread(void *data) { JANUS_LOG(LOG_HUGE, "[%s] Got audio RTCP feedback: SSRC %"SCNu32"\n", name, janus_rtcp_get_sender_ssrc(buffer, bytes)); /* Relay on all sessions */ + packet.is_rtp = FALSE; packet.is_video = FALSE; packet.data = (janus_rtp_header *)buffer; packet.length = bytes; @@ -8069,6 +8202,7 @@ static void *janus_streaming_relay_thread(void *data) { JANUS_LOG(LOG_HUGE, "[%s] Got video RTCP feedback: SSRC %"SCNu32"\n", name, janus_rtcp_get_sender_ssrc(buffer, bytes)); /* Relay on all sessions */ + packet.is_rtp = FALSE; packet.is_video = TRUE; packet.data = (janus_rtp_header *)buffer; packet.length = bytes; @@ -8316,6 +8450,13 @@ static void janus_streaming_relay_rtp_packet(gpointer data, gpointer user_data) /* Process this packet: don't relay if it's not the SSRC/layer we wanted to handle */ gboolean relay = janus_rtp_simulcasting_context_process_rtp(&session->sim_context, (char *)packet->data, packet->length, packet->ssrc, NULL, packet->codec, &session->context); + if(!relay) { + /* Did a lot of time pass before we could relay a packet? */ + gint64 now = janus_get_monotonic_time(); + if((now - session->sim_context.last_relayed) >= G_USEC_PER_SEC) { + g_atomic_int_set(&session->sim_context.need_pli, 1); + } + } if(session->sim_context.need_pli) { /* Schedule a PLI */ JANUS_LOG(LOG_VERB, "We need a PLI for the simulcast context\n"); @@ -8413,7 +8554,6 @@ static void janus_streaming_relay_rtp_packet(gpointer data, gpointer user_data) return; } - static void janus_streaming_relay_rtcp_packet(gpointer data, gpointer user_data) { janus_streaming_rtp_relay_packet *packet = (janus_streaming_rtp_relay_packet *)user_data; if(!packet || !packet->data || packet->length < 1) { @@ -8454,6 +8594,8 @@ static void janus_streaming_helper_rtprtcp_packet(gpointer data, gpointer user_d memcpy(copy->data, packet->data, packet->length); copy->length = packet->length; copy->is_rtp = packet->is_rtp; + copy->is_data = packet->is_data; + copy->textdata = packet->textdata; copy->is_video = packet->is_video; copy->is_keyframe = packet->is_keyframe; copy->simulcast = packet->simulcast; @@ -8478,7 +8620,7 @@ static void *janus_streaming_helper_thread(void *data) { break; janus_mutex_lock(&helper->mutex); g_list_foreach(helper->viewers, - pkt->is_rtp ? janus_streaming_relay_rtp_packet : janus_streaming_relay_rtcp_packet, + pkt->is_rtp || pkt->is_data ? janus_streaming_relay_rtp_packet : janus_streaming_relay_rtcp_packet, pkt); janus_mutex_unlock(&helper->mutex); janus_streaming_rtp_relay_packet_free(pkt);