From 0f70c31f2b2f9d5391b82a47dfe5b8448c00beb5 Mon Sep 17 00:00:00 2001 From: Lorenzo Miniero Date: Thu, 9 Apr 2020 12:57:28 +0200 Subject: [PATCH] New plugin callback to know when datachannel is writable --- dtls.c | 23 +++++- dtls.h | 8 ++- ice.c | 28 +++++++- ice.h | 7 +- plugins/duktape/echotest.js | 8 +++ plugins/janus_duktape.c | 58 ++++++++++++++- plugins/janus_duktape_data.h | 1 + plugins/janus_echotest.c | 9 +++ plugins/janus_lua.c | 54 +++++++++++++- plugins/janus_lua_data.h | 1 + plugins/janus_streaming.c | 82 +++++++++++++-------- plugins/janus_textroom.c | 22 +++++- plugins/janus_videocall.c | 20 +++++- plugins/janus_videoroom.c | 19 ++++- plugins/lua/echotest.lua | 8 +++ plugins/plugin.h | 9 ++- sctp.c | 135 +++++++++++++++++++++++++++-------- sctp.h | 2 + 18 files changed, 418 insertions(+), 76 deletions(-) diff --git a/dtls.c b/dtls.c index ac9817a6a1..ba29380777 100644 --- a/dtls.c +++ b/dtls.c @@ -1031,6 +1031,27 @@ int janus_dtls_verify_callback(int preverify_ok, X509_STORE_CTX *ctx) { } #ifdef HAVE_SCTP +void janus_dtls_sctp_data_ready(janus_dtls_srtp *dtls) { + if(dtls == NULL) + return; + janus_ice_component *component = (janus_ice_component *)dtls->component; + if(component == NULL) { + JANUS_LOG(LOG_ERR, "No component...\n"); + return; + } + janus_ice_stream *stream = component->stream; + if(!stream) { + JANUS_LOG(LOG_ERR, "No stream...\n"); + return; + } + janus_ice_handle *handle = stream->handle; + if(!handle || !handle->agent || !dtls->write_bio) { + JANUS_LOG(LOG_ERR, "No handle...\n"); + return; + } + janus_ice_notify_data_ready(handle); +} + void janus_dtls_wrap_sctp_data(janus_dtls_srtp *dtls, char *label, gboolean textdata, char *buf, int len) { if(dtls == NULL || !dtls->ready || dtls->sctp == NULL || buf == NULL || len < 1) return; @@ -1048,7 +1069,7 @@ int janus_dtls_send_sctp_data(janus_dtls_srtp *dtls, char *buf, int len) { return res; } -void janus_dtls_notify_data(janus_dtls_srtp *dtls, char *label, gboolean textdata, char *buf, int len) { +void janus_dtls_notify_sctp_data(janus_dtls_srtp *dtls, char *label, gboolean textdata, char *buf, int len) { if(dtls == NULL || buf == NULL || len < 1) return; janus_ice_component *component = (janus_ice_component *)dtls->component; diff --git a/dtls.h b/dtls.h index 6b6215d26f..f69a4a6d7b 100644 --- a/dtls.h +++ b/dtls.h @@ -125,7 +125,7 @@ int janus_dtls_srtp_create_sctp(janus_dtls_srtp *dtls); /*! \brief Handle an incoming DTLS message * @param[in] dtls The janus_dtls_srtp instance to start the handshake on * @param[in] buf The DTLS message data - * @param[in] len The DTLS message data lenght */ + * @param[in] len The DTLS message data length */ void janus_dtls_srtp_incoming_msg(janus_dtls_srtp *dtls, char *buf, uint16_t len); /*! \brief Send an alert on a janus_dtls_srtp instance * @param[in] dtls The janus_dtls_srtp instance to send the alert on */ @@ -147,6 +147,10 @@ void janus_dtls_callback(const SSL *ssl, int where, int ret); int janus_dtls_verify_callback(int preverify_ok, X509_STORE_CTX *ctx); #ifdef HAVE_SCTP +/*! \brief Callback (called from the SCTP stack) that SCTP data (DataChannel) can be sent + * @param[in] dtls The janus_dtls_srtp instance to use */ +void janus_dtls_sctp_data_ready(janus_dtls_srtp *dtls); + /*! \brief Callback (called from the ICE handle) to encapsulate in DTLS outgoing SCTP data (DataChannel) * @param[in] dtls The janus_dtls_srtp instance to use * @param[in] label The label of the data channel to use @@ -168,7 +172,7 @@ int janus_dtls_send_sctp_data(janus_dtls_srtp *dtls, char *buf, int len); * @param[in] textdata Whether the buffer is text (domstring) or binary data * @param[in] buf The data buffer * @param[in] len The data length */ -void janus_dtls_notify_data(janus_dtls_srtp *dtls, char *label, gboolean textdata, char *buf, int len); +void janus_dtls_notify_sctp_data(janus_dtls_srtp *dtls, char *label, gboolean textdata, char *buf, int len); #endif /*! \brief DTLS retransmission timer diff --git a/ice.c b/ice.c index 81d89b5901..78cf4139fa 100644 --- a/ice.c +++ b/ice.c @@ -321,7 +321,8 @@ static janus_ice_queued_packet janus_ice_add_candidates, janus_ice_dtls_handshake, janus_ice_hangup_peerconnection, - janus_ice_detach_handle; + janus_ice_detach_handle, + janus_ice_data_ready; /* Janus NACKed packet we're tracking (to avoid duplicates) */ typedef struct janus_ice_nacked_packet { @@ -478,7 +479,8 @@ static void janus_ice_free_queued_packet(janus_ice_queued_packet *pkt) { pkt == &janus_ice_add_candidates || pkt == &janus_ice_dtls_handshake || pkt == &janus_ice_hangup_peerconnection || - pkt == &janus_ice_detach_handle) { + pkt == &janus_ice_detach_handle || + pkt == &janus_ice_data_ready) { return; } g_free(pkt->data); @@ -4095,6 +4097,14 @@ static gboolean janus_ice_outgoing_traffic_handle(janus_ice_handle *handle, janu session->session_id, handle->handle_id, "detached", plugin ? plugin->get_package() : NULL, handle->opaque_id); return G_SOURCE_REMOVE; + } else if(pkt == &janus_ice_data_ready) { + /* Data is writable on this PeerConnection, notify the plugin */ + janus_plugin *plugin = (janus_plugin *)handle->app; + if(plugin != NULL && plugin->data_ready != NULL && handle->app_handle != NULL) { + JANUS_LOG(LOG_HUGE, "[%"SCNu64"] Telling the plugin about the data channel being ready (%s)\n", + handle->handle_id, plugin ? plugin->get_name() : "??"); + plugin->data_ready(handle->app_handle); + } } if(!janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_READY)) { janus_ice_free_queued_packet(pkt); @@ -4735,6 +4745,20 @@ void janus_ice_relay_sctp(janus_ice_handle *handle, char *buffer, int length) { #endif } +void janus_ice_notify_data_ready(janus_ice_handle *handle) { +#ifdef HAVE_SCTP + if(!handle || handle->queued_packets == NULL) + return; + /* Queue this event */ +#if GLIB_CHECK_VERSION(2, 46, 0) + g_async_queue_push_front(handle->queued_packets, &janus_ice_data_ready); +#else + g_async_queue_push(handle->queued_packets, &janus_ice_data_ready); +#endif + g_main_context_wakeup(handle->mainctx); +#endif +} + void janus_ice_dtls_handshake_done(janus_ice_handle *handle, janus_ice_component *component) { if(!handle || !component) return; diff --git a/ice.h b/ice.h index 151daa14bf..d34d29b1de 100644 --- a/ice.h +++ b/ice.h @@ -630,13 +630,16 @@ void janus_ice_send_remb(janus_ice_handle *handle, uint32_t bitrate); * @param[in] label The label of the data channel the message is from * @param[in] textdata Whether the buffer is text (domstring) or binary data * @param[in] buffer The message data (buffer) - * @param[in] length The buffer lenght */ + * @param[in] length The buffer length */ void janus_ice_incoming_data(janus_ice_handle *handle, char *label, gboolean textdata, char *buffer, int length); /*! \brief Core SCTP/DataChannel callback, called by the SCTP stack when when there's data to send. * @param[in] handle The Janus ICE handle associated with the peer * @param[in] buffer The message data (buffer) - * @param[in] length The buffer lenght */ + * @param[in] length The buffer length */ void janus_ice_relay_sctp(janus_ice_handle *handle, char *buffer, int length); +/*! \brief Plugin SCTP/DataChannel callback, called by the SCTP stack when data can be written + * @param[in] handle The Janus ICE handle associated with the peer */ +void janus_ice_notify_data_ready(janus_ice_handle *handle); ///@} diff --git a/plugins/duktape/echotest.js b/plugins/duktape/echotest.js index e3d0469b64..1a5efc578c 100644 --- a/plugins/duktape/echotest.js +++ b/plugins/duktape/echotest.js @@ -185,6 +185,14 @@ function incomingBinaryData(id, buf, len) { relayBinaryData(id, buf, len); } +function dataReady(id) { + // This callback is invoked when the datachannel first becomes + // available (meaning you should never send data before it has been + // invoked at least once), but also when the datachannel is ready to + // receive more data (buffers are empty), which means it can be used + // to throttle outgoing data and not send too much at a time. +} + function resumeScheduler() { // This is the function responsible for resuming coroutines associated // with whatever is relevant to the JS script, e.g., for this script, diff --git a/plugins/janus_duktape.c b/plugins/janus_duktape.c index 54a2b60cb4..d84d8856de 100644 --- a/plugins/janus_duktape.c +++ b/plugins/janus_duktape.c @@ -59,7 +59,8 @@ * with \c incomingTextData() or \c incomingBinaryData * though, the performance impact of directly processing and manipulating * RTP an RTCP packets is probably too high, and so their usage is currently - * discouraged. As an additional note, JavaScript scripts can also decide to + * discouraged. The \c dataReady() callback can be used to figure out when + * data can be sent. As an additional note, JavaScript scripts can also decide to * implement the functions that return information about the plugin itself, * namely \c getVersion() \c getVersionString() \c getDescription() * \c getName() \c getAuthor() and \c getPackage(). If not implemented, @@ -217,6 +218,7 @@ void janus_duktape_setup_media(janus_plugin_session *handle); void janus_duktape_incoming_rtp(janus_plugin_session *handle, janus_plugin_rtp *packet); void janus_duktape_incoming_rtcp(janus_plugin_session *handle, janus_plugin_rtcp *packet); void janus_duktape_incoming_data(janus_plugin_session *handle, janus_plugin_data *packet); +void janus_duktape_data_ready(janus_plugin_session *handle); void janus_duktape_slow_link(janus_plugin_session *handle, int uplink, int video); void janus_duktape_hangup_media(janus_plugin_session *handle); void janus_duktape_destroy_session(janus_plugin_session *handle, int *error); @@ -243,6 +245,7 @@ static janus_plugin janus_duktape_plugin = .incoming_rtp = janus_duktape_incoming_rtp, .incoming_rtcp = janus_duktape_incoming_rtcp, .incoming_data = janus_duktape_incoming_data, + .data_ready = janus_duktape_data_ready, .slow_link = janus_duktape_slow_link, .hangup_media = janus_duktape_hangup_media, .destroy_session = janus_duktape_destroy_session, @@ -289,6 +292,7 @@ static gboolean has_incoming_rtcp = FALSE; static gboolean has_incoming_data_legacy = FALSE, /* Legacy callback */ has_incoming_text_data = FALSE, has_incoming_binary_data = FALSE; +static gboolean has_data_ready = FALSE; static gboolean has_slow_link = FALSE; /* JavaScript C scheduler (for coroutines) */ static GThread *scheduler_thread = NULL; @@ -1069,6 +1073,11 @@ static duk_ret_t janus_duktape_method_relaytextdata(duk_context *ctx) { } janus_refcount_increase(&session->ref); janus_mutex_unlock(&duktape_sessions_mutex); + if(!g_atomic_int_get(&session->dataready)) { + janus_refcount_decrease(&session->ref); + duk_push_error_object(ctx, DUK_ERR_ERROR, "Datachannel not ready yet for session %"SCNu32, id); + return duk_throw(ctx); + } /* Send the data */ janus_plugin_data data = { .label = NULL, .binary = FALSE, .buffer = (char *)payload, .length = len }; janus_core->relay_data(session->handle, &data); @@ -1112,6 +1121,11 @@ static duk_ret_t janus_duktape_method_relaybinarydata(duk_context *ctx) { } janus_refcount_increase(&session->ref); janus_mutex_unlock(&duktape_sessions_mutex); + if(!g_atomic_int_get(&session->dataready)) { + janus_refcount_decrease(&session->ref); + duk_push_error_object(ctx, DUK_ERR_ERROR, "Datachannel not ready yet for session %"SCNu32, id); + return duk_throw(ctx); + } janus_plugin_data data = { .label = NULL, .binary = TRUE, .buffer = (char *)payload, .length = len }; janus_core->relay_data(session->handle, &data); janus_refcount_decrease(&session->ref); @@ -1467,6 +1481,9 @@ int janus_duktape_init(janus_callbacks *callback, const char *config_path) { duk_get_global_string(duktape_ctx, "incomingBinaryData"); if(duk_is_function(duktape_ctx, duk_get_top(duktape_ctx)-1) != 0) has_incoming_binary_data = TRUE; + duk_get_global_string(duktape_ctx, "dataReady"); + if(duk_is_function(duktape_ctx, duk_get_top(duktape_ctx)-1) != 0) + has_data_ready = TRUE; duk_get_global_string(duktape_ctx, "slowLink"); if(duk_is_function(duktape_ctx, duk_get_top(duktape_ctx)-1) != 0) has_slow_link = TRUE; @@ -2281,6 +2298,39 @@ void janus_duktape_incoming_data(janus_plugin_session *handle, janus_plugin_data janus_mutex_unlock_nodebug(&session->recipients_mutex); } +void janus_duktape_data_ready(janus_plugin_session *handle) { + if(handle == NULL || handle->stopped || g_atomic_int_get(&duktape_stopping) || !g_atomic_int_get(&duktape_initialized)) + return; + janus_duktape_session *session = (janus_duktape_session *)handle->plugin_handle; + if(!session) { + JANUS_LOG(LOG_ERR, "No session associated with this handle...\n"); + return; + } + if(g_atomic_int_get(&session->destroyed) || g_atomic_int_get(&session->hangingup)) + return; + if(g_atomic_int_compare_and_exchange(&session->dataready, 0, 1)) { + JANUS_LOG(LOG_INFO, "[%s-%p] Data channel available\n", JANUS_DUKTAPE_PACKAGE, handle); + } + /* Check if the JS script wants to receive this event */ + if(has_data_ready) { + /* Yep, pass the event to the JS script and return */ + janus_mutex_lock(&duktape_mutex); + duk_idx_t thr_idx = duk_push_thread(duktape_ctx); + duk_context *t = duk_get_context(duktape_ctx, thr_idx); + duk_get_global_string(t, "dataReady"); + duk_push_number(t, session->id); + int res = duk_pcall(t, 1); + if(res != DUK_EXEC_SUCCESS) { + /* Something went wrong... */ + JANUS_LOG(LOG_ERR, "Duktape error: %s\n", duk_safe_to_string(t, -1)); + } + duk_pop(t); + duk_pop(duktape_ctx); + janus_mutex_unlock(&duktape_mutex); + return; + } +} + void janus_duktape_slow_link(janus_plugin_session *handle, int uplink, int video) { if(handle == NULL || handle->stopped || g_atomic_int_get(&duktape_stopping) || !g_atomic_int_get(&duktape_initialized)) return; @@ -2334,11 +2384,12 @@ void janus_duktape_hangup_media(janus_plugin_session *handle) { janus_refcount_decrease(&session->ref); return; } - if(g_atomic_int_add(&session->hangingup, 1)) { + if(!g_atomic_int_compare_and_exchange(&session->hangingup, 0, 1)) { janus_refcount_decrease(&session->ref); return; } g_atomic_int_set(&session->started, 0); + g_atomic_int_set(&session->dataready, 0); /* Reset the media properties */ session->accept_audio = FALSE; @@ -2419,7 +2470,8 @@ static void janus_duktape_relay_data_packet(gpointer data, gpointer user_data) { return; } janus_duktape_session *session = (janus_duktape_session *)data; - if(!session || !session->handle || !g_atomic_int_get(&session->started) || !session->accept_data) { + if(!session || !session->handle || !g_atomic_int_get(&session->started) || + !session->accept_data || !g_atomic_int_get(&session->dataready)) { return; } if(janus_core != NULL) { diff --git a/plugins/janus_duktape_data.h b/plugins/janus_duktape_data.h index fd77c1d77d..647c3125bf 100644 --- a/plugins/janus_duktape_data.h +++ b/plugins/janus_duktape_data.h @@ -70,6 +70,7 @@ typedef struct janus_duktape_session { janus_recorder *drc; /* The Janus recorder instance for data, if enabled */ janus_mutex rec_mutex; /* Mutex to protect the recorders from race conditions */ volatile gint started; /* Whether this session's PeerConnection is ready or not */ + volatile gint dataready; /* Whether the data channel was established on this sessions's PeerConnection */ volatile gint hangingup; /* Whether this session's PeerConnection is hanging up */ volatile gint destroyed; /* Whether this session's been marked as destroyed */ /* If you need any additional property (e.g., for hooks you added in janus_duktape_extra.c) add them below this line */ diff --git a/plugins/janus_echotest.c b/plugins/janus_echotest.c index 038b0525e2..38571b23f1 100644 --- a/plugins/janus_echotest.c +++ b/plugins/janus_echotest.c @@ -145,6 +145,7 @@ void janus_echotest_setup_media(janus_plugin_session *handle); void janus_echotest_incoming_rtp(janus_plugin_session *handle, janus_plugin_rtp *packet); void janus_echotest_incoming_rtcp(janus_plugin_session *handle, janus_plugin_rtcp *packet); void janus_echotest_incoming_data(janus_plugin_session *handle, janus_plugin_data *packet); +void janus_echotest_data_ready(janus_plugin_session *handle); void janus_echotest_slow_link(janus_plugin_session *handle, int uplink, int video); void janus_echotest_hangup_media(janus_plugin_session *handle); void janus_echotest_destroy_session(janus_plugin_session *handle, int *error); @@ -171,6 +172,7 @@ static janus_plugin janus_echotest_plugin = .incoming_rtp = janus_echotest_incoming_rtp, .incoming_rtcp = janus_echotest_incoming_rtcp, .incoming_data = janus_echotest_incoming_data, + .data_ready = janus_echotest_data_ready, .slow_link = janus_echotest_slow_link, .hangup_media = janus_echotest_hangup_media, .destroy_session = janus_echotest_destroy_session, @@ -682,6 +684,13 @@ void janus_echotest_incoming_data(janus_plugin_session *handle, janus_plugin_dat } } +void janus_echotest_data_ready(janus_plugin_session *handle) { + if(handle == NULL || g_atomic_int_get(&handle->stopped) || + g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized) || !gateway) + return; + /* Data channels are writable */ +} + void janus_echotest_slow_link(janus_plugin_session *handle, int uplink, int video) { /* The core is informing us that our peer got or sent too many NACKs, are we pushing media too hard? */ if(handle == NULL || g_atomic_int_get(&handle->stopped) || g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized)) diff --git a/plugins/janus_lua.c b/plugins/janus_lua.c index 1c2a94b72b..4364582823 100644 --- a/plugins/janus_lua.c +++ b/plugins/janus_lua.c @@ -59,7 +59,8 @@ * with \c incomingTextData() or \c incomingBinaryData * though, the performance impact of directly processing and manipulating * RTP an RTCP packets is probably too high, and so their usage is currently - * discouraged. As an additional note, Lua scripts can also decide to + * discouraged. The \c dataReady() callback can be used to figure out when + * data can be sent. As an additional note, Lua scripts can also decide to * implement the functions that return information about the plugin itself, * namely \c getVersion() \c getVersionString() \c getDescription() * \c getName() \c getAuthor() and \c getPackage(). If not implemented, @@ -218,6 +219,7 @@ void janus_lua_setup_media(janus_plugin_session *handle); void janus_lua_incoming_rtp(janus_plugin_session *handle, janus_plugin_rtp *packet); void janus_lua_incoming_rtcp(janus_plugin_session *handle, janus_plugin_rtcp *packet); void janus_lua_incoming_data(janus_plugin_session *handle, janus_plugin_data *packet); +void janus_lua_data_ready(janus_plugin_session *handle); void janus_lua_slow_link(janus_plugin_session *handle, int uplink, int video); void janus_lua_hangup_media(janus_plugin_session *handle); void janus_lua_destroy_session(janus_plugin_session *handle, int *error); @@ -244,6 +246,7 @@ static janus_plugin janus_lua_plugin = .incoming_rtp = janus_lua_incoming_rtp, .incoming_rtcp = janus_lua_incoming_rtcp, .incoming_data = janus_lua_incoming_data, + .data_ready = janus_lua_data_ready, .slow_link = janus_lua_slow_link, .hangup_media = janus_lua_hangup_media, .destroy_session = janus_lua_destroy_session, @@ -289,6 +292,7 @@ static gboolean has_incoming_rtcp = FALSE; static gboolean has_incoming_data_legacy = FALSE, /* Legacy callback */ has_incoming_text_data = FALSE, has_incoming_binary_data = FALSE; +static gboolean has_data_ready = FALSE; static gboolean has_slow_link = FALSE; /* Lua C scheduler (for coroutines) */ static GThread *scheduler_thread = NULL; @@ -944,6 +948,12 @@ static int janus_lua_method_relaytextdata(lua_State *s) { } janus_refcount_increase(&session->ref); janus_mutex_unlock(&lua_sessions_mutex); + if(!g_atomic_int_get(&session->dataready)) { + janus_refcount_decrease(&session->ref); + JANUS_LOG(LOG_WARN, "Datachannel not ready yet for session %"SCNu32", dropping data\n", id); + lua_pushnumber(s, -1); + return 1; + } /* Send the data */ janus_plugin_data data = { .label = NULL, .binary = FALSE, .buffer = (char *)payload, .length = len }; janus_core->relay_data(session->handle, &data); @@ -979,6 +989,12 @@ static int janus_lua_method_relaybinarydata(lua_State *s) { } janus_refcount_increase(&session->ref); janus_mutex_unlock(&lua_sessions_mutex); + if(!g_atomic_int_get(&session->dataready)) { + janus_refcount_decrease(&session->ref); + JANUS_LOG(LOG_WARN, "Datachannel not ready yet for session %"SCNu32", dropping data\n", id); + lua_pushnumber(s, -1); + return 1; + } /* Send the data */ janus_plugin_data data = { .label = NULL, .binary = TRUE, .buffer = (char *)payload, .length = len }; janus_core->relay_data(session->handle, &data); @@ -1293,6 +1309,9 @@ int janus_lua_init(janus_callbacks *callback, const char *config_path) { lua_getglobal(lua_state, "incomingBinaryData"); if(lua_isfunction(lua_state, lua_gettop(lua_state)) != 0) has_incoming_binary_data = TRUE; + lua_getglobal(lua_state, "dataReady"); + if(lua_isfunction(lua_state, lua_gettop(lua_state)) != 0) + has_data_ready = TRUE; lua_getglobal(lua_state, "slowLink"); if(lua_isfunction(lua_state, lua_gettop(lua_state)) != 0) has_slow_link = TRUE; @@ -1967,6 +1986,33 @@ void janus_lua_incoming_data(janus_plugin_session *handle, janus_plugin_data *pa janus_mutex_unlock_nodebug(&session->recipients_mutex); } +void janus_lua_data_ready(janus_plugin_session *handle) { + if(handle == NULL || handle->stopped || g_atomic_int_get(&lua_stopping) || !g_atomic_int_get(&lua_initialized)) + return; + janus_lua_session *session = (janus_lua_session *)handle->plugin_handle; + if(!session) { + JANUS_LOG(LOG_ERR, "No session associated with this handle...\n"); + return; + } + if(g_atomic_int_get(&session->destroyed) || g_atomic_int_get(&session->hangingup)) + return; + if(g_atomic_int_compare_and_exchange(&session->dataready, 0, 1)) { + JANUS_LOG(LOG_INFO, "[%s-%p] Data channel available\n", JANUS_LUA_PACKAGE, handle); + } + /* Check if the Lua script wants to receive this event */ + if(has_data_ready) { + /* Yep, pass the event to the Lua script and return */ + janus_mutex_lock(&lua_mutex); + lua_State *t = lua_newthread(lua_state); + lua_getglobal(t, "dataReady"); + lua_pushnumber(t, session->id); + lua_call(t, 1, 0); + lua_pop(lua_state, 1); + janus_mutex_unlock(&lua_mutex); + return; + } +} + void janus_lua_slow_link(janus_plugin_session *handle, int uplink, int video) { if(handle == NULL || handle->stopped || g_atomic_int_get(&lua_stopping) || !g_atomic_int_get(&lua_initialized)) return; @@ -2014,11 +2060,12 @@ void janus_lua_hangup_media(janus_plugin_session *handle) { janus_refcount_decrease(&session->ref); return; } - if(g_atomic_int_add(&session->hangingup, 1)) { + if(!g_atomic_int_compare_and_exchange(&session->hangingup, 0, 1)) { janus_refcount_decrease(&session->ref); return; } g_atomic_int_set(&session->started, 0); + g_atomic_int_set(&session->dataready, 0); /* Reset the media properties */ session->accept_audio = FALSE; @@ -2093,7 +2140,8 @@ static void janus_lua_relay_data_packet(gpointer data, gpointer user_data) { return; } janus_lua_session *session = (janus_lua_session *)data; - if(!session || !session->handle || !g_atomic_int_get(&session->started) || !session->accept_data) { + if(!session || !session->handle || !g_atomic_int_get(&session->started) || + !session->accept_data || !g_atomic_int_get(&session->dataready)) { return; } if(janus_core != NULL) { diff --git a/plugins/janus_lua_data.h b/plugins/janus_lua_data.h index dd5e9a13f6..449e0adb7e 100644 --- a/plugins/janus_lua_data.h +++ b/plugins/janus_lua_data.h @@ -70,6 +70,7 @@ typedef struct janus_lua_session { janus_recorder *drc; /* The Janus recorder instance for data, if enabled */ janus_mutex rec_mutex; /* Mutex to protect the recorders from race conditions */ volatile gint started; /* Whether this session's PeerConnection is ready or not */ + volatile gint dataready; /* Whether the data channel was established on this sessions's PeerConnection */ volatile gint hangingup; /* Whether this session's PeerConnection is hanging up */ volatile gint destroyed; /* Whether this session's been marked as destroyed */ /* If you need any additional property (e.g., for hooks you added in janus_lua_extra.c) add them below this line */ diff --git a/plugins/janus_streaming.c b/plugins/janus_streaming.c index e5a520ec21..f461d807cb 100644 --- a/plugins/janus_streaming.c +++ b/plugins/janus_streaming.c @@ -738,6 +738,7 @@ json_t *janus_streaming_handle_admin_message(json_t *message); void janus_streaming_setup_media(janus_plugin_session *handle); void janus_streaming_incoming_rtp(janus_plugin_session *handle, janus_plugin_rtp *packet); void janus_streaming_incoming_rtcp(janus_plugin_session *handle, janus_plugin_rtcp *packet); +void janus_streaming_data_ready(janus_plugin_session *handle); void janus_streaming_hangup_media(janus_plugin_session *handle); void janus_streaming_destroy_session(janus_plugin_session *handle, int *error); json_t *janus_streaming_query_session(janus_plugin_session *handle); @@ -763,6 +764,7 @@ static janus_plugin janus_streaming_plugin = .setup_media = janus_streaming_setup_media, .incoming_rtp = janus_streaming_incoming_rtp, .incoming_rtcp = janus_streaming_incoming_rtcp, + .data_ready = janus_streaming_data_ready, .hangup_media = janus_streaming_hangup_media, .destroy_session = janus_streaming_destroy_session, .query_session = janus_streaming_query_session, @@ -1170,8 +1172,8 @@ typedef struct janus_streaming_session { janus_streaming_mountpoint *mountpoint; gint64 sdp_sessid; gint64 sdp_version; - gboolean started; - gboolean paused; + volatile gint started; + volatile gint paused; gboolean audio, video, data; /* Whether audio, video and/or data must be sent to this listener */ janus_rtp_switching_context context; janus_rtp_simulcasting_context sim_context; @@ -1181,7 +1183,8 @@ typedef struct janus_streaming_session { int spatial_layer, target_spatial_layer; gint64 last_spatial_layer[3]; int temporal_layer, target_temporal_layer; - gboolean stopping; + volatile gint dataready; + volatile gint stopping; volatile gint renegotiating; volatile gint hangingup; volatile gint destroyed; @@ -2223,8 +2226,8 @@ void janus_streaming_create_session(janus_plugin_session *handle, int *error) { janus_streaming_session *session = g_malloc0(sizeof(janus_streaming_session)); session->handle = handle; session->mountpoint = NULL; /* This will happen later */ - session->started = FALSE; /* This will happen later */ - session->paused = FALSE; + g_atomic_int_set(&session->started, 0); + g_atomic_int_set(&session->paused, 0); g_atomic_int_set(&session->destroyed, 0); g_atomic_int_set(&session->hangingup, 0); handle->plugin_handle = session; @@ -2306,7 +2309,10 @@ json_t *janus_streaming_query_session(janus_plugin_session *handle) { } janus_refcount_decrease(&mp->ref); } - json_object_set_new(info, "hangingup", json_integer(g_atomic_int_get(&session->hangingup))); + json_object_set_new(info, "started", json_integer(g_atomic_int_get(&session->started))); + json_object_set_new(info, "dataready", json_integer(g_atomic_int_get(&session->dataready))); + json_object_set_new(info, "paused", json_integer(g_atomic_int_get(&session->paused))); + json_object_set_new(info, "stopping", json_integer(g_atomic_int_get(&session->stopping))); json_object_set_new(info, "destroyed", json_integer(g_atomic_int_get(&session->destroyed))); janus_refcount_decrease(&session->ref); return info; @@ -3650,9 +3656,9 @@ static json_t *janus_streaming_process_synchronous_request(janus_streaming_sessi while(viewer) { janus_streaming_session *s = (janus_streaming_session *)viewer->data; if(s != NULL) { - s->stopping = TRUE; - s->started = FALSE; - s->paused = FALSE; + g_atomic_int_set(&s->stopping, 1); + g_atomic_int_set(&s->started, 0); + g_atomic_int_set(&s->paused, 0); s->mountpoint = NULL; /* Tell the core to tear down the PeerConnection, hangup_media will do the rest */ gateway->push_event(s->handle, &janus_streaming_plugin, NULL, event, NULL); @@ -4254,7 +4260,7 @@ void janus_streaming_setup_media(janus_plugin_session *handle) { janus_mutex_unlock(&source->buffermsg_mutex); } } - session->started = TRUE; + g_atomic_int_set(&session->started, 1); /* Prepare JSON event */ json_t *event = json_object(); json_object_set_new(event, "streaming", json_string("event")); @@ -4277,7 +4283,8 @@ void janus_streaming_incoming_rtcp(janus_plugin_session *handle, janus_plugin_rt if(handle == NULL || g_atomic_int_get(&handle->stopped) || g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized)) return; janus_streaming_session *session = (janus_streaming_session *)handle->plugin_handle; - if(!session || g_atomic_int_get(&session->destroyed) || session->stopping || !session->started || session->paused) + if(!session || g_atomic_int_get(&session->destroyed) || g_atomic_int_get(&session->stopping) || + !g_atomic_int_get(&session->started) || g_atomic_int_get(&session->paused)) return; janus_streaming_mountpoint *mp = (janus_streaming_mountpoint *)session->mountpoint; if(mp->streaming_source != janus_streaming_source_rtp) @@ -4309,6 +4316,19 @@ void janus_streaming_incoming_rtcp(janus_plugin_session *handle, janus_plugin_rt } } +void janus_streaming_data_ready(janus_plugin_session *handle) { + if(handle == NULL || g_atomic_int_get(&handle->stopped) || + g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized) || !gateway) + return; + /* Data channels are writable: we shouldn't send any datachannel message before this happens */ + janus_streaming_session *session = (janus_streaming_session *)handle->plugin_handle; + if(!session || g_atomic_int_get(&session->destroyed) || g_atomic_int_get(&session->hangingup)) + return; + if(g_atomic_int_compare_and_exchange(&session->dataready, 0, 1)) { + JANUS_LOG(LOG_INFO, "[%s-%p] Data channel available\n", JANUS_STREAMING_PACKAGE, handle); + } +} + void janus_streaming_hangup_media(janus_plugin_session *handle) { JANUS_LOG(LOG_INFO, "[%s-%p] No WebRTC media anymore\n", JANUS_STREAMING_PACKAGE, handle); janus_mutex_lock(&sessions_mutex); @@ -4328,6 +4348,10 @@ static void janus_streaming_hangup_media_internal(janus_plugin_session *handle) return; if(!g_atomic_int_compare_and_exchange(&session->hangingup, 0, 1)) return; + g_atomic_int_set(&session->dataready, 0); + g_atomic_int_set(&session->stopping, 1); + g_atomic_int_set(&session->started, 0); + g_atomic_int_set(&session->paused, 0); janus_rtp_switching_context_reset(&session->context); janus_rtp_simulcasting_context_reset(&session->sim_context); janus_vp8_simulcast_context_reset(&session->vp8_context); @@ -4338,9 +4362,6 @@ static void janus_streaming_hangup_media_internal(janus_plugin_session *handle) session->last_spatial_layer[2] = 0; session->temporal_layer = -1; session->target_temporal_layer = 2; /* FIXME Chrome sends 0, 1 and 2 */ - session->stopping = TRUE; - session->started = FALSE; - session->paused = FALSE; janus_streaming_mountpoint *mp = session->mountpoint; if(mp) { janus_mutex_lock(&mp->mutex); @@ -4509,7 +4530,7 @@ static void *janus_streaming_handler(void *data) { goto error; } else { /* Make sure it's not an API error */ - if(!session->started) { + if(!g_atomic_int_get(&session->started)) { /* Can't be a renegotiation, PeerConnection isn't up yet */ janus_mutex_unlock(&mp->mutex); JANUS_LOG(LOG_ERR, "Already watching mountpoint %s\n", session->mountpoint->id_str); @@ -4544,7 +4565,7 @@ static void *janus_streaming_handler(void *data) { g_snprintf(error_cause, 512, "Already watching a stream"); goto error; } - session->stopping = FALSE; + g_atomic_int_set(&session->stopping, 1); session->mountpoint = mp; session->sdp_version = 1; /* This needs to be increased when it changes */ session->sdp_sessid = janus_get_real_time(); @@ -4771,10 +4792,10 @@ static void *janus_streaming_handler(void *data) { goto error; } JANUS_LOG(LOG_VERB, "Starting the streaming\n"); - session->paused = FALSE; + g_atomic_int_set(&session->paused, 0); result = json_object(); /* We wait for the setup_media event to start: on the other hand, it may have already arrived */ - json_object_set_new(result, "status", json_string(session->started ? "started" : "starting")); + json_object_set_new(result, "status", json_string(g_atomic_int_get(&session->started) ? "started" : "starting")); /* Also notify event handlers */ if(notify_events && gateway->events_is_enabled()) { json_t *info = json_object(); @@ -4792,7 +4813,7 @@ static void *janus_streaming_handler(void *data) { goto error; } JANUS_LOG(LOG_VERB, "Pausing the streaming\n"); - session->paused = TRUE; + g_atomic_int_set(&session->paused, 1); result = json_object(); json_object_set_new(result, "status", json_string("pausing")); /* Also notify event handlers */ @@ -4977,7 +4998,7 @@ static void *janus_streaming_handler(void *data) { } janus_mutex_unlock(&mountpoints_mutex); JANUS_LOG(LOG_VERB, "Request to switch to mountpoint/stream %s (old: %s)\n", mp->id_str, oldmp->id_str); - session->paused = TRUE; + g_atomic_int_set(&session->paused, 1); /* Unsubscribe from the previous mountpoint and subscribe to the new one */ janus_mutex_lock(&oldmp->mutex); oldmp->viewers = g_list_remove_all(oldmp->viewers, session); @@ -5024,7 +5045,7 @@ static void *janus_streaming_handler(void *data) { } janus_mutex_unlock(&mp->mutex); session->mountpoint = mp; - session->paused = FALSE; + g_atomic_int_set(&session->paused, 1); /* Done */ janus_refcount_decrease(&oldmp->ref); /* This is for the request being done with it */ result = json_object(); @@ -5038,7 +5059,7 @@ static void *janus_streaming_handler(void *data) { gateway->notify_event(&janus_streaming_plugin, session->handle, info); } } else if(!strcasecmp(request_text, "stop")) { - if(session->stopping || !session->started) { + if(g_atomic_int_get(&session->stopping) || !g_atomic_int_get(&session->started)) { /* Been there, done that: ignore */ janus_streaming_message_free(msg); continue; @@ -6842,7 +6863,8 @@ static void *janus_streaming_ondemand_thread(void *data) { /* Loop */ gint read = 0, plen = (sizeof(buf)-RTP_HEADER_SIZE); janus_streaming_rtp_relay_packet packet; - while(!g_atomic_int_get(&stopping) && !g_atomic_int_get(&mountpoint->destroyed) && !session->stopping && !g_atomic_int_get(&session->destroyed)) { + while(!g_atomic_int_get(&stopping) && !g_atomic_int_get(&mountpoint->destroyed) && + !g_atomic_int_get(&session->stopping) && !g_atomic_int_get(&session->destroyed)) { /* See if it's time to prepare a frame */ gettimeofday(&now, NULL); d_s = now.tv_sec - before.tv_sec; @@ -6863,7 +6885,7 @@ static void *janus_streaming_ondemand_thread(void *data) { before.tv_usec -= 1000000; } /* If not started or paused, wait some more */ - if(!session->started || session->paused || !mountpoint->enabled) + if(!g_atomic_int_get(&session->started) || g_atomic_int_get(&session->paused) || !mountpoint->enabled) continue; if(source->opus) { #ifdef HAVE_LIBOGG @@ -7747,9 +7769,9 @@ static void *janus_streaming_relay_thread(void *data) { while(viewer) { janus_streaming_session *session = (janus_streaming_session *)viewer->data; if(session != NULL) { - session->stopping = TRUE; - session->started = FALSE; - session->paused = FALSE; + g_atomic_int_set(&session->stopping, 1); + g_atomic_int_set(&session->started, 0); + g_atomic_int_set(&session->paused, 0); session->mountpoint = NULL; /* Tell the core to tear down the PeerConnection, hangup_media will do the rest */ gateway->push_event(session->handle, &janus_streaming_plugin, NULL, event, NULL); @@ -7790,7 +7812,7 @@ static void janus_streaming_relay_rtp_packet(gpointer data, gpointer user_data) //~ JANUS_LOG(LOG_ERR, "Invalid session...\n"); return; } - if(!packet->is_keyframe && (!session->started || session->paused)) { + if(!packet->is_keyframe && (!g_atomic_int_get(&session->started) || g_atomic_int_get(&session->paused))) { //~ JANUS_LOG(LOG_ERR, "Streaming not started yet for this session...\n"); return; } @@ -8039,7 +8061,7 @@ static void janus_streaming_relay_rtp_packet(gpointer data, gpointer user_data) /* We're broadcasting a data channel message */ if(!session->data) return; - if(gateway != NULL && packet->data != NULL) { + if(gateway != NULL && packet->data != NULL && g_atomic_int_get(&session->dataready)) { janus_plugin_data data = { .label = NULL, .binary = !packet->textdata, .buffer = (char *)packet->data, .length = packet->length }; gateway->relay_data(session->handle, &data); @@ -8061,7 +8083,7 @@ static void janus_streaming_relay_rtcp_packet(gpointer data, gpointer user_data) //~ JANUS_LOG(LOG_ERR, "Invalid session...\n"); return; } - if(!session->started || session->paused) { + if(!g_atomic_int_get(&session->started) || g_atomic_int_get(&session->paused)) { //~ JANUS_LOG(LOG_ERR, "Streaming not started yet for this session...\n"); return; } diff --git a/plugins/janus_textroom.c b/plugins/janus_textroom.c index e69a89485a..7096491667 100644 --- a/plugins/janus_textroom.c +++ b/plugins/janus_textroom.c @@ -535,6 +535,7 @@ void janus_textroom_setup_media(janus_plugin_session *handle); void janus_textroom_incoming_rtp(janus_plugin_session *handle, janus_plugin_rtp *packet); void janus_textroom_incoming_rtcp(janus_plugin_session *handle, janus_plugin_rtcp *packet); void janus_textroom_incoming_data(janus_plugin_session *handle, janus_plugin_data *packet); +void janus_textroom_data_ready(janus_plugin_session *handle); void janus_textroom_slow_link(janus_plugin_session *handle, int uplink, int video); void janus_textroom_hangup_media(janus_plugin_session *handle); void janus_textroom_destroy_session(janus_plugin_session *handle, int *error); @@ -561,6 +562,7 @@ static janus_plugin janus_textroom_plugin = .incoming_rtp = janus_textroom_incoming_rtp, .incoming_rtcp = janus_textroom_incoming_rtcp, .incoming_data = janus_textroom_incoming_data, + .data_ready = janus_textroom_data_ready, .slow_link = janus_textroom_slow_link, .hangup_media = janus_textroom_hangup_media, .destroy_session = janus_textroom_destroy_session, @@ -687,6 +689,7 @@ typedef struct janus_textroom_session { GHashTable *rooms; /* Map of rooms this user is in, and related participant instance */ janus_mutex mutex; /* Mutex to lock this session */ volatile gint setup; + volatile gint dataready; volatile gint hangingup; volatile gint destroyed; janus_refcount ref; @@ -1109,6 +1112,7 @@ void janus_textroom_create_session(janus_plugin_session *handle, int *error) { janus_mutex_init(&session->mutex); janus_refcount_init(&session->ref, janus_textroom_session_free); g_atomic_int_set(&session->setup, 0); + g_atomic_int_set(&session->dataready, 0); g_atomic_int_set(&session->hangingup, 0); handle->plugin_handle = session; janus_mutex_lock(&sessions_mutex); @@ -1330,7 +1334,7 @@ json_t *janus_textroom_handle_admin_message(json_t *message) { } void janus_textroom_setup_media(janus_plugin_session *handle) { - JANUS_LOG(LOG_INFO, "WebRTC media is now available\n"); + JANUS_LOG(LOG_INFO, "[%s-%p] WebRTC media is now available\n", JANUS_TEXTROOM_PACKAGE, handle); if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized)) return; janus_mutex_lock(&sessions_mutex); @@ -1389,6 +1393,19 @@ void janus_textroom_incoming_data(janus_plugin_session *handle, janus_plugin_dat janus_refcount_decrease(&session->ref); } +void janus_textroom_data_ready(janus_plugin_session *handle) { + if(handle == NULL || g_atomic_int_get(&handle->stopped) || + g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized) || !gateway) + return; + /* Data channels are writable: we shouldn't send anything before this happens */ + janus_textroom_session *session = (janus_textroom_session *)handle->plugin_handle; + if(!session || g_atomic_int_get(&session->destroyed) || g_atomic_int_get(&session->hangingup)) + return; + if(g_atomic_int_compare_and_exchange(&session->dataready, 0, 1)) { + JANUS_LOG(LOG_INFO, "[%s-%p] Data channel available\n", JANUS_TEXTROOM_PACKAGE, handle); + } +} + /* Helper method to handle incoming messages from the data channel */ janus_plugin_result *janus_textroom_handle_incoming_request(janus_plugin_session *handle, char *text, json_t *json, gboolean internal) { janus_textroom_session *session = NULL; @@ -2811,7 +2828,7 @@ void janus_textroom_hangup_media(janus_plugin_session *handle) { } static void janus_textroom_hangup_media_internal(janus_plugin_session *handle) { - JANUS_LOG(LOG_INFO, "No WebRTC media anymore\n"); + JANUS_LOG(LOG_INFO, "[%s-%p] No WebRTC media anymore\n", JANUS_TEXTROOM_PACKAGE, handle); if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized)) return; janus_textroom_session *session = janus_textroom_lookup_session(handle); @@ -2823,6 +2840,7 @@ static void janus_textroom_hangup_media_internal(janus_plugin_session *handle) { return; if(!g_atomic_int_compare_and_exchange(&session->hangingup, 0, 1)) return; + g_atomic_int_set(&session->dataready, 0); /* Get rid of all participants */ janus_mutex_lock(&session->mutex); GList *list = NULL; diff --git a/plugins/janus_videocall.c b/plugins/janus_videocall.c index 62f005ea1a..d9c89b7783 100644 --- a/plugins/janus_videocall.c +++ b/plugins/janus_videocall.c @@ -291,6 +291,7 @@ void janus_videocall_setup_media(janus_plugin_session *handle); void janus_videocall_incoming_rtp(janus_plugin_session *handle, janus_plugin_rtp *packet); void janus_videocall_incoming_rtcp(janus_plugin_session *handle, janus_plugin_rtcp *packet); void janus_videocall_incoming_data(janus_plugin_session *handle, janus_plugin_data *packet); +void janus_videocall_data_ready(janus_plugin_session *handle); void janus_videocall_slow_link(janus_plugin_session *handle, int uplink, int video); void janus_videocall_hangup_media(janus_plugin_session *handle); void janus_videocall_destroy_session(janus_plugin_session *handle, int *error); @@ -316,6 +317,7 @@ static janus_plugin janus_videocall_plugin = .incoming_rtp = janus_videocall_incoming_rtp, .incoming_rtcp = janus_videocall_incoming_rtcp, .incoming_data = janus_videocall_incoming_data, + .data_ready = janus_videocall_data_ready, .slow_link = janus_videocall_slow_link, .hangup_media = janus_videocall_hangup_media, .destroy_session = janus_videocall_destroy_session, @@ -384,6 +386,7 @@ typedef struct janus_videocall_session { janus_recorder *drc; /* The Janus recorder instance for this user's data, if enabled */ janus_mutex rec_mutex; /* Mutex to protect the recorders from race conditions */ volatile gint incall; + volatile gint dataready; volatile gint hangingup; volatile gint destroyed; janus_refcount ref; @@ -860,7 +863,8 @@ void janus_videocall_incoming_data(janus_plugin_session *handle, janus_plugin_da JANUS_LOG(LOG_ERR, "Session has no peer...\n"); return; } - if(g_atomic_int_get(&session->destroyed) || g_atomic_int_get(&peer->destroyed)) + if(g_atomic_int_get(&session->destroyed) || g_atomic_int_get(&peer->destroyed) || + !g_atomic_int_get(&peer->dataready)) return; if(packet->buffer == NULL || packet->length == 0) return; @@ -882,6 +886,19 @@ void janus_videocall_incoming_data(janus_plugin_session *handle, janus_plugin_da } } +void janus_videocall_data_ready(janus_plugin_session *handle) { + if(handle == NULL || g_atomic_int_get(&handle->stopped) || + g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized) || !gateway) + return; + /* Data channels are writable */ + janus_videocall_session *session = (janus_videocall_session *)handle->plugin_handle; + if(!session || g_atomic_int_get(&session->destroyed) || g_atomic_int_get(&session->hangingup)) + return; + if(g_atomic_int_compare_and_exchange(&session->dataready, 0, 1)) { + JANUS_LOG(LOG_INFO, "[%s-%p] Data channel available\n", JANUS_VIDEOCALL_PACKAGE, handle); + } +} + void janus_videocall_slow_link(janus_plugin_session *handle, int uplink, int video) { /* The core is informing us that our peer got or sent too many NACKs, are we pushing media too hard? */ if(handle == NULL || g_atomic_int_get(&handle->stopped) || g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized)) @@ -964,6 +981,7 @@ static void janus_videocall_hangup_media_internal(janus_plugin_session *handle) return; if(!g_atomic_int_compare_and_exchange(&session->hangingup, 0, 1)) return; + g_atomic_int_set(&session->dataready, 0); /* Get rid of the recorders, if available */ janus_mutex_lock(&session->rec_mutex); janus_videocall_recorder_close(session); diff --git a/plugins/janus_videoroom.c b/plugins/janus_videoroom.c index 80eaf0ca77..0d325d0326 100644 --- a/plugins/janus_videoroom.c +++ b/plugins/janus_videoroom.c @@ -1091,6 +1091,7 @@ void janus_videoroom_setup_media(janus_plugin_session *handle); void janus_videoroom_incoming_rtp(janus_plugin_session *handle, janus_plugin_rtp *packet); void janus_videoroom_incoming_rtcp(janus_plugin_session *handle, janus_plugin_rtcp *packet); void janus_videoroom_incoming_data(janus_plugin_session *handle, janus_plugin_data *packet); +void janus_videoroom_data_ready(janus_plugin_session *handle); void janus_videoroom_slow_link(janus_plugin_session *handle, int uplink, int video); void janus_videoroom_hangup_media(janus_plugin_session *handle); void janus_videoroom_destroy_session(janus_plugin_session *handle, int *error); @@ -1117,6 +1118,7 @@ static janus_plugin janus_videoroom_plugin = .incoming_rtp = janus_videoroom_incoming_rtp, .incoming_rtcp = janus_videoroom_incoming_rtcp, .incoming_data = janus_videoroom_incoming_data, + .data_ready = janus_videoroom_data_ready, .slow_link = janus_videoroom_slow_link, .hangup_media = janus_videoroom_hangup_media, .destroy_session = janus_videoroom_destroy_session, @@ -1384,6 +1386,7 @@ typedef struct janus_videoroom_session { gpointer participant; gboolean started; gboolean stopping; + volatile gint dataready; volatile gint hangingup; volatile gint destroyed; janus_mutex mutex; @@ -4987,6 +4990,19 @@ void janus_videoroom_incoming_data(janus_plugin_session *handle, janus_plugin_da janus_videoroom_publisher_dereference_nodebug(participant); } +void janus_videoroom_data_ready(janus_plugin_session *handle) { + if(handle == NULL || g_atomic_int_get(&handle->stopped) || + g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized) || !gateway) + return; + /* Data channels are writable */ + janus_videoroom_session *session = (janus_videoroom_session *)handle->plugin_handle; + if(!session || g_atomic_int_get(&session->destroyed) || g_atomic_int_get(&session->hangingup)) + return; + if(g_atomic_int_compare_and_exchange(&session->dataready, 0, 1)) { + JANUS_LOG(LOG_INFO, "[%s-%p] Data channel available\n", JANUS_VIDEOROOM_PACKAGE, handle); + } +} + void janus_videoroom_slow_link(janus_plugin_session *handle, int uplink, int video) { /* The core is informing us that our peer got too many NACKs, are we pushing media too hard? */ if(handle == NULL || g_atomic_int_get(&handle->stopped) || g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized)) @@ -5192,6 +5208,7 @@ static void janus_videoroom_hangup_media_internal(janus_plugin_session *handle) janus_mutex_unlock(&sessions_mutex); return; } + g_atomic_int_set(&session->dataready, 0); janus_refcount_increase(&session->ref); janus_mutex_unlock(&sessions_mutex); /* Send an event to the browser and tell the PeerConnection is over */ @@ -7202,7 +7219,7 @@ static void janus_videoroom_relay_data_packet(gpointer data, gpointer user_data) if(!session || !session->handle) { return; } - if(!session->started) { + if(!session->started || !g_atomic_int_get(&session->dataready)) { return; } if(gateway != NULL && packet->data != NULL) { diff --git a/plugins/lua/echotest.lua b/plugins/lua/echotest.lua index 8c433b05f0..30515d4737 100644 --- a/plugins/lua/echotest.lua +++ b/plugins/lua/echotest.lua @@ -181,6 +181,14 @@ function incomingBinaryData(id, buf, len) relayBinaryData(id, buf, len); end +function dataReady(id) { + -- This callback is invoked when the datachannel first becomes + -- available (meaning you should never send data before it has been + -- invoked at least once), but also when the datachannel is ready to + -- receive more data (buffers are empty), which means it can be used + -- to throttle outgoing data and not send too much at a time. +} + function resumeScheduler() -- This is the function responsible for resuming coroutines associated -- with whatever is relevant to the Lua script, e.g., for this script, diff --git a/plugins/plugin.h b/plugins/plugin.h index 88b5870434..3f583db96c 100644 --- a/plugins/plugin.h +++ b/plugins/plugin.h @@ -65,6 +65,7 @@ janus_plugin *create(void) { * - \c incoming_rtp(): a callback to notify you a peer has sent you a RTP packet; * - \c incoming_rtcp(): a callback to notify you a peer has sent you a RTCP message; * - \c incoming_data(): a callback to notify you a peer has sent you a message on a SCTP DataChannel; + * - \c data_ready(): a callback to notify you data can be sent on the SCTP DataChannel; * - \c slow_link(): a callback to notify you a peer has sent a lot of NACKs recently, and the media path may be slow; * - \c hangup_media(): a callback to notify you the peer PeerConnection has been closed (e.g., after a DTLS alert); * - \c query_session(): this method is called by the core to get plugin-specific info on a session between you and a peer; @@ -170,7 +171,7 @@ janus_plugin *create(void) { * Janus instance or it will crash. * */ -#define JANUS_PLUGIN_API_VERSION 14 +#define JANUS_PLUGIN_API_VERSION 15 /*! \brief Initialization of all plugin properties to NULL * @@ -205,6 +206,7 @@ static janus_plugin janus_echotest_plugin = .incoming_rtp = NULL, \ .incoming_rtcp = NULL, \ .incoming_data = NULL, \ + .data_ready = NULL, \ .slow_link = NULL, \ .hangup_media = NULL, \ .destroy_session = NULL, \ @@ -310,6 +312,11 @@ struct janus_plugin { * @param[in] handle The plugin/gateway session used for this peer * @param[in] packet The message data and related info */ void (* const incoming_data)(janus_plugin_session *handle, janus_plugin_data *packet); + /*! \brief Method to be notified about the fact that the datachannel is ready to be written + * \note This is not only called when the PeerConnection first becomes available, but also + * when the SCTP socket becomes writable again, e.g., because the internal buffer is empty. + * @param[in] handle The plugin/gateway session used for this peer */ + void (* const data_ready)(janus_plugin_session *handle); /*! \brief Method to be notified by the core when too many NACKs have * been received or sent by Janus, and so a slow or potentially * unreliable network is to be expected for this peer diff --git a/sctp.c b/sctp.c index 66ae8621e7..555463b266 100644 --- a/sctp.c +++ b/sctp.c @@ -48,6 +48,7 @@ static const char *default_label = "JanusDataChannel"; #define SCTP_MAX_PACKET_SIZE (1<<16) +/* Events we're interested in */ static uint16_t event_types[] = { SCTP_ASSOC_CHANGE, SCTP_PEER_ADDR_CHANGE, @@ -55,10 +56,39 @@ static uint16_t event_types[] = { SCTP_SHUTDOWN_EVENT, SCTP_ADAPTATION_INDICATION, SCTP_SEND_FAILED_EVENT, + SCTP_SENDER_DRY_EVENT, SCTP_STREAM_RESET_EVENT, SCTP_STREAM_CHANGE_EVENT }; +/* Buffered message (in case we can't send right away) */ +typedef struct janus_sctp_pending_message { + uint16_t id; + gboolean textdata; + char *buf; + size_t len; +} janus_sctp_pending_message; +static janus_sctp_pending_message *janus_sctp_pending_message_create(uint16_t id, gboolean textdata, char *buf, size_t len) { + janus_sctp_pending_message *m = g_malloc(sizeof(janus_sctp_pending_message)); + m->id = id; + m->textdata = textdata; + if(buf != NULL && len > 0) { + m->buf = g_malloc(len); + memcpy(m->buf, buf, len); + } else { + m->buf = NULL; + m->len = 0; + } + return m; +} +static void janus_sctp_pending_message_free(janus_sctp_pending_message *m) { + if(m != NULL) { + g_free(m->buf); + g_free(m); + } +} + +/* usrsctp callbacks and methods */ int janus_sctp_data_to_dtls(void *instance, void *buffer, size_t length, uint8_t tos, uint8_t set_df); static int janus_sctp_incoming_data(struct socket *sock, union sctp_sockstore addr, void *data, size_t datalen, struct sctp_rcvinfo rcv, int flags, void *ulp_info); janus_sctp_channel *janus_sctp_find_channel_by_stream(janus_sctp_association *sctp, uint16_t stream); @@ -114,6 +144,8 @@ static void janus_sctp_association_free(const janus_refcount *sctp_ref) { /* This association can be destroyed, free all the resources */ janus_refcount_decrease(&sctp->handle->ref); janus_refcount_decrease(&sctp->dtls->ref); + if(sctp->pending_messages != NULL) + g_queue_free_full(sctp->pending_messages, (GDestroyNotify)janus_sctp_pending_message_free); #ifdef DEBUG_SCTP if(sctp->debug_dump != NULL) fclose(sctp->debug_dump); @@ -343,7 +375,24 @@ static int janus_sctp_incoming_data(struct socket *sock, union sctp_sockstore ad } void janus_sctp_send_data(janus_sctp_association *sctp, char *label, gboolean textdata, char *buf, int len) { - if(sctp == NULL || buf == NULL || len <= 0) + if(sctp == NULL) + return; + if(sctp->pending_messages != NULL && !g_queue_is_empty(sctp->pending_messages)) { + /* Messages waiting in the queue, send those first */ + janus_sctp_pending_message *m = g_queue_peek_head(sctp->pending_messages); + while(m != NULL) { + int res = janus_sctp_send_text_or_binary(sctp, m->id, m->textdata, m->buf, m->len); + if(res == -2) { + JANUS_LOG(LOG_WARN, "[%"SCNu64"] Got EAGAIN when trying to resend pending message on channel %"SCNu16"\n", + sctp->handle_id, m->id); + break; + } + (void)g_queue_pop_head(sctp->pending_messages); + janus_sctp_pending_message_free(m); + m = g_queue_peek_head(sctp->pending_messages); + } + } + if(buf == NULL || len <= 0) return; if(label == NULL) label = (char *)default_label; @@ -380,7 +429,28 @@ void janus_sctp_send_data(janus_sctp_association *sctp, char *label, gboolean te } } /* Send the data, whether it's text or binary */ - janus_sctp_send_text_or_binary(sctp, i, textdata, buf, len); + if(sctp->pending_messages != NULL && !g_queue_is_empty(sctp->pending_messages)) { + /* We couldn't send all pending messages, queue the new one as well */ + if(buf != NULL && len > 0) { + JANUS_LOG(LOG_WARN, "[%"SCNu64"] Couldn't send all pending messages, queueing new message\n", + sctp->handle_id); + janus_sctp_pending_message *m = janus_sctp_pending_message_create(i, textdata, buf, len); + if(sctp->pending_messages == NULL) + sctp->pending_messages = g_queue_new(); + g_queue_push_tail(sctp->pending_messages, m); + } + return; + } + int res = janus_sctp_send_text_or_binary(sctp, i, textdata, buf, len); + if(res == -2) { + /* Delivery failed with an EAGAIN, queue and retry later */ + JANUS_LOG(LOG_WARN, "[%"SCNu64"] Got EAGAIN when trying to send message on channel %"SCNu16", retrying later\n", + sctp->handle_id, i); + janus_sctp_pending_message *m = janus_sctp_pending_message_create(i, textdata, buf, len); + if(sctp->pending_messages == NULL) + sctp->pending_messages = g_queue_new(); + g_queue_push_tail(sctp->pending_messages, m); + } } @@ -447,7 +517,7 @@ void janus_sctp_request_more_streams(janus_sctp_association *sctp) { streams_needed = 0; for(i = 0; i < NUMBER_OF_CHANNELS; i++) { if((sctp->channels[i].state == DATA_CHANNEL_CONNECTING) && - (sctp->channels[i].stream == 0)) { + (sctp->channels[i].stream == 0)) { streams_needed++; } } @@ -510,10 +580,10 @@ int janus_sctp_send_open_request_message(struct socket *sock, uint16_t stream, c sndinfo.snd_ppid = htonl(DATA_CHANNEL_PPID_CONTROL); if(usrsctp_sendv(sock, - req, sizeof(janus_datachannel_open_request) + label_size, - NULL, 0, - &sndinfo, (socklen_t)sizeof(struct sctp_sndinfo), - SCTP_SENDV_SNDINFO, 0) < 0) { + req, sizeof(janus_datachannel_open_request) + label_size, + NULL, 0, + &sndinfo, (socklen_t)sizeof(struct sctp_sndinfo), + SCTP_SENDV_SNDINFO, 0) < 0) { JANUS_LOG(LOG_ERR, "usrsctp_sendv error (%d)\n", errno); g_free(req); req = NULL; @@ -540,10 +610,10 @@ int janus_sctp_send_open_response_message(struct socket *sock, uint16_t stream) sndinfo.snd_flags = SCTP_EOR; sndinfo.snd_ppid = htonl(DATA_CHANNEL_PPID_CONTROL); if(usrsctp_sendv(sock, - &rsp, sizeof(janus_datachannel_open_response), - NULL, 0, - &sndinfo, (socklen_t)sizeof(struct sctp_sndinfo), - SCTP_SENDV_SNDINFO, 0) < 0) { + &rsp, sizeof(janus_datachannel_open_response), + NULL, 0, + &sndinfo, (socklen_t)sizeof(struct sctp_sndinfo), + SCTP_SENDV_SNDINFO, 0) < 0) { JANUS_LOG(LOG_ERR, "usrsctp_sendv error (%d)\n", errno); return 0; } else { @@ -563,10 +633,10 @@ int janus_sctp_send_open_ack_message(struct socket *sock, uint16_t stream) { sndinfo.snd_flags = SCTP_EOR; sndinfo.snd_ppid = htonl(DATA_CHANNEL_PPID_CONTROL); if(usrsctp_sendv(sock, - &ack, sizeof(janus_datachannel_ack), - NULL, 0, - &sndinfo, (socklen_t)sizeof(struct sctp_sndinfo), - SCTP_SENDV_SNDINFO, 0) < 0) { + &ack, sizeof(janus_datachannel_ack), + NULL, 0, + &sndinfo, (socklen_t)sizeof(struct sctp_sndinfo), + SCTP_SENDV_SNDINFO, 0) < 0) { JANUS_LOG(LOG_ERR, "usrsctp_sendv error (%d)\n", errno); return 0; } else { @@ -696,7 +766,12 @@ int janus_sctp_send_text_or_binary(janus_sctp_association *sctp, uint16_t id, gb if(usrsctp_sendv(sctp->sock, text, length, NULL, 0, &spa, (socklen_t)sizeof(struct sctp_sendv_spa), SCTP_SENDV_SPA, 0) < 0) { - JANUS_LOG(LOG_ERR, "[%"SCNu64"] sctp_sendv error (%d)\n", sctp->handle_id, errno); + int res = errno; + if(res == EAGAIN) { + /* Couldn't send the message right away, add to the queue and retry later */ + return -2; + } + JANUS_LOG(LOG_ERR, "[%"SCNu64"] sctp_sendv error (%d)\n", sctp->handle_id, res); return -1; } JANUS_LOG(LOG_VERB, "[%"SCNu64"] Message sent on channel %"SCNu16"\n", sctp->handle_id, id); @@ -927,11 +1002,11 @@ void janus_sctp_handle_data_message(janus_sctp_association *sctp, gboolean textd } else { /* XXX: Protect for non 0 terminated buffer */ JANUS_LOG(LOG_VERB, "[%"SCNu64"] SCTP data received of length %zu on channel with id %d.\n", - sctp->handle_id, length, channel->id); + sctp->handle_id, length, channel->id); JANUS_LOG(LOG_HUGE, "[%"SCNu64"] Incoming SCTP contents: %.*s\n", - sctp->handle_id, (int)length, buffer); + sctp->handle_id, (int)length, buffer); /* Pass this to the core */ - janus_dtls_notify_data(sctp->dtls, channel->label, textdata, buffer, (int)length); + janus_dtls_notify_sctp_data(sctp->dtls, channel->label, textdata, buffer, (int)length); } return; } @@ -1008,7 +1083,7 @@ void janus_sctp_handle_message(janus_sctp_association *sctp, char *buffer, size_ break; default: JANUS_LOG(LOG_VERB, "[%"SCNu64"] Message of length %zu, PPID %u on stream %u received.\n", - sctp->handle_id, length, ppid, stream); + sctp->handle_id, length, ppid, stream); break; } } @@ -1038,10 +1113,10 @@ void janus_sctp_handle_association_change_event(struct sctp_assoc_change *sac) { break; } JANUS_LOG(LOG_VERB, ", streams (in/out) = (%u/%u)", - sac->sac_inbound_streams, sac->sac_outbound_streams); + sac->sac_inbound_streams, sac->sac_outbound_streams); n = sac->sac_length - sizeof(struct sctp_assoc_change); if(((sac->sac_state == SCTP_COMM_UP) || - (sac->sac_state == SCTP_RESTART)) && (n > 0)) { + (sac->sac_state == SCTP_RESTART)) && (n > 0)) { JANUS_LOG(LOG_VERB, ", supports"); for(i = 0; i < n; i++) { switch (sac->sac_info[i]) { @@ -1066,7 +1141,7 @@ void janus_sctp_handle_association_change_event(struct sctp_assoc_change *sac) { } } } else if(((sac->sac_state == SCTP_COMM_LOST) || - (sac->sac_state == SCTP_CANT_STR_ASSOC)) && (n > 0)) { + (sac->sac_state == SCTP_CANT_STR_ASSOC)) && (n > 0)) { JANUS_LOG(LOG_VERB, ", ABORT ="); for(i = 0; i < n; i++) { JANUS_LOG(LOG_VERB, " 0x%02x", sac->sac_info[i]); @@ -1164,7 +1239,7 @@ void janus_sctp_handle_stream_reset_event(janus_sctp_association *sctp, struct s } JANUS_LOG(LOG_VERB, ".\n"); if(!(strrst->strreset_flags & SCTP_STREAM_RESET_DENIED) && - !(strrst->strreset_flags & SCTP_STREAM_RESET_FAILED)) { + !(strrst->strreset_flags & SCTP_STREAM_RESET_FAILED)) { for(i = 0; i < n; i++) { if(strrst->strreset_flags & SCTP_STREAM_RESET_INCOMING_SSN || strrst->strreset_flags & SCTP_STREAM_RESET_OUTGOING_SSN) { @@ -1218,8 +1293,8 @@ void janus_sctp_handle_send_failed_event(struct sctp_send_failed_event *ssfe) { JANUS_LOG(LOG_VERB, "(flags = %x) ", ssfe->ssfe_flags); } JANUS_LOG(LOG_VERB, "message with PPID = %d, SID = %d, flags: 0x%04x due to error = 0x%08x", - ntohl(ssfe->ssfe_info.snd_ppid), ssfe->ssfe_info.snd_sid, - ssfe->ssfe_info.snd_flags, ssfe->ssfe_error); + ntohl(ssfe->ssfe_info.snd_ppid), ssfe->ssfe_info.snd_sid, + ssfe->ssfe_info.snd_flags, ssfe->ssfe_error); n = ssfe->ssfe_length - sizeof(struct sctp_send_failed_event); for(i = 0; i < n; i++) { JANUS_LOG(LOG_VERB, " 0x%02x", ssfe->ssfe_data[i]); @@ -1252,8 +1327,12 @@ void janus_sctp_handle_notification(janus_sctp_association *sctp, union sctp_not break; case SCTP_AUTHENTICATION_EVENT: break; - case SCTP_SENDER_DRY_EVENT: + case SCTP_SENDER_DRY_EVENT: { + /* Internal buffers empty, notify the application they can send again */ + if(sctp != NULL && !g_atomic_int_get(&sctp->destroyed)) + janus_dtls_sctp_data_ready(sctp->dtls); break; + } case SCTP_NOTIFICATIONS_STOPPED_EVENT: break; case SCTP_SEND_FAILED_EVENT: @@ -1268,7 +1347,7 @@ void janus_sctp_handle_notification(janus_sctp_association *sctp, union sctp_not case SCTP_ASSOC_RESET_EVENT: break; case SCTP_STREAM_CHANGE_EVENT: - JANUS_LOG(LOG_VERB, "Stream change (in/out) = (%u/%u)\n", + JANUS_LOG(LOG_VERB, "[%"SCNu64"] Stream change (in/out) = (%u/%u)\n", sctp ? sctp->handle_id : 0, notif->sn_strchange_event.strchange_instrms, notif->sn_strchange_event.strchange_outstrms); break; default: diff --git a/sctp.h b/sctp.h index 1cd13f0a8b..c49e840b6a 100644 --- a/sctp.h +++ b/sctp.h @@ -123,6 +123,8 @@ typedef struct janus_sctp_association { size_t buflen; /*! \brief Current offset of the buffer for handling partial messages */ size_t offset; + /*! \brief Buffer of pending messages */ + GQueue *pending_messages; #ifdef DEBUG_SCTP FILE *debug_dump; #endif