Skip to content

Commit

Permalink
New plugin callback to know when datachannel is writable (#2060)
Browse files Browse the repository at this point in the history
  • Loading branch information
lminiero committed May 18, 2020
1 parent fa12368 commit 2a98ec2
Show file tree
Hide file tree
Showing 18 changed files with 418 additions and 76 deletions.
23 changes: 22 additions & 1 deletion dtls.c
Original file line number Diff line number Diff line change
Expand Up @@ -1031,6 +1031,27 @@ int janus_dtls_verify_callback(int preverify_ok, X509_STORE_CTX *ctx) {
}

#ifdef HAVE_SCTP
void janus_dtls_sctp_data_ready(janus_dtls_srtp *dtls) {
if(dtls == NULL)
return;
janus_ice_component *component = (janus_ice_component *)dtls->component;
if(component == NULL) {
JANUS_LOG(LOG_ERR, "No component...\n");
return;
}
janus_ice_stream *stream = component->stream;
if(!stream) {
JANUS_LOG(LOG_ERR, "No stream...\n");
return;
}
janus_ice_handle *handle = stream->handle;
if(!handle || !handle->agent || !dtls->write_bio) {
JANUS_LOG(LOG_ERR, "No handle...\n");
return;
}
janus_ice_notify_data_ready(handle);
}

void janus_dtls_wrap_sctp_data(janus_dtls_srtp *dtls, char *label, gboolean textdata, char *buf, int len) {
if(dtls == NULL || !dtls->ready || dtls->sctp == NULL || buf == NULL || len < 1)
return;
Expand All @@ -1048,7 +1069,7 @@ int janus_dtls_send_sctp_data(janus_dtls_srtp *dtls, char *buf, int len) {
return res;
}

void janus_dtls_notify_data(janus_dtls_srtp *dtls, char *label, gboolean textdata, char *buf, int len) {
void janus_dtls_notify_sctp_data(janus_dtls_srtp *dtls, char *label, gboolean textdata, char *buf, int len) {
if(dtls == NULL || buf == NULL || len < 1)
return;
janus_ice_component *component = (janus_ice_component *)dtls->component;
Expand Down
8 changes: 6 additions & 2 deletions dtls.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ int janus_dtls_srtp_create_sctp(janus_dtls_srtp *dtls);
/*! \brief Handle an incoming DTLS message
* @param[in] dtls The janus_dtls_srtp instance to start the handshake on
* @param[in] buf The DTLS message data
* @param[in] len The DTLS message data lenght */
* @param[in] len The DTLS message data length */
void janus_dtls_srtp_incoming_msg(janus_dtls_srtp *dtls, char *buf, uint16_t len);
/*! \brief Send an alert on a janus_dtls_srtp instance
* @param[in] dtls The janus_dtls_srtp instance to send the alert on */
Expand All @@ -147,6 +147,10 @@ void janus_dtls_callback(const SSL *ssl, int where, int ret);
int janus_dtls_verify_callback(int preverify_ok, X509_STORE_CTX *ctx);

#ifdef HAVE_SCTP
/*! \brief Callback (called from the SCTP stack) that SCTP data (DataChannel) can be sent
* @param[in] dtls The janus_dtls_srtp instance to use */
void janus_dtls_sctp_data_ready(janus_dtls_srtp *dtls);

/*! \brief Callback (called from the ICE handle) to encapsulate in DTLS outgoing SCTP data (DataChannel)
* @param[in] dtls The janus_dtls_srtp instance to use
* @param[in] label The label of the data channel to use
Expand All @@ -168,7 +172,7 @@ int janus_dtls_send_sctp_data(janus_dtls_srtp *dtls, char *buf, int len);
* @param[in] textdata Whether the buffer is text (domstring) or binary data
* @param[in] buf The data buffer
* @param[in] len The data length */
void janus_dtls_notify_data(janus_dtls_srtp *dtls, char *label, gboolean textdata, char *buf, int len);
void janus_dtls_notify_sctp_data(janus_dtls_srtp *dtls, char *label, gboolean textdata, char *buf, int len);
#endif

/*! \brief DTLS retransmission timer
Expand Down
28 changes: 26 additions & 2 deletions ice.c
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,8 @@ static janus_ice_queued_packet
janus_ice_add_candidates,
janus_ice_dtls_handshake,
janus_ice_hangup_peerconnection,
janus_ice_detach_handle;
janus_ice_detach_handle,
janus_ice_data_ready;

/* Janus NACKed packet we're tracking (to avoid duplicates) */
typedef struct janus_ice_nacked_packet {
Expand Down Expand Up @@ -480,7 +481,8 @@ static void janus_ice_free_queued_packet(janus_ice_queued_packet *pkt) {
pkt == &janus_ice_add_candidates ||
pkt == &janus_ice_dtls_handshake ||
pkt == &janus_ice_hangup_peerconnection ||
pkt == &janus_ice_detach_handle) {
pkt == &janus_ice_detach_handle ||
pkt == &janus_ice_data_ready) {
return;
}
g_free(pkt->data);
Expand Down Expand Up @@ -4131,6 +4133,14 @@ static gboolean janus_ice_outgoing_traffic_handle(janus_ice_handle *handle, janu
session->session_id, handle->handle_id, "detached",
plugin ? plugin->get_package() : NULL, handle->opaque_id);
return G_SOURCE_REMOVE;
} else if(pkt == &janus_ice_data_ready) {
/* Data is writable on this PeerConnection, notify the plugin */
janus_plugin *plugin = (janus_plugin *)handle->app;
if(plugin != NULL && plugin->data_ready != NULL && handle->app_handle != NULL) {
JANUS_LOG(LOG_HUGE, "[%"SCNu64"] Telling the plugin about the data channel being ready (%s)\n",
handle->handle_id, plugin ? plugin->get_name() : "??");
plugin->data_ready(handle->app_handle);
}
}
if(!janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_READY)) {
janus_ice_free_queued_packet(pkt);
Expand Down Expand Up @@ -4771,6 +4781,20 @@ void janus_ice_relay_sctp(janus_ice_handle *handle, char *buffer, int length) {
#endif
}

void janus_ice_notify_data_ready(janus_ice_handle *handle) {
#ifdef HAVE_SCTP
if(!handle || handle->queued_packets == NULL)
return;
/* Queue this event */
#if GLIB_CHECK_VERSION(2, 46, 0)
g_async_queue_push_front(handle->queued_packets, &janus_ice_data_ready);
#else
g_async_queue_push(handle->queued_packets, &janus_ice_data_ready);
#endif
g_main_context_wakeup(handle->mainctx);
#endif
}

void janus_ice_dtls_handshake_done(janus_ice_handle *handle, janus_ice_component *component) {
if(!handle || !component)
return;
Expand Down
7 changes: 5 additions & 2 deletions ice.h
Original file line number Diff line number Diff line change
Expand Up @@ -634,13 +634,16 @@ void janus_ice_send_remb(janus_ice_handle *handle, uint32_t bitrate);
* @param[in] label The label of the data channel the message is from
* @param[in] textdata Whether the buffer is text (domstring) or binary data
* @param[in] buffer The message data (buffer)
* @param[in] length The buffer lenght */
* @param[in] length The buffer length */
void janus_ice_incoming_data(janus_ice_handle *handle, char *label, gboolean textdata, char *buffer, int length);
/*! \brief Core SCTP/DataChannel callback, called by the SCTP stack when when there's data to send.
* @param[in] handle The Janus ICE handle associated with the peer
* @param[in] buffer The message data (buffer)
* @param[in] length The buffer lenght */
* @param[in] length The buffer length */
void janus_ice_relay_sctp(janus_ice_handle *handle, char *buffer, int length);
/*! \brief Plugin SCTP/DataChannel callback, called by the SCTP stack when data can be written
* @param[in] handle The Janus ICE handle associated with the peer */
void janus_ice_notify_data_ready(janus_ice_handle *handle);
///@}


Expand Down
8 changes: 8 additions & 0 deletions plugins/duktape/echotest.js
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,14 @@ function incomingBinaryData(id, buf, len) {
relayBinaryData(id, buf, len);
}

function dataReady(id) {
// This callback is invoked when the datachannel first becomes
// available (meaning you should never send data before it has been
// invoked at least once), but also when the datachannel is ready to
// receive more data (buffers are empty), which means it can be used
// to throttle outgoing data and not send too much at a time.
}

function resumeScheduler() {
// This is the function responsible for resuming coroutines associated
// with whatever is relevant to the JS script, e.g., for this script,
Expand Down
58 changes: 55 additions & 3 deletions plugins/janus_duktape.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@
* with \c incomingTextData() or \c incomingBinaryData
* though, the performance impact of directly processing and manipulating
* RTP an RTCP packets is probably too high, and so their usage is currently
* discouraged. As an additional note, JavaScript scripts can also decide to
* discouraged. The \c dataReady() callback can be used to figure out when
* data can be sent. As an additional note, JavaScript scripts can also decide to
* implement the functions that return information about the plugin itself,
* namely \c getVersion() \c getVersionString() \c getDescription()
* \c getName() \c getAuthor() and \c getPackage(). If not implemented,
Expand Down Expand Up @@ -217,6 +218,7 @@ void janus_duktape_setup_media(janus_plugin_session *handle);
void janus_duktape_incoming_rtp(janus_plugin_session *handle, janus_plugin_rtp *packet);
void janus_duktape_incoming_rtcp(janus_plugin_session *handle, janus_plugin_rtcp *packet);
void janus_duktape_incoming_data(janus_plugin_session *handle, janus_plugin_data *packet);
void janus_duktape_data_ready(janus_plugin_session *handle);
void janus_duktape_slow_link(janus_plugin_session *handle, int uplink, int video);
void janus_duktape_hangup_media(janus_plugin_session *handle);
void janus_duktape_destroy_session(janus_plugin_session *handle, int *error);
Expand All @@ -243,6 +245,7 @@ static janus_plugin janus_duktape_plugin =
.incoming_rtp = janus_duktape_incoming_rtp,
.incoming_rtcp = janus_duktape_incoming_rtcp,
.incoming_data = janus_duktape_incoming_data,
.data_ready = janus_duktape_data_ready,
.slow_link = janus_duktape_slow_link,
.hangup_media = janus_duktape_hangup_media,
.destroy_session = janus_duktape_destroy_session,
Expand Down Expand Up @@ -289,6 +292,7 @@ static gboolean has_incoming_rtcp = FALSE;
static gboolean has_incoming_data_legacy = FALSE, /* Legacy callback */
has_incoming_text_data = FALSE,
has_incoming_binary_data = FALSE;
static gboolean has_data_ready = FALSE;
static gboolean has_slow_link = FALSE;
/* JavaScript C scheduler (for coroutines) */
static GThread *scheduler_thread = NULL;
Expand Down Expand Up @@ -1069,6 +1073,11 @@ static duk_ret_t janus_duktape_method_relaytextdata(duk_context *ctx) {
}
janus_refcount_increase(&session->ref);
janus_mutex_unlock(&duktape_sessions_mutex);
if(!g_atomic_int_get(&session->dataready)) {
janus_refcount_decrease(&session->ref);
duk_push_error_object(ctx, DUK_ERR_ERROR, "Datachannel not ready yet for session %"SCNu32, id);
return duk_throw(ctx);
}
/* Send the data */
janus_plugin_data data = { .label = NULL, .binary = FALSE, .buffer = (char *)payload, .length = len };
janus_core->relay_data(session->handle, &data);
Expand Down Expand Up @@ -1112,6 +1121,11 @@ static duk_ret_t janus_duktape_method_relaybinarydata(duk_context *ctx) {
}
janus_refcount_increase(&session->ref);
janus_mutex_unlock(&duktape_sessions_mutex);
if(!g_atomic_int_get(&session->dataready)) {
janus_refcount_decrease(&session->ref);
duk_push_error_object(ctx, DUK_ERR_ERROR, "Datachannel not ready yet for session %"SCNu32, id);
return duk_throw(ctx);
}
janus_plugin_data data = { .label = NULL, .binary = TRUE, .buffer = (char *)payload, .length = len };
janus_core->relay_data(session->handle, &data);
janus_refcount_decrease(&session->ref);
Expand Down Expand Up @@ -1478,6 +1492,9 @@ int janus_duktape_init(janus_callbacks *callback, const char *config_path) {
duk_get_global_string(duktape_ctx, "incomingBinaryData");
if(duk_is_function(duktape_ctx, duk_get_top(duktape_ctx)-1) != 0)
has_incoming_binary_data = TRUE;
duk_get_global_string(duktape_ctx, "dataReady");
if(duk_is_function(duktape_ctx, duk_get_top(duktape_ctx)-1) != 0)
has_data_ready = TRUE;
duk_get_global_string(duktape_ctx, "slowLink");
if(duk_is_function(duktape_ctx, duk_get_top(duktape_ctx)-1) != 0)
has_slow_link = TRUE;
Expand Down Expand Up @@ -2292,6 +2309,39 @@ void janus_duktape_incoming_data(janus_plugin_session *handle, janus_plugin_data
janus_mutex_unlock_nodebug(&session->recipients_mutex);
}

void janus_duktape_data_ready(janus_plugin_session *handle) {
if(handle == NULL || handle->stopped || g_atomic_int_get(&duktape_stopping) || !g_atomic_int_get(&duktape_initialized))
return;
janus_duktape_session *session = (janus_duktape_session *)handle->plugin_handle;
if(!session) {
JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
return;
}
if(g_atomic_int_get(&session->destroyed) || g_atomic_int_get(&session->hangingup))
return;
if(g_atomic_int_compare_and_exchange(&session->dataready, 0, 1)) {
JANUS_LOG(LOG_INFO, "[%s-%p] Data channel available\n", JANUS_DUKTAPE_PACKAGE, handle);
}
/* Check if the JS script wants to receive this event */
if(has_data_ready) {
/* Yep, pass the event to the JS script and return */
janus_mutex_lock(&duktape_mutex);
duk_idx_t thr_idx = duk_push_thread(duktape_ctx);
duk_context *t = duk_get_context(duktape_ctx, thr_idx);
duk_get_global_string(t, "dataReady");
duk_push_number(t, session->id);
int res = duk_pcall(t, 1);
if(res != DUK_EXEC_SUCCESS) {
/* Something went wrong... */
JANUS_LOG(LOG_ERR, "Duktape error: %s\n", duk_safe_to_string(t, -1));
}
duk_pop(t);
duk_pop(duktape_ctx);
janus_mutex_unlock(&duktape_mutex);
return;
}
}

void janus_duktape_slow_link(janus_plugin_session *handle, int uplink, int video) {
if(handle == NULL || handle->stopped || g_atomic_int_get(&duktape_stopping) || !g_atomic_int_get(&duktape_initialized))
return;
Expand Down Expand Up @@ -2345,11 +2395,12 @@ void janus_duktape_hangup_media(janus_plugin_session *handle) {
janus_refcount_decrease(&session->ref);
return;
}
if(g_atomic_int_add(&session->hangingup, 1)) {
if(!g_atomic_int_compare_and_exchange(&session->hangingup, 0, 1)) {
janus_refcount_decrease(&session->ref);
return;
}
g_atomic_int_set(&session->started, 0);
g_atomic_int_set(&session->dataready, 0);

/* Reset the media properties */
session->accept_audio = FALSE;
Expand Down Expand Up @@ -2430,7 +2481,8 @@ static void janus_duktape_relay_data_packet(gpointer data, gpointer user_data) {
return;
}
janus_duktape_session *session = (janus_duktape_session *)data;
if(!session || !session->handle || !g_atomic_int_get(&session->started) || !session->accept_data) {
if(!session || !session->handle || !g_atomic_int_get(&session->started) ||
!session->accept_data || !g_atomic_int_get(&session->dataready)) {
return;
}
if(janus_core != NULL) {
Expand Down
1 change: 1 addition & 0 deletions plugins/janus_duktape_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ typedef struct janus_duktape_session {
janus_recorder *drc; /* The Janus recorder instance for data, if enabled */
janus_mutex rec_mutex; /* Mutex to protect the recorders from race conditions */
volatile gint started; /* Whether this session's PeerConnection is ready or not */
volatile gint dataready; /* Whether the data channel was established on this sessions's PeerConnection */
volatile gint hangingup; /* Whether this session's PeerConnection is hanging up */
volatile gint destroyed; /* Whether this session's been marked as destroyed */
/* If you need any additional property (e.g., for hooks you added in janus_duktape_extra.c) add them below this line */
Expand Down
9 changes: 9 additions & 0 deletions plugins/janus_echotest.c
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ void janus_echotest_setup_media(janus_plugin_session *handle);
void janus_echotest_incoming_rtp(janus_plugin_session *handle, janus_plugin_rtp *packet);
void janus_echotest_incoming_rtcp(janus_plugin_session *handle, janus_plugin_rtcp *packet);
void janus_echotest_incoming_data(janus_plugin_session *handle, janus_plugin_data *packet);
void janus_echotest_data_ready(janus_plugin_session *handle);
void janus_echotest_slow_link(janus_plugin_session *handle, int uplink, int video);
void janus_echotest_hangup_media(janus_plugin_session *handle);
void janus_echotest_destroy_session(janus_plugin_session *handle, int *error);
Expand All @@ -174,6 +175,7 @@ static janus_plugin janus_echotest_plugin =
.incoming_rtp = janus_echotest_incoming_rtp,
.incoming_rtcp = janus_echotest_incoming_rtcp,
.incoming_data = janus_echotest_incoming_data,
.data_ready = janus_echotest_data_ready,
.slow_link = janus_echotest_slow_link,
.hangup_media = janus_echotest_hangup_media,
.destroy_session = janus_echotest_destroy_session,
Expand Down Expand Up @@ -689,6 +691,13 @@ void janus_echotest_incoming_data(janus_plugin_session *handle, janus_plugin_dat
}
}

void janus_echotest_data_ready(janus_plugin_session *handle) {
if(handle == NULL || g_atomic_int_get(&handle->stopped) ||
g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized) || !gateway)
return;
/* Data channels are writable */
}

void janus_echotest_slow_link(janus_plugin_session *handle, int uplink, int video) {
/* The core is informing us that our peer got or sent too many NACKs, are we pushing media too hard? */
if(handle == NULL || g_atomic_int_get(&handle->stopped) || g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
Expand Down
Loading

0 comments on commit 2a98ec2

Please sign in to comment.