From 43e4651e8c646d48722a1654085b55c5eaa3610d Mon Sep 17 00:00:00 2001 From: Zoltan Ponekker Date: Fri, 19 Mar 2021 13:16:44 +0100 Subject: [PATCH 01/11] Making the timeout parameters for RTSP is configurable like RTSP connection/communication timeout, no media timeout and reconnection delay. --- conf/janus.plugin.streaming.jcfg.sample.in | 27 +++++++ plugins/janus_streaming.c | 83 +++++++++++++++++----- 2 files changed, 93 insertions(+), 17 deletions(-) diff --git a/conf/janus.plugin.streaming.jcfg.sample.in b/conf/janus.plugin.streaming.jcfg.sample.in index 7eb058bea5..8561b602c3 100644 --- a/conf/janus.plugin.streaming.jcfg.sample.in +++ b/conf/janus.plugin.streaming.jcfg.sample.in @@ -249,4 +249,31 @@ file-ondemand-sample: { #rtsp_user = "username" #rtsp_pwd = "password" #secret = "adminpwd" + + # After n seconds passed and no media assumed, the RTSP server has + # gone and schedule a reconnect. + # + # Default value is 5 seconds. + #reconnect_delay = 5 + + # By default the sreaming plugin will check the RTSP connection with an OPTIONS query, + # the value of the timeout is came from the RTSP session initializer and by default + # this session timeout is the half of this value. + # + # In some cases this value is can be too high (for example more than one minute) + # because of the media server. In that case this plugin will calculate the timeout + # with this formula: timeout = min(session_timeout, rtsp_session_timeout / 2). + # + # Default value is 0 seconds, then it doesn't override the RTSP server's session timeout.. + #session_timeout = 0 + + # Communication timeout for cURL call gathering the RTSP information. + # + # Default value is 10 seconds. + # rtsp_timeout = 10 + + # Connection timeout for cURL call gathering the RTSP information. + # + # Default value is 5 seconds. + # rtsp_conn_timeout = 5 #} diff --git a/plugins/janus_streaming.c b/plugins/janus_streaming.c index 2f09777bc3..cb8466b3e4 100644 --- a/plugins/janus_streaming.c +++ b/plugins/janus_streaming.c @@ -721,6 +721,11 @@ rtspiface = network interface IP address or device name to listen on when receiv #include "../utils.h" #include "../ip-utils.h" +/* Default settings */ +#define JANUS_DEFAULT_SESSION_TIMEOUT 0 /* Overwrite the RTSP session timeout. If set to zero, the RTSP timeout is derived from a session. */ +#define JANUS_DEFAULT_RECONNECT_DELAY 5 /* Reconnecting delay in seconds. */ +#define JANUS_DEFAULT_CURL_TIMEOUT 10L /* Communication timeout for cURL. */ +#define JANUS_DEFAULT_CURL_CONNECT_TIMEOUT 5L /* Conection timeout for cURL. */ /* Plugin information */ #define JANUS_STREAMING_VERSION 8 @@ -1066,7 +1071,7 @@ typedef struct janus_streaming_rtp_source { janus_streaming_buffer *curldata; char *rtsp_url; char *rtsp_username, *rtsp_password; - int ka_timeout; + gint64 ka_timeout; char *rtsp_ahost, *rtsp_vhost; gboolean reconnecting; gint64 reconnect_timer; @@ -1132,6 +1137,10 @@ typedef struct janus_streaming_mountpoint { gboolean audio, video, data; GList *viewers; int helper_threads; /* Only relevant for RTP/RTSP mountpoints */ + gint64 reconnect_delay; /* Only relevant for RTSP mountpoints */ + gint64 session_timeout; /* Only relevant for RTSP mountpoints */ + int rtsp_timeout; /* Only relevant for RTSP mountpoints */ + int rtsp_conn_timeout; /* Only relevant for RTSP mountpoints */ GList *threads; /* Only relevant for RTP/RTSP mountpoints */ volatile gint destroyed; janus_mutex mutex; @@ -1188,9 +1197,12 @@ janus_streaming_mountpoint *janus_streaming_create_rtsp_source( gboolean doaudio, int audiopt, char *artpmap, char *afmtp, gboolean dovideo, int videopt, char *vrtpmap, char *vfmtp, gboolean bufferkf, const janus_network_address *iface, int threads, + gint64 reconnect_delay, + gint64 session_timeout, + int rtsp_timeout, + int rtsp_conn_timeout, gboolean error_on_failure); - typedef struct janus_streaming_message { janus_plugin_session *handle; char *transaction; @@ -2101,6 +2113,10 @@ int janus_streaming_init(janus_callbacks *callback, const char *config_path) { janus_config_item *iface = janus_config_get(config, cat, janus_config_type_item, "rtspiface"); janus_config_item *failerr = janus_config_get(config, cat, janus_config_type_item, "rtsp_failcheck"); janus_config_item *threads = janus_config_get(config, cat, janus_config_type_item, "threads"); + janus_config_item *reconnect_delay = janus_config_get(config, cat, janus_config_type_item, "reconnect_delay"); + janus_config_item *session_timeout = janus_config_get(config, cat, janus_config_type_item, "session_timeout"); + janus_config_item *rtsp_timeout = janus_config_get(config, cat, janus_config_type_item, "rtsp_timeout"); + janus_config_item *rtsp_conn_timeout = janus_config_get(config, cat, janus_config_type_item, "rtsp_conn_timeout"); janus_network_address iface_value; if(file == NULL || file->value == NULL) { JANUS_LOG(LOG_ERR, "Can't add 'rtsp' mountpoint '%s', missing mandatory information...\n", cat->name); @@ -2153,6 +2169,10 @@ int janus_streaming_init(janus_callbacks *callback, const char *config_path) { bufferkf, iface && iface->value ? &iface_value : NULL, (threads && threads->value) ? atoi(threads->value) : 0, + ((reconnect_delay && reconnect_delay->value) ? atoi(reconnect_delay->value) : JANUS_DEFAULT_RECONNECT_DELAY) * G_USEC_PER_SEC, + ((session_timeout && session_timeout->value) ? atoi(session_timeout->value) : JANUS_DEFAULT_SESSION_TIMEOUT) * G_USEC_PER_SEC, + ((rtsp_timeout && rtsp_timeout->value) ? atoi(rtsp_timeout->value) : JANUS_DEFAULT_CURL_TIMEOUT), + ((rtsp_conn_timeout && rtsp_conn_timeout->value) ? atoi(rtsp_conn_timeout->value) : JANUS_DEFAULT_CURL_CONNECT_TIMEOUT), error_on_failure)) == NULL) { JANUS_LOG(LOG_ERR, "Error creating 'rtsp' mountpoint '%s'...\n", cat->name); cl = cl->next; @@ -3162,6 +3182,10 @@ static json_t *janus_streaming_process_synchronous_request(janus_streaming_sessi json_t *iface = json_object_get(root, "rtspiface"); json_t *threads = json_object_get(root, "threads"); json_t *failerr = json_object_get(root, "rtsp_failcheck"); + json_t *reconnect_delay = json_object_get(root, "reconnect_delay"); + json_t *session_timeout = json_object_get(root, "session_timeout"); + json_t *rtsp_timeout = json_object_get(root, "rtsp_timeout"); + json_t *rtsp_conn_timeout = json_object_get(root, "rtsp_conn_timeout"); if(failerr == NULL) /* For an old typo, we support the legacy syntax too */ failerr = json_object_get(root, "rtsp_check"); gboolean doaudio = audio ? json_is_true(audio) : FALSE; @@ -3205,6 +3229,10 @@ static json_t *janus_streaming_process_synchronous_request(janus_streaming_sessi (char *)json_string_value(videortpmap), (char *)json_string_value(videofmtp), videobufferkf ? json_is_true(videobufferkf) : FALSE, &multicast_iface, (threads ? json_integer_value(threads) : 0), + ((reconnect_delay ? json_integer_value(reconnect_delay) : JANUS_DEFAULT_RECONNECT_DELAY) * G_USEC_PER_SEC), + ((session_timeout ? json_integer_value(session_timeout) : JANUS_DEFAULT_SESSION_TIMEOUT) * G_USEC_PER_SEC), + (rtsp_timeout ? json_integer_value(rtsp_timeout) : JANUS_DEFAULT_CURL_TIMEOUT), + (rtsp_conn_timeout ? json_integer_value(rtsp_conn_timeout) : JANUS_DEFAULT_CURL_CONNECT_TIMEOUT), error_on_failure); janus_mutex_lock(&mountpoints_mutex); g_hash_table_remove(mountpoints_temp, string_ids ? (gpointer)mpid_str : (gpointer)&mpid); @@ -5598,7 +5626,7 @@ static int janus_streaming_allocate_port_pair(const char *name, const char *medi static int janus_streaming_get_fd_port(int fd) { struct sockaddr_in6 server = { 0 }; socklen_t len = sizeof(server); - if(getsockname(fd, &server, &len) == -1) { + if(getsockname(fd, (struct sockaddr *)&server, &len) == -1) { return -1; } @@ -5893,6 +5921,8 @@ janus_streaming_mountpoint *janus_streaming_create_rtp_source( live_rtp->audio = doaudio; live_rtp->video = dovideo; live_rtp->data = dodata; + live_rtp->reconnect_delay = JANUS_DEFAULT_RECONNECT_DELAY * G_USEC_PER_SEC; + live_rtp->session_timeout = 0; /* No overriding */ live_rtp->streaming_type = janus_streaming_type_live; live_rtp->streaming_source = janus_streaming_source_rtp; janus_streaming_rtp_source *live_rtp_source = g_malloc0(sizeof(janus_streaming_rtp_source)); @@ -6024,6 +6054,7 @@ janus_streaming_mountpoint *janus_streaming_create_rtp_source( live_rtp_source->textdata = textdata; live_rtp_source->buffermsg = buffermsg; live_rtp_source->last_msg = NULL; + live_rtp_source->ka_timeout = 0; janus_mutex_init(&live_rtp_source->buffermsg_mutex); live_rtp->source = live_rtp_source; live_rtp->source_destroy = (GDestroyNotify) janus_streaming_rtp_source_free; @@ -6328,6 +6359,14 @@ static int janus_streaming_rtsp_parse_sdp(const char *buffer, const char *name, return 0; } +static inline gint64 janus_min_if(gint64 a, gint64 b) +{ + return a > 0 + ? (a > b ? b : a) + : b + ; +} + /* Static helper to connect to an RTSP server, considering we might do this either * when creating a new mountpoint, or when reconnecting after some failure */ static int janus_streaming_rtsp_connect_to_server(janus_streaming_mountpoint *mp) { @@ -6350,8 +6389,8 @@ static int janus_streaming_rtsp_connect_to_server(janus_streaming_mountpoint *mp curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L); curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 1L); curl_easy_setopt(curl, CURLOPT_URL, source->rtsp_url); - curl_easy_setopt(curl, CURLOPT_TIMEOUT, 10L); - curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, 5L); + curl_easy_setopt(curl, CURLOPT_TIMEOUT, mp->rtsp_timeout); + curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, mp->rtsp_conn_timeout); curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 0L); curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L); #if CURL_AT_LEAST_VERSION(7, 66, 0) @@ -6403,7 +6442,6 @@ static int janus_streaming_rtsp_connect_to_server(janus_streaming_mountpoint *mp } JANUS_LOG(LOG_VERB, "DESCRIBE answer:%s\n", curldata->buffer); /* Parse the SDP we just got to figure out the negotiated media */ - int ka_timeout = 0; int vpt = -1; char vrtpmap[2048]; vrtpmap[0] = '\0'; @@ -6574,8 +6612,8 @@ static int janus_streaming_rtsp_connect_to_server(janus_streaming_mountpoint *mp } else if(is_session) { if(!strcasecmp(name, "timeout")) { /* Take note of the timeout, for keep-alives */ - ka_timeout = atoi(value); - JANUS_LOG(LOG_VERB, " -- RTSP session timeout (video): %d\n", ka_timeout); + source->ka_timeout = janus_min_if(mp->session_timeout, atoi(value) / 2 * G_USEC_PER_SEC); + JANUS_LOG(LOG_VERB, " -- RTSP session timeout (video): %lld ms\n", source->ka_timeout / 1000); } } } @@ -6747,8 +6785,8 @@ static int janus_streaming_rtsp_connect_to_server(janus_streaming_mountpoint *mp } else if(is_session) { if(!strcasecmp(name, "timeout")) { /* Take note of the timeout, for keep-alives */ - ka_timeout = atoi(value); - JANUS_LOG(LOG_VERB, " -- RTSP session timeout (audio): %d\n", ka_timeout); + source->ka_timeout = janus_min_if(mp->session_timeout, atoi(value) / 2 * G_USEC_PER_SEC); + JANUS_LOG(LOG_VERB, " -- RTSP session timeout (audio): %lld ms\n", source->ka_timeout / 1000); } } } @@ -6850,7 +6888,6 @@ static int janus_streaming_rtsp_connect_to_server(janus_streaming_mountpoint *mp source->rtsp_vhost = g_strdup(vhost); source->curl = curl; source->curldata = curldata; - source->ka_timeout = ka_timeout; return 0; } @@ -6951,6 +6988,10 @@ janus_streaming_mountpoint *janus_streaming_create_rtsp_source( gboolean doaudio, int acodec, char *artpmap, char *afmtp, gboolean dovideo, int vcodec, char *vrtpmap, char *vfmtp, gboolean bufferkf, const janus_network_address *iface, int threads, + gint64 reconnect_delay, + gint64 session_timeout, + int rtsp_timeout, + int rtsp_conn_timeout, gboolean error_on_failure) { char id_num[30]; if(!string_ids) { @@ -7025,10 +7066,15 @@ janus_streaming_mountpoint *janus_streaming_create_rtsp_source( live_rtsp_source->keyframe.latest_keyframe = NULL; live_rtsp_source->keyframe.temp_keyframe = NULL; live_rtsp_source->keyframe.temp_ts = 0; + live_rtsp_source->ka_timeout = session_timeout; janus_mutex_init(&live_rtsp_source->keyframe.mutex); live_rtsp_source->reconnect_timer = 0; janus_mutex_init(&live_rtsp_source->rtsp_mutex); live_rtsp->source = live_rtsp_source; + live_rtsp->reconnect_delay = reconnect_delay; + live_rtsp->session_timeout = session_timeout; + live_rtsp->rtsp_timeout = rtsp_timeout; + live_rtsp->rtsp_conn_timeout = rtsp_conn_timeout; live_rtsp->source_destroy = (GDestroyNotify) janus_streaming_rtp_source_free; live_rtsp->viewers = NULL; g_atomic_int_set(&live_rtsp->destroyed, 0); @@ -7130,6 +7176,10 @@ janus_streaming_mountpoint *janus_streaming_create_rtsp_source( gboolean doaudio, int acodec, char *audiortpmap, char *audiofmtp, gboolean dovideo, int vcodec, char *videortpmap, char *videofmtp, gboolean bufferkf, const janus_network_address *iface, int threads, + gint64 reconnect_delay, + gint64 session_timeout, + int rtsp_timeout, + int rtsp_conn_timeout, gboolean error_on_failure) { JANUS_LOG(LOG_ERR, "RTSP need libcurl\n"); return NULL; @@ -7503,7 +7553,7 @@ static void *janus_streaming_relay_thread(void *data) { gint64 now = janus_get_monotonic_time(), before = now, ka_timeout = 0; if(source->rtsp) { source->reconnect_timer = now; - ka_timeout = ((gint64)source->ka_timeout*G_USEC_PER_SEC)/2; + ka_timeout = source->ka_timeout; } #endif /* Loop */ @@ -7519,8 +7569,8 @@ static void *janus_streaming_relay_thread(void *data) { continue; } now = janus_get_monotonic_time(); - if(!source->reconnecting && (now - source->reconnect_timer > 5*G_USEC_PER_SEC)) { - /* 5 seconds passed and no media? Assume the RTSP server has gone and schedule a reconnect */ + if(!source->reconnecting && (now - source->reconnect_timer > mountpoint->reconnect_delay)) { + /* n seconds passed and no media? Assume the RTSP server has gone and schedule a reconnect */ JANUS_LOG(LOG_WARN, "[%s] %"SCNi64"s passed with no media, trying to reconnect the RTSP stream\n", name, (now - source->reconnect_timer)/G_USEC_PER_SEC); audio_fd = -1; @@ -7582,7 +7632,7 @@ static void *janus_streaming_relay_thread(void *data) { data_fd = source->data_fd; audio_rtcp_fd = source->audio_rtcp_fd; video_rtcp_fd = source->video_rtcp_fd; - ka_timeout = ((gint64)source->ka_timeout*G_USEC_PER_SEC)/2; + ka_timeout = source->ka_timeout; } } source->reconnect_timer = janus_get_monotonic_time(); @@ -7592,7 +7642,7 @@ static void *janus_streaming_relay_thread(void *data) { } if(audio_fd < 0 && video_fd[0] < 0 && video_fd[1] < 0 && video_fd[2] < 0 && data_fd < 0) { /* No socket, we may be in the process of reconnecting, or waiting to reconnect */ - g_usleep(5000000); + g_usleep(mountpoint->reconnect_delay); continue; } /* We may also need to occasionally send a OPTIONS request as a keep-alive */ @@ -8485,7 +8535,6 @@ static void janus_streaming_relay_rtp_packet(gpointer data, gpointer user_data) return; } - static void janus_streaming_relay_rtcp_packet(gpointer data, gpointer user_data) { janus_streaming_rtp_relay_packet *packet = (janus_streaming_rtp_relay_packet *)user_data; if(!packet || !packet->data || packet->length < 1) { From 83ffdf3f16b3f0084c228b2e53a3c0031ae6ec5d Mon Sep 17 00:00:00 2001 From: Zoltan Ponekker Date: Wed, 24 Mar 2021 10:19:15 +0100 Subject: [PATCH 02/11] fix compiling w/o cURL --- plugins/janus_streaming.c | 1 - 1 file changed, 1 deletion(-) diff --git a/plugins/janus_streaming.c b/plugins/janus_streaming.c index cb8466b3e4..2bd241c7df 100644 --- a/plugins/janus_streaming.c +++ b/plugins/janus_streaming.c @@ -6054,7 +6054,6 @@ janus_streaming_mountpoint *janus_streaming_create_rtp_source( live_rtp_source->textdata = textdata; live_rtp_source->buffermsg = buffermsg; live_rtp_source->last_msg = NULL; - live_rtp_source->ka_timeout = 0; janus_mutex_init(&live_rtp_source->buffermsg_mutex); live_rtp->source = live_rtp_source; live_rtp->source_destroy = (GDestroyNotify) janus_streaming_rtp_source_free; From 996aedd8b7fd6e092fe81802ee611307c208b5b3 Mon Sep 17 00:00:00 2001 From: Zoltan Ponekker Date: Wed, 24 Mar 2021 10:38:58 +0100 Subject: [PATCH 03/11] PR comments fix --- conf/janus.plugin.streaming.jcfg.sample | 264 ++++++++++++++++++++++++ plugins/janus_streaming.c | 57 +++-- 2 files changed, 292 insertions(+), 29 deletions(-) create mode 100644 conf/janus.plugin.streaming.jcfg.sample diff --git a/conf/janus.plugin.streaming.jcfg.sample b/conf/janus.plugin.streaming.jcfg.sample new file mode 100644 index 0000000000..7c5a1ca23d --- /dev/null +++ b/conf/janus.plugin.streaming.jcfg.sample @@ -0,0 +1,264 @@ +# stream-name: { +# type = rtp|live|ondemand|rtsp +# rtp = stream originated by an external tool (e.g., gstreamer or +# ffmpeg) and sent to the plugin via RTP +# live = local file streamed live to multiple listeners +# (multiple listeners = same streaming context) +# ondemand = local file streamed on-demand to a single listener +# (multiple listeners = different streaming contexts) +# rtsp = stream originated by an external RTSP feed (only +# available if libcurl support was compiled) +# id = (if missing, a random one will be generated) +# description = This is my awesome stream +# metadata = An optional string that can contain any metadata (e.g., JSON) +# associated with the stream you want users to receive +# is_private = true|false (private streams don't appear when you do a 'list' +# request) +# secret = +# pin = +# filename = path to the local file to stream (only for live/ondemand) +# audio = true|false (do/don't stream audio) +# video = true|false (do/don't stream video) +# The following options are only valid for the 'rtp' type: +# data = true|false (do/don't stream text via datachannels) +# audioport = local port for receiving audio frames +# audiortcpport = local port, if any, for receiving and sending audio RTCP feedback +# audiomcast = multicast group port for receiving audio frames, if any +# audioiface = network interface or IP address to bind to, if any (binds to all otherwise) +# audiopt =