From fe993140577ff91ec626eb7b5f03a2898809674f Mon Sep 17 00:00:00 2001 From: "Petar Minchev (Digital Samba)" <56128952+petarminchev@users.noreply.github.com> Date: Wed, 12 Jul 2023 16:39:40 +0300 Subject: [PATCH] Batch configure support for Streaming plugin (#3239) --- src/plugins/janus_streaming.c | 371 +++++++++++++++++++++------------- 1 file changed, 235 insertions(+), 136 deletions(-) diff --git a/src/plugins/janus_streaming.c b/src/plugins/janus_streaming.c index 5d740cdfcb..72d73193d6 100644 --- a/src/plugins/janus_streaming.c +++ b/src/plugins/janus_streaming.c @@ -793,15 +793,20 @@ multistream-test: { \verbatim { "request" : "configure", - "mid" : , - "send" : , - "substream" : , - "temporal" : , - "fallback" : , - "spatial_layer" : , - "temporal_layer" : , - "min_delay" : , - "max_delay" : + "streams" : [ + { + "mid" : , + "send" : , + "substream" : , + "temporal" : , + "fallback" : , + "spatial_layer" : , + "temporal_layer" : , + "min_delay" : , + "max_delay" : + }, + // Other streams, if any + ] } \endverbatim * @@ -1180,6 +1185,7 @@ static struct janus_json_parameter svc_parameters[] = { }; static struct janus_json_parameter configure_parameters[] = { {"mid", JANUS_JSON_STRING, 0}, + {"streams", JANUS_JSON_ARRAY, 0}, {"send", JANUS_JSON_BOOL, 0}, /* For VP8 (or H.264) simulcast */ {"substream", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE}, @@ -6557,146 +6563,239 @@ static void *janus_streaming_handler(void *data) { json_t *audio = json_object_get(root, "audio"); json_t *video = json_object_get(root, "video"); json_t *data = json_object_get(root, "data"); - /* Better to specify the 'send' property of a specific 'mid' */ - const char *mid = json_string_value(json_object_get(root, "mid")); - json_t *send = json_object_get(root, "send"); + + /* We use an array of streams to state the changes we want to make, + * were for each stream we specify the 'mid' to impact (e.g., send) */ + json_t *streams = json_object_get(root, "streams"); + if(streams == NULL) { + /* No streams object, check if the properties have been + * provided globally, which is how we handled this + * request before: if so, create a new fake streams + * array, and move the parsed options there */ + streams = json_array(); + json_t *stream = json_object(); + const char *mid = json_string_value(json_object_get(root, "mid")); + if(mid != NULL) + json_object_set_new(stream, "mid", json_string(mid)); + json_t *send = json_object_get(root, "send"); + if(send != NULL) + json_object_set_new(stream, "send", json_is_true(send) ? json_true() : json_false()); + json_t *spatial = json_object_get(root, "spatial_layer"); + if(spatial != NULL) + json_object_set_new(stream, "spatial_layer", json_integer(json_integer_value(spatial))); + json_t *sc_substream = json_object_get(root, "substream"); + if(sc_substream != NULL) + json_object_set_new(stream, "substream", json_integer(json_integer_value(sc_substream))); + json_t *temporal = json_object_get(root, "temporal_layer"); + if(temporal != NULL) + json_object_set_new(stream, "temporal_layer", json_integer(json_integer_value(temporal))); + json_t *sc_temporal = json_object_get(root, "temporal"); + if(sc_temporal != NULL) + json_object_set_new(stream, "temporal", json_integer(json_integer_value(sc_temporal))); + json_t *sc_fallback = json_object_get(root, "fallback"); + if(sc_fallback != NULL) + json_object_set_new(stream, "fallback", json_integer(json_integer_value(sc_fallback))); + json_t *min_delay = json_object_get(root, "min_delay"); + if(min_delay != NULL) + json_object_set_new(stream, "min_delay", json_integer(json_integer_value(min_delay))); + json_t *max_delay = json_object_get(root, "max_delay"); + if(max_delay != NULL) + json_object_set_new(stream, "max_delay", json_integer(json_integer_value(max_delay))); + json_array_append_new(streams, stream); + json_object_set_new(root, "streams", streams); + } + + size_t i = 0; + size_t streams_size = json_array_size(streams); + for(i=0; i 1) { + JANUS_LOG(LOG_ERR, "Invalid element (mid can't be null in a streams array)\n"); + error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT; + g_snprintf(error_cause, 512, "Invalid value (mid can't be null in a streams array)"); + break; + } + if(mid != NULL) { + json_object_del(root, "audio"); + audio = NULL; + json_object_del(root, "video"); + video = NULL; + json_object_del(root, "data"); + data = NULL; + } + json_t *spatial = json_object_get(s, "spatial_layer"); + json_t *sc_substream = json_object_get(s, "substream"); + json_t *temporal = json_object_get(s, "temporal_layer"); + json_t *sc_temporal = json_object_get(s, "temporal"); + if(json_integer_value(spatial) < 0 || json_integer_value(spatial) > 2 || + json_integer_value(sc_substream) < 0 || json_integer_value(sc_substream) > 2) { + JANUS_LOG(LOG_ERR, "Invalid element (substream/spatial_layer should be 0, 1 or 2)\n"); + error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT; + g_snprintf(error_cause, 512, "Invalid value (substream/spatial_layer should be 0, 1 or 2)"); + break; + } + if(json_integer_value(temporal) < 0 || json_integer_value(temporal) > 2 || + json_integer_value(sc_temporal) < 0 || json_integer_value(sc_temporal) > 2) { + JANUS_LOG(LOG_ERR, "Invalid element (temporal/temporal_layer should be 0, 1 or 2)\n"); + error_code = JANUS_STREAMING_ERROR_INVALID_ELEMENT; + g_snprintf(error_cause, 512, "Invalid value (temporal/temporal_layer should be 0, 1 or 2)"); + break; + } + } + if(error_code != 0) { + goto error; + } + if(mp->streaming_source == janus_streaming_source_rtp) { - GList *temp = session->streams; - while(temp) { - janus_streaming_session_stream *s = (janus_streaming_session_stream *)temp->data; - janus_streaming_rtp_source_stream *stream = s->stream; - /* Check the old and deprecated approach first */ - if(audio && stream->type == JANUS_STREAMING_MEDIA_AUDIO) - s->send = json_is_true(audio); - else if(video && stream->type == JANUS_STREAMING_MEDIA_VIDEO) - s->send = json_is_true(video); - else if(data && stream->type == JANUS_STREAMING_MEDIA_DATA) - s->send = json_is_true(data); - /* Now let's see if this is the right mid */ - if(mid && strcasecmp(stream->mid, mid)) { - temp = temp->next; - continue; - } - if(send) - s->send = json_is_true(send); - /* FIXME What if we're simulcasting or doing SVC on two different video streams? */ - if(stream && stream->simulcast) { - /* Check if the viewer is requesting a different substream/temporal layer */ - json_t *substream = json_object_get(root, "substream"); - if(substream) { - s->sim_context.substream_target = json_integer_value(substream); - JANUS_LOG(LOG_VERB, "Setting video substream to let through (simulcast): %d (was %d)\n", - s->sim_context.substream_target, s->sim_context.substream); - if(s->sim_context.substream_target == s->sim_context.substream) { - /* No need to do anything, we're already getting the right substream, so notify the viewer */ - json_t *event = json_object(); - json_object_set_new(event, "streaming", json_string("event")); - json_t *result = json_object(); - json_object_set_new(result, "substream", json_integer(s->sim_context.substream)); - json_object_set_new(event, "result", result); - gateway->push_event(session->handle, &janus_streaming_plugin, NULL, event, NULL); - json_decref(event); - } else { - /* Schedule a PLI */ - JANUS_LOG(LOG_VERB, "We need a PLI for the simulcast context\n"); - g_atomic_int_set(&stream->need_pli, 1); - } + /* Enforce the requested changes */ + for(i=0; istreams; + while(temp) { + janus_streaming_session_stream *s = (janus_streaming_session_stream *)temp->data; + janus_streaming_rtp_source_stream *stream = s->stream; + /* Check the old and deprecated approach first */ + if(audio && stream->type == JANUS_STREAMING_MEDIA_AUDIO) + s->send = json_is_true(audio); + else if(video && stream->type == JANUS_STREAMING_MEDIA_VIDEO) + s->send = json_is_true(video); + else if(data && stream->type == JANUS_STREAMING_MEDIA_DATA) + s->send = json_is_true(data); + /* Now let's see if this is the right mid */ + if(mid && strcasecmp(stream->mid, mid)) { + temp = temp->next; + continue; } - json_t *temporal = json_object_get(root, "temporal"); - if(temporal) { - s->sim_context.templayer_target = json_integer_value(temporal); - JANUS_LOG(LOG_VERB, "Setting video temporal layer to let through (simulcast): %d (was %d)\n", - s->sim_context.templayer_target, s->sim_context.templayer); - if(stream->codecs.video_codec == JANUS_VIDEOCODEC_VP8 && s->sim_context.templayer_target == s->sim_context.templayer) { - /* No need to do anything, we're already getting the right temporal layer, so notify the viewer */ - json_t *event = json_object(); - json_object_set_new(event, "streaming", json_string("event")); - json_t *result = json_object(); - json_object_set_new(result, "temporal", json_integer(s->sim_context.templayer)); - json_object_set_new(event, "result", result); - gateway->push_event(session->handle, &janus_streaming_plugin, NULL, event, NULL); - json_decref(event); + if(send) + s->send = json_is_true(send); + if(stream && stream->simulcast) { + /* Check if the viewer is requesting a different substream/temporal layer */ + json_t *substream = json_object_get(sconf, "substream"); + if(substream) { + s->sim_context.substream_target = json_integer_value(substream); + JANUS_LOG(LOG_VERB, "Setting video substream to let through (simulcast): %d (was %d)\n", + s->sim_context.substream_target, s->sim_context.substream); + if(s->sim_context.substream_target == s->sim_context.substream) { + /* No need to do anything, we're already getting the right substream, so notify the viewer */ + json_t *event = json_object(); + json_object_set_new(event, "streaming", json_string("event")); + json_t *result = json_object(); + json_object_set_new(result, "substream", json_integer(s->sim_context.substream)); + json_object_set_new(event, "result", result); + gateway->push_event(session->handle, &janus_streaming_plugin, NULL, event, NULL); + json_decref(event); + } else { + /* Schedule a PLI */ + JANUS_LOG(LOG_VERB, "We need a PLI for the simulcast context\n"); + g_atomic_int_set(&stream->need_pli, 1); + } } - } - /* Check if we need to change the fallback timer for the substream */ - json_t *fallback = json_object_get(root, "fallback"); - if(fallback) { - JANUS_LOG(LOG_VERB, "Setting fallback timer (simulcast): %lld (was %"SCNu32")\n", - json_integer_value(fallback) ? json_integer_value(fallback) : 250000, - s->sim_context.drop_trigger ? s->sim_context.drop_trigger : 250000); - s->sim_context.drop_trigger = json_integer_value(fallback); - } - } - if(stream && stream->svc) { - /* Check if the viewer is requesting a different SVC spatial/temporal layer */ - json_t *spatial = json_object_get(root, "spatial_layer"); - if(spatial) { - int spatial_layer = json_integer_value(spatial); - if(spatial_layer > 1) { - JANUS_LOG(LOG_WARN, "Spatial layer higher than 1, will probably be ignored\n"); + json_t *temporal = json_object_get(sconf, "temporal"); + if(temporal) { + s->sim_context.templayer_target = json_integer_value(temporal); + JANUS_LOG(LOG_VERB, "Setting video temporal layer to let through (simulcast): %d (was %d)\n", + s->sim_context.templayer_target, s->sim_context.templayer); + if(stream->codecs.video_codec == JANUS_VIDEOCODEC_VP8 && s->sim_context.templayer_target == s->sim_context.templayer) { + /* No need to do anything, we're already getting the right temporal layer, so notify the viewer */ + json_t *event = json_object(); + json_object_set_new(event, "streaming", json_string("event")); + json_t *result = json_object(); + json_object_set_new(result, "temporal", json_integer(s->sim_context.templayer)); + json_object_set_new(event, "result", result); + gateway->push_event(session->handle, &janus_streaming_plugin, NULL, event, NULL); + json_decref(event); + } } - if(spatial_layer == s->spatial_layer) { - /* No need to do anything, we're already getting the right spatial layer, so notify the user */ - json_t *event = json_object(); - json_object_set_new(event, "streaming", json_string("event")); - json_t *result = json_object(); - json_object_set_new(result, "spatial_layer", json_integer(s->spatial_layer)); - json_object_set_new(event, "result", result); - gateway->push_event(msg->handle, &janus_streaming_plugin, NULL, event, NULL); - json_decref(event); - } else if(spatial_layer != s->target_spatial_layer) { - /* Send a FIR to the source, if RTCP is enabled */ - g_atomic_int_set(&stream->need_pli, 1); + /* Check if we need to change the fallback timer for the substream */ + json_t *fallback = json_object_get(sconf, "fallback"); + if(fallback) { + JANUS_LOG(LOG_VERB, "Setting fallback timer (simulcast): %lld (was %"SCNu32")\n", + json_integer_value(fallback) ? json_integer_value(fallback) : 250000, + s->sim_context.drop_trigger ? s->sim_context.drop_trigger : 250000); + s->sim_context.drop_trigger = json_integer_value(fallback); } - s->target_spatial_layer = spatial_layer; } - json_t *temporal = json_object_get(root, "temporal_layer"); - if(temporal) { - int temporal_layer = json_integer_value(temporal); - if(temporal_layer > 2) { - JANUS_LOG(LOG_WARN, "Temporal layer higher than 2, will probably be ignored\n"); + if(stream && stream->svc) { + /* Check if the viewer is requesting a different SVC spatial/temporal layer */ + json_t *spatial = json_object_get(sconf, "spatial_layer"); + if(spatial) { + int spatial_layer = json_integer_value(spatial); + if(spatial_layer > 1) { + JANUS_LOG(LOG_WARN, "Spatial layer higher than 1, will probably be ignored\n"); + } + if(spatial_layer == s->spatial_layer) { + /* No need to do anything, we're already getting the right spatial layer, so notify the user */ + json_t *event = json_object(); + json_object_set_new(event, "streaming", json_string("event")); + json_t *result = json_object(); + json_object_set_new(result, "spatial_layer", json_integer(s->spatial_layer)); + json_object_set_new(event, "result", result); + gateway->push_event(msg->handle, &janus_streaming_plugin, NULL, event, NULL); + json_decref(event); + } else if(spatial_layer != s->target_spatial_layer) { + /* Send a FIR to the source, if RTCP is enabled */ + g_atomic_int_set(&stream->need_pli, 1); + } + s->target_spatial_layer = spatial_layer; } - if(temporal_layer == s->temporal_layer) { - /* No need to do anything, we're already getting the right temporal layer, so notify the user */ - json_t *event = json_object(); - json_object_set_new(event, "streaming", json_string("event")); - json_t *result = json_object(); - json_object_set_new(result, "temporal_layer", json_integer(s->temporal_layer)); - json_object_set_new(event, "result", result); - gateway->push_event(msg->handle, &janus_streaming_plugin, NULL, event, NULL); - json_decref(event); + json_t *temporal = json_object_get(sconf, "temporal_layer"); + if(temporal) { + int temporal_layer = json_integer_value(temporal); + if(temporal_layer > 2) { + JANUS_LOG(LOG_WARN, "Temporal layer higher than 2, will probably be ignored\n"); + } + if(temporal_layer == s->temporal_layer) { + /* No need to do anything, we're already getting the right temporal layer, so notify the user */ + json_t *event = json_object(); + json_object_set_new(event, "streaming", json_string("event")); + json_t *result = json_object(); + json_object_set_new(result, "temporal_layer", json_integer(s->temporal_layer)); + json_object_set_new(event, "result", result); + gateway->push_event(msg->handle, &janus_streaming_plugin, NULL, event, NULL); + json_decref(event); + } + s->target_temporal_layer = temporal_layer; } - s->target_temporal_layer = temporal_layer; } - } - if(stream && stream->type == JANUS_STREAMING_MEDIA_VIDEO && session->playoutdelay_ext) { - /* Check if we need to specify a custom playout delay for this stream */ - json_t *min_delay = json_object_get(root, "min_delay"); - if(min_delay) { - int16_t md = json_integer_value(min_delay); - if(md < 0) { - s->min_delay = -1; - s->max_delay = -1; - } else { - s->min_delay = md; - if(s->min_delay > s->max_delay) - s->max_delay = s->min_delay; + if(stream && stream->type == JANUS_STREAMING_MEDIA_VIDEO && session->playoutdelay_ext) { + /* Check if we need to specify a custom playout delay for this stream */ + json_t *min_delay = json_object_get(sconf, "min_delay"); + if(min_delay) { + int16_t md = json_integer_value(min_delay); + if(md < 0) { + s->min_delay = -1; + s->max_delay = -1; + } else { + s->min_delay = md; + if(s->min_delay > s->max_delay) + s->max_delay = s->min_delay; + } } - } - json_t *max_delay = json_object_get(root, "max_delay"); - if(max_delay) { - int16_t md = json_integer_value(max_delay); - if(md < 0) { - s->min_delay = -1; - s->max_delay = -1; - } else { - s->max_delay = md; - if(s->max_delay < s->min_delay) - s->min_delay = s->max_delay; + json_t *max_delay = json_object_get(sconf, "max_delay"); + if(max_delay) { + int16_t md = json_integer_value(max_delay); + if(md < 0) { + s->min_delay = -1; + s->max_delay = -1; + } else { + s->max_delay = md; + if(s->max_delay < s->min_delay) + s->min_delay = s->max_delay; + } } } + temp = temp->next; } - temp = temp->next; } } /* Done */