Skip to content

Commit

Permalink
Add support for helper threads to RTSP mountpoints (fixes #2359)
Browse files Browse the repository at this point in the history
  • Loading branch information
lminiero committed Sep 15, 2020
1 parent 8b9549a commit 7d88876
Showing 1 changed file with 59 additions and 8 deletions.
67 changes: 59 additions & 8 deletions plugins/janus_streaming.c
Original file line number Diff line number Diff line change
Expand Up @@ -862,6 +862,7 @@ static struct janus_json_parameter rtsp_parameters[] = {
{"videofmtp", JSON_STRING, 0},
{"videopt", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE},
{"videobufferkf", JANUS_JSON_BOOL, 0},
{"threads", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE},
{"rtspiface", JSON_STRING, 0},
{"rtsp_failcheck", JANUS_JSON_BOOL, 0}
};
Expand Down Expand Up @@ -1129,8 +1130,8 @@ typedef struct janus_streaming_mountpoint {
janus_streaming_codecs codecs;
gboolean audio, video, data;
GList *viewers;
int helper_threads; /* Only relevant for RTP mountpoints */
GList *threads; /* Only relevant for RTP mountpoints */
int helper_threads; /* Only relevant for RTP/RTSP mountpoints */
GList *threads; /* Only relevant for RTP/RTSP mountpoints */
volatile gint destroyed;
janus_mutex mutex;
janus_refcount ref;
Expand Down Expand Up @@ -1185,7 +1186,7 @@ janus_streaming_mountpoint *janus_streaming_create_rtsp_source(
char *url, char *username, char *password,
gboolean doaudio, int audiopt, char *artpmap, char *afmtp,
gboolean dovideo, int videopt, char *vrtpmap, char *vfmtp, gboolean bufferkf,
const janus_network_address *iface,
const janus_network_address *iface, int threads,
gboolean error_on_failure);


Expand Down Expand Up @@ -2098,6 +2099,7 @@ int janus_streaming_init(janus_callbacks *callback, const char *config_path) {
janus_config_item *vkf = janus_config_get(config, cat, janus_config_type_item, "videobufferkf");
janus_config_item *iface = janus_config_get(config, cat, janus_config_type_item, "rtspiface");
janus_config_item *failerr = janus_config_get(config, cat, janus_config_type_item, "rtsp_failcheck");
janus_config_item *threads = janus_config_get(config, cat, janus_config_type_item, "threads");
janus_network_address iface_value;
if(file == NULL || file->value == NULL) {
JANUS_LOG(LOG_ERR, "Can't add 'rtsp' mountpoint '%s', missing mandatory information...\n", cat->name);
Expand All @@ -2111,6 +2113,11 @@ int janus_streaming_init(janus_callbacks *callback, const char *config_path) {
gboolean error_on_failure = TRUE;
if(failerr && failerr->value)
error_on_failure = janus_is_true(failerr->value);
if(threads && threads->value && atoi(threads->value) < 0) {
JANUS_LOG(LOG_ERR, "Can't add 'rtsp' mountpoint '%s', invalid threads configuration...\n", cat->name);
cl = cl->next;
continue;
}

if((doaudio || dovideo) && iface && iface->value) {
if(!ifas) {
Expand Down Expand Up @@ -2144,6 +2151,7 @@ int janus_streaming_init(janus_callbacks *callback, const char *config_path) {
vfmtp ? (char *)vfmtp->value : NULL,
bufferkf,
iface && iface->value ? &iface_value : NULL,
(threads && threads->value) ? atoi(threads->value) : 0,
error_on_failure)) == NULL) {
JANUS_LOG(LOG_ERR, "Error creating 'rtsp' mountpoint '%s'...\n", cat->name);
cl = cl->next;
Expand Down Expand Up @@ -3151,6 +3159,7 @@ static json_t *janus_streaming_process_synchronous_request(janus_streaming_sessi
json_t *username = json_object_get(root, "rtsp_user");
json_t *password = json_object_get(root, "rtsp_pwd");
json_t *iface = json_object_get(root, "rtspiface");
json_t *threads = json_object_get(root, "threads");
json_t *failerr = json_object_get(root, "rtsp_failcheck");
if(failerr == NULL) /* For an old typo, we support the legacy syntax too */
failerr = json_object_get(root, "rtsp_check");
Expand Down Expand Up @@ -3194,7 +3203,7 @@ static json_t *janus_streaming_process_synchronous_request(janus_streaming_sessi
dovideo, (videopt ? json_integer_value(videopt) : -1),
(char *)json_string_value(videortpmap), (char *)json_string_value(videofmtp),
videobufferkf ? json_is_true(videobufferkf) : FALSE,
&multicast_iface,
&multicast_iface, (threads ? json_integer_value(threads) : 0),
error_on_failure);
janus_mutex_lock(&mountpoints_mutex);
g_hash_table_remove(mountpoints_temp, string_ids ? (gpointer)mpid_str : (gpointer)&mpid);
Expand Down Expand Up @@ -3352,6 +3361,10 @@ static json_t *janus_streaming_process_synchronous_request(janus_streaming_sessi
json_t *iface = json_object_get(root, "rtspiface");
if(iface)
janus_config_add(config, c, janus_config_item_create("rtspiface", json_string_value(iface)));
if(mp->helper_threads > 0) {
g_snprintf(value, BUFSIZ, "%d", mp->helper_threads);
janus_config_add(config, c, janus_config_item_create("threads", value));
}
}
/* Some more common values */
if(mp->secret)
Expand Down Expand Up @@ -3562,6 +3575,10 @@ static json_t *janus_streaming_process_synchronous_request(janus_streaming_sessi
json_t *iface = json_object_get(root, "rtspiface");
if(iface)
janus_config_add(config, c, janus_config_item_create("rtspiface", json_string_value(iface)));
if(mp->helper_threads > 0) {
g_snprintf(value, BUFSIZ, "%d", mp->helper_threads);
janus_config_add(config, c, janus_config_item_create("threads", value));
}
} else {
janus_config_add(config, c, janus_config_item_create("type", "rtp"));
janus_config_add(config, c, janus_config_item_create("audio", mp->codecs.audio_pt >= 0 ? "yes" : "no"));
Expand Down Expand Up @@ -6068,7 +6085,6 @@ janus_streaming_mountpoint *janus_streaming_create_rtp_source(
/* This extra unref is for the init */
janus_refcount_decrease(&helper->ref);
janus_streaming_mountpoint_destroy(live_rtp);
g_free(helper);
return NULL;
}
janus_refcount_increase(&helper->ref);
Expand Down Expand Up @@ -6928,7 +6944,7 @@ janus_streaming_mountpoint *janus_streaming_create_rtsp_source(
char *url, char *username, char *password,
gboolean doaudio, int acodec, char *artpmap, char *afmtp,
gboolean dovideo, int vcodec, char *vrtpmap, char *vfmtp, gboolean bufferkf,
const janus_network_address *iface,
const janus_network_address *iface, int threads,
gboolean error_on_failure) {
char id_num[30];
if(!string_ids) {
Expand Down Expand Up @@ -7040,9 +7056,44 @@ janus_streaming_mountpoint *janus_streaming_create_rtsp_source(
return NULL;
}
}
/* Start the thread that will receive the media packets */
/* If we need helper threads, spawn them now */
GError *error = NULL;
char tname[16];
if(threads > 0) {
int i=0;
for(i=0; i<threads; i++) {
janus_streaming_helper *helper = g_malloc0(sizeof(janus_streaming_helper));
helper->id = i+1;
helper->mp = live_rtsp;
helper->queued_packets = g_async_queue_new_full((GDestroyNotify)janus_streaming_rtp_relay_packet_free);
janus_mutex_init(&helper->mutex);
janus_refcount_init(&helper->ref, janus_streaming_helper_free);
live_rtsp->helper_threads++;
/* Spawn a thread and add references */
g_snprintf(tname, sizeof(tname), "help %u-%"SCNu64, helper->id, live_rtsp->id);
janus_refcount_increase(&live_rtsp->ref);
janus_refcount_increase(&helper->ref);
helper->thread = g_thread_try_new(tname, &janus_streaming_helper_thread, helper, &error);
if(error != NULL) {
JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the helper thread...\n",
error->code, error->message ? error->message : "??");
g_error_free(error);
janus_refcount_decrease(&live_rtsp->ref); /* This is for the helper thread */
g_async_queue_unref(helper->queued_packets);
janus_refcount_decrease(&helper->ref);
/* This extra unref is for the init */
janus_refcount_decrease(&helper->ref);
janus_mutex_lock(&mountpoints_mutex);
g_hash_table_remove(mountpoints_temp, &id);
janus_mutex_unlock(&mountpoints_mutex);
janus_refcount_decrease(&live_rtsp->ref);
return NULL;
}
janus_refcount_increase(&helper->ref);
live_rtsp->threads = g_list_append(live_rtsp->threads, helper);
}
}
/* Finally, start the thread that will receive the media packets */
g_snprintf(tname, sizeof(tname), "mp %s", live_rtsp->id_str);
janus_refcount_increase(&live_rtsp->ref);
live_rtsp->thread = g_thread_try_new(tname, &janus_streaming_relay_thread, live_rtsp, &error);
Expand Down Expand Up @@ -7072,7 +7123,7 @@ janus_streaming_mountpoint *janus_streaming_create_rtsp_source(
char *url, char *username, char *password,
gboolean doaudio, int acodec, char *audiortpmap, char *audiofmtp,
gboolean dovideo, int vcodec, char *videortpmap, char *videofmtp, gboolean bufferkf,
const janus_network_address *iface,
const janus_network_address *iface, int threads,
gboolean error_on_failure) {
JANUS_LOG(LOG_ERR, "RTSP need libcurl\n");
return NULL;
Expand Down

0 comments on commit 7d88876

Please sign in to comment.