Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for helper threads to RTSP mountpoints (fixes #2359) #2361

Merged
merged 1 commit into from
Sep 25, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 59 additions & 8 deletions plugins/janus_streaming.c
Original file line number Diff line number Diff line change
Expand Up @@ -862,6 +862,7 @@ static struct janus_json_parameter rtsp_parameters[] = {
{"videofmtp", JSON_STRING, 0},
{"videopt", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE},
{"videobufferkf", JANUS_JSON_BOOL, 0},
{"threads", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE},
{"rtspiface", JSON_STRING, 0},
{"rtsp_failcheck", JANUS_JSON_BOOL, 0}
};
Expand Down Expand Up @@ -1129,8 +1130,8 @@ typedef struct janus_streaming_mountpoint {
janus_streaming_codecs codecs;
gboolean audio, video, data;
GList *viewers;
int helper_threads; /* Only relevant for RTP mountpoints */
GList *threads; /* Only relevant for RTP mountpoints */
int helper_threads; /* Only relevant for RTP/RTSP mountpoints */
GList *threads; /* Only relevant for RTP/RTSP mountpoints */
volatile gint destroyed;
janus_mutex mutex;
janus_refcount ref;
Expand Down Expand Up @@ -1185,7 +1186,7 @@ janus_streaming_mountpoint *janus_streaming_create_rtsp_source(
char *url, char *username, char *password,
gboolean doaudio, int audiopt, char *artpmap, char *afmtp,
gboolean dovideo, int videopt, char *vrtpmap, char *vfmtp, gboolean bufferkf,
const janus_network_address *iface,
const janus_network_address *iface, int threads,
gboolean error_on_failure);


Expand Down Expand Up @@ -2098,6 +2099,7 @@ int janus_streaming_init(janus_callbacks *callback, const char *config_path) {
janus_config_item *vkf = janus_config_get(config, cat, janus_config_type_item, "videobufferkf");
janus_config_item *iface = janus_config_get(config, cat, janus_config_type_item, "rtspiface");
janus_config_item *failerr = janus_config_get(config, cat, janus_config_type_item, "rtsp_failcheck");
janus_config_item *threads = janus_config_get(config, cat, janus_config_type_item, "threads");
janus_network_address iface_value;
if(file == NULL || file->value == NULL) {
JANUS_LOG(LOG_ERR, "Can't add 'rtsp' mountpoint '%s', missing mandatory information...\n", cat->name);
Expand All @@ -2111,6 +2113,11 @@ int janus_streaming_init(janus_callbacks *callback, const char *config_path) {
gboolean error_on_failure = TRUE;
if(failerr && failerr->value)
error_on_failure = janus_is_true(failerr->value);
if(threads && threads->value && atoi(threads->value) < 0) {
JANUS_LOG(LOG_ERR, "Can't add 'rtsp' mountpoint '%s', invalid threads configuration...\n", cat->name);
cl = cl->next;
continue;
}

if((doaudio || dovideo) && iface && iface->value) {
if(!ifas) {
Expand Down Expand Up @@ -2144,6 +2151,7 @@ int janus_streaming_init(janus_callbacks *callback, const char *config_path) {
vfmtp ? (char *)vfmtp->value : NULL,
bufferkf,
iface && iface->value ? &iface_value : NULL,
(threads && threads->value) ? atoi(threads->value) : 0,
error_on_failure)) == NULL) {
JANUS_LOG(LOG_ERR, "Error creating 'rtsp' mountpoint '%s'...\n", cat->name);
cl = cl->next;
Expand Down Expand Up @@ -3151,6 +3159,7 @@ static json_t *janus_streaming_process_synchronous_request(janus_streaming_sessi
json_t *username = json_object_get(root, "rtsp_user");
json_t *password = json_object_get(root, "rtsp_pwd");
json_t *iface = json_object_get(root, "rtspiface");
json_t *threads = json_object_get(root, "threads");
json_t *failerr = json_object_get(root, "rtsp_failcheck");
if(failerr == NULL) /* For an old typo, we support the legacy syntax too */
failerr = json_object_get(root, "rtsp_check");
Expand Down Expand Up @@ -3194,7 +3203,7 @@ static json_t *janus_streaming_process_synchronous_request(janus_streaming_sessi
dovideo, (videopt ? json_integer_value(videopt) : -1),
(char *)json_string_value(videortpmap), (char *)json_string_value(videofmtp),
videobufferkf ? json_is_true(videobufferkf) : FALSE,
&multicast_iface,
&multicast_iface, (threads ? json_integer_value(threads) : 0),
error_on_failure);
janus_mutex_lock(&mountpoints_mutex);
g_hash_table_remove(mountpoints_temp, string_ids ? (gpointer)mpid_str : (gpointer)&mpid);
Expand Down Expand Up @@ -3352,6 +3361,10 @@ static json_t *janus_streaming_process_synchronous_request(janus_streaming_sessi
json_t *iface = json_object_get(root, "rtspiface");
if(iface)
janus_config_add(config, c, janus_config_item_create("rtspiface", json_string_value(iface)));
if(mp->helper_threads > 0) {
g_snprintf(value, BUFSIZ, "%d", mp->helper_threads);
janus_config_add(config, c, janus_config_item_create("threads", value));
}
}
/* Some more common values */
if(mp->secret)
Expand Down Expand Up @@ -3562,6 +3575,10 @@ static json_t *janus_streaming_process_synchronous_request(janus_streaming_sessi
json_t *iface = json_object_get(root, "rtspiface");
if(iface)
janus_config_add(config, c, janus_config_item_create("rtspiface", json_string_value(iface)));
if(mp->helper_threads > 0) {
g_snprintf(value, BUFSIZ, "%d", mp->helper_threads);
janus_config_add(config, c, janus_config_item_create("threads", value));
}
} else {
janus_config_add(config, c, janus_config_item_create("type", "rtp"));
janus_config_add(config, c, janus_config_item_create("audio", mp->codecs.audio_pt >= 0 ? "yes" : "no"));
Expand Down Expand Up @@ -6068,7 +6085,6 @@ janus_streaming_mountpoint *janus_streaming_create_rtp_source(
/* This extra unref is for the init */
janus_refcount_decrease(&helper->ref);
janus_streaming_mountpoint_destroy(live_rtp);
g_free(helper);
return NULL;
}
janus_refcount_increase(&helper->ref);
Expand Down Expand Up @@ -6928,7 +6944,7 @@ janus_streaming_mountpoint *janus_streaming_create_rtsp_source(
char *url, char *username, char *password,
gboolean doaudio, int acodec, char *artpmap, char *afmtp,
gboolean dovideo, int vcodec, char *vrtpmap, char *vfmtp, gboolean bufferkf,
const janus_network_address *iface,
const janus_network_address *iface, int threads,
gboolean error_on_failure) {
char id_num[30];
if(!string_ids) {
Expand Down Expand Up @@ -7040,9 +7056,44 @@ janus_streaming_mountpoint *janus_streaming_create_rtsp_source(
return NULL;
}
}
/* Start the thread that will receive the media packets */
/* If we need helper threads, spawn them now */
GError *error = NULL;
char tname[16];
if(threads > 0) {
int i=0;
for(i=0; i<threads; i++) {
janus_streaming_helper *helper = g_malloc0(sizeof(janus_streaming_helper));
helper->id = i+1;
helper->mp = live_rtsp;
helper->queued_packets = g_async_queue_new_full((GDestroyNotify)janus_streaming_rtp_relay_packet_free);
janus_mutex_init(&helper->mutex);
janus_refcount_init(&helper->ref, janus_streaming_helper_free);
live_rtsp->helper_threads++;
/* Spawn a thread and add references */
g_snprintf(tname, sizeof(tname), "help %u-%"SCNu64, helper->id, live_rtsp->id);
janus_refcount_increase(&live_rtsp->ref);
janus_refcount_increase(&helper->ref);
helper->thread = g_thread_try_new(tname, &janus_streaming_helper_thread, helper, &error);
if(error != NULL) {
JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the helper thread...\n",
error->code, error->message ? error->message : "??");
g_error_free(error);
janus_refcount_decrease(&live_rtsp->ref); /* This is for the helper thread */
g_async_queue_unref(helper->queued_packets);
janus_refcount_decrease(&helper->ref);
/* This extra unref is for the init */
janus_refcount_decrease(&helper->ref);
janus_mutex_lock(&mountpoints_mutex);
g_hash_table_remove(mountpoints_temp, &id);
janus_mutex_unlock(&mountpoints_mutex);
janus_refcount_decrease(&live_rtsp->ref);
return NULL;
}
janus_refcount_increase(&helper->ref);
live_rtsp->threads = g_list_append(live_rtsp->threads, helper);
}
}
/* Finally, start the thread that will receive the media packets */
g_snprintf(tname, sizeof(tname), "mp %s", live_rtsp->id_str);
janus_refcount_increase(&live_rtsp->ref);
live_rtsp->thread = g_thread_try_new(tname, &janus_streaming_relay_thread, live_rtsp, &error);
Expand Down Expand Up @@ -7072,7 +7123,7 @@ janus_streaming_mountpoint *janus_streaming_create_rtsp_source(
char *url, char *username, char *password,
gboolean doaudio, int acodec, char *audiortpmap, char *audiofmtp,
gboolean dovideo, int vcodec, char *videortpmap, char *videofmtp, gboolean bufferkf,
const janus_network_address *iface,
const janus_network_address *iface, int threads,
gboolean error_on_failure) {
JANUS_LOG(LOG_ERR, "RTSP need libcurl\n");
return NULL;
Expand Down