Skip to content

Commit

Permalink
Add support for helper threads to RTSP mountpoints (fixes #2359) (#2361)
Browse files Browse the repository at this point in the history
  • Loading branch information
lminiero committed Sep 25, 2020
1 parent 4f0f95b commit bf0a59a
Showing 1 changed file with 59 additions and 8 deletions.
67 changes: 59 additions & 8 deletions plugins/janus_streaming.c
Original file line number Diff line number Diff line change
Expand Up @@ -862,6 +862,7 @@ static struct janus_json_parameter rtsp_parameters[] = {
{"videofmtp", JSON_STRING, 0},
{"videopt", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE},
{"videobufferkf", JANUS_JSON_BOOL, 0},
{"threads", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE},
{"rtspiface", JSON_STRING, 0},
{"rtsp_failcheck", JANUS_JSON_BOOL, 0}
};
Expand Down Expand Up @@ -1130,8 +1131,8 @@ typedef struct janus_streaming_mountpoint {
janus_streaming_codecs codecs;
gboolean audio, video, data;
GList *viewers;
int helper_threads; /* Only relevant for RTP mountpoints */
GList *threads; /* Only relevant for RTP mountpoints */
int helper_threads; /* Only relevant for RTP/RTSP mountpoints */
GList *threads; /* Only relevant for RTP/RTSP mountpoints */
volatile gint destroyed;
janus_mutex mutex;
janus_refcount ref;
Expand Down Expand Up @@ -1186,7 +1187,7 @@ janus_streaming_mountpoint *janus_streaming_create_rtsp_source(
char *url, char *username, char *password,
gboolean doaudio, int audiopt, char *artpmap, char *afmtp,
gboolean dovideo, int videopt, char *vrtpmap, char *vfmtp, gboolean bufferkf,
const janus_network_address *iface,
const janus_network_address *iface, int threads,
gboolean error_on_failure);


Expand Down Expand Up @@ -2099,6 +2100,7 @@ int janus_streaming_init(janus_callbacks *callback, const char *config_path) {
janus_config_item *vkf = janus_config_get(config, cat, janus_config_type_item, "videobufferkf");
janus_config_item *iface = janus_config_get(config, cat, janus_config_type_item, "rtspiface");
janus_config_item *failerr = janus_config_get(config, cat, janus_config_type_item, "rtsp_failcheck");
janus_config_item *threads = janus_config_get(config, cat, janus_config_type_item, "threads");
janus_network_address iface_value;
if(file == NULL || file->value == NULL) {
JANUS_LOG(LOG_ERR, "Can't add 'rtsp' mountpoint '%s', missing mandatory information...\n", cat->name);
Expand All @@ -2112,6 +2114,11 @@ int janus_streaming_init(janus_callbacks *callback, const char *config_path) {
gboolean error_on_failure = TRUE;
if(failerr && failerr->value)
error_on_failure = janus_is_true(failerr->value);
if(threads && threads->value && atoi(threads->value) < 0) {
JANUS_LOG(LOG_ERR, "Can't add 'rtsp' mountpoint '%s', invalid threads configuration...\n", cat->name);
cl = cl->next;
continue;
}

if((doaudio || dovideo) && iface && iface->value) {
if(!ifas) {
Expand Down Expand Up @@ -2145,6 +2152,7 @@ int janus_streaming_init(janus_callbacks *callback, const char *config_path) {
vfmtp ? (char *)vfmtp->value : NULL,
bufferkf,
iface && iface->value ? &iface_value : NULL,
(threads && threads->value) ? atoi(threads->value) : 0,
error_on_failure)) == NULL) {
JANUS_LOG(LOG_ERR, "Error creating 'rtsp' mountpoint '%s'...\n", cat->name);
cl = cl->next;
Expand Down Expand Up @@ -3152,6 +3160,7 @@ static json_t *janus_streaming_process_synchronous_request(janus_streaming_sessi
json_t *username = json_object_get(root, "rtsp_user");
json_t *password = json_object_get(root, "rtsp_pwd");
json_t *iface = json_object_get(root, "rtspiface");
json_t *threads = json_object_get(root, "threads");
json_t *failerr = json_object_get(root, "rtsp_failcheck");
if(failerr == NULL) /* For an old typo, we support the legacy syntax too */
failerr = json_object_get(root, "rtsp_check");
Expand Down Expand Up @@ -3195,7 +3204,7 @@ static json_t *janus_streaming_process_synchronous_request(janus_streaming_sessi
dovideo, (videopt ? json_integer_value(videopt) : -1),
(char *)json_string_value(videortpmap), (char *)json_string_value(videofmtp),
videobufferkf ? json_is_true(videobufferkf) : FALSE,
&multicast_iface,
&multicast_iface, (threads ? json_integer_value(threads) : 0),
error_on_failure);
janus_mutex_lock(&mountpoints_mutex);
g_hash_table_remove(mountpoints_temp, string_ids ? (gpointer)mpid_str : (gpointer)&mpid);
Expand Down Expand Up @@ -3353,6 +3362,10 @@ static json_t *janus_streaming_process_synchronous_request(janus_streaming_sessi
json_t *iface = json_object_get(root, "rtspiface");
if(iface)
janus_config_add(config, c, janus_config_item_create("rtspiface", json_string_value(iface)));
if(mp->helper_threads > 0) {
g_snprintf(value, BUFSIZ, "%d", mp->helper_threads);
janus_config_add(config, c, janus_config_item_create("threads", value));
}
}
/* Some more common values */
if(mp->secret)
Expand Down Expand Up @@ -3563,6 +3576,10 @@ static json_t *janus_streaming_process_synchronous_request(janus_streaming_sessi
json_t *iface = json_object_get(root, "rtspiface");
if(iface)
janus_config_add(config, c, janus_config_item_create("rtspiface", json_string_value(iface)));
if(mp->helper_threads > 0) {
g_snprintf(value, BUFSIZ, "%d", mp->helper_threads);
janus_config_add(config, c, janus_config_item_create("threads", value));
}
} else {
janus_config_add(config, c, janus_config_item_create("type", "rtp"));
janus_config_add(config, c, janus_config_item_create("audio", mp->codecs.audio_pt >= 0 ? "yes" : "no"));
Expand Down Expand Up @@ -6069,7 +6086,6 @@ janus_streaming_mountpoint *janus_streaming_create_rtp_source(
/* This extra unref is for the init */
janus_refcount_decrease(&helper->ref);
janus_streaming_mountpoint_destroy(live_rtp);
g_free(helper);
return NULL;
}
janus_refcount_increase(&helper->ref);
Expand Down Expand Up @@ -6929,7 +6945,7 @@ janus_streaming_mountpoint *janus_streaming_create_rtsp_source(
char *url, char *username, char *password,
gboolean doaudio, int acodec, char *artpmap, char *afmtp,
gboolean dovideo, int vcodec, char *vrtpmap, char *vfmtp, gboolean bufferkf,
const janus_network_address *iface,
const janus_network_address *iface, int threads,
gboolean error_on_failure) {
char id_num[30];
if(!string_ids) {
Expand Down Expand Up @@ -7041,9 +7057,44 @@ janus_streaming_mountpoint *janus_streaming_create_rtsp_source(
return NULL;
}
}
/* Start the thread that will receive the media packets */
/* If we need helper threads, spawn them now */
GError *error = NULL;
char tname[16];
if(threads > 0) {
int i=0;
for(i=0; i<threads; i++) {
janus_streaming_helper *helper = g_malloc0(sizeof(janus_streaming_helper));
helper->id = i+1;
helper->mp = live_rtsp;
helper->queued_packets = g_async_queue_new_full((GDestroyNotify)janus_streaming_rtp_relay_packet_free);
janus_mutex_init(&helper->mutex);
janus_refcount_init(&helper->ref, janus_streaming_helper_free);
live_rtsp->helper_threads++;
/* Spawn a thread and add references */
g_snprintf(tname, sizeof(tname), "help %u-%"SCNu64, helper->id, live_rtsp->id);
janus_refcount_increase(&live_rtsp->ref);
janus_refcount_increase(&helper->ref);
helper->thread = g_thread_try_new(tname, &janus_streaming_helper_thread, helper, &error);
if(error != NULL) {
JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the helper thread...\n",
error->code, error->message ? error->message : "??");
g_error_free(error);
janus_refcount_decrease(&live_rtsp->ref); /* This is for the helper thread */
g_async_queue_unref(helper->queued_packets);
janus_refcount_decrease(&helper->ref);
/* This extra unref is for the init */
janus_refcount_decrease(&helper->ref);
janus_mutex_lock(&mountpoints_mutex);
g_hash_table_remove(mountpoints_temp, &id);
janus_mutex_unlock(&mountpoints_mutex);
janus_refcount_decrease(&live_rtsp->ref);
return NULL;
}
janus_refcount_increase(&helper->ref);
live_rtsp->threads = g_list_append(live_rtsp->threads, helper);
}
}
/* Finally, start the thread that will receive the media packets */
g_snprintf(tname, sizeof(tname), "mp %s", live_rtsp->id_str);
janus_refcount_increase(&live_rtsp->ref);
live_rtsp->thread = g_thread_try_new(tname, &janus_streaming_relay_thread, live_rtsp, &error);
Expand Down Expand Up @@ -7073,7 +7124,7 @@ janus_streaming_mountpoint *janus_streaming_create_rtsp_source(
char *url, char *username, char *password,
gboolean doaudio, int acodec, char *audiortpmap, char *audiofmtp,
gboolean dovideo, int vcodec, char *videortpmap, char *videofmtp, gboolean bufferkf,
const janus_network_address *iface,
const janus_network_address *iface, int threads,
gboolean error_on_failure) {
JANUS_LOG(LOG_ERR, "RTSP need libcurl\n");
return NULL;
Expand Down

0 comments on commit bf0a59a

Please sign in to comment.