Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New plugin callback to know when datachannel is writable #2060

Merged
merged 3 commits into from
May 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion dtls.c
Original file line number Diff line number Diff line change
Expand Up @@ -1031,6 +1031,27 @@ int janus_dtls_verify_callback(int preverify_ok, X509_STORE_CTX *ctx) {
}

#ifdef HAVE_SCTP
void janus_dtls_sctp_data_ready(janus_dtls_srtp *dtls) {
if(dtls == NULL)
return;
janus_ice_component *component = (janus_ice_component *)dtls->component;
if(component == NULL) {
JANUS_LOG(LOG_ERR, "No component...\n");
return;
}
janus_ice_stream *stream = component->stream;
if(!stream) {
JANUS_LOG(LOG_ERR, "No stream...\n");
return;
}
janus_ice_handle *handle = stream->handle;
if(!handle || !handle->agent || !dtls->write_bio) {
JANUS_LOG(LOG_ERR, "No handle...\n");
return;
}
janus_ice_notify_data_ready(handle);
}

void janus_dtls_wrap_sctp_data(janus_dtls_srtp *dtls, char *label, gboolean textdata, char *buf, int len) {
if(dtls == NULL || !dtls->ready || dtls->sctp == NULL || buf == NULL || len < 1)
return;
Expand All @@ -1048,7 +1069,7 @@ int janus_dtls_send_sctp_data(janus_dtls_srtp *dtls, char *buf, int len) {
return res;
}

void janus_dtls_notify_data(janus_dtls_srtp *dtls, char *label, gboolean textdata, char *buf, int len) {
void janus_dtls_notify_sctp_data(janus_dtls_srtp *dtls, char *label, gboolean textdata, char *buf, int len) {
if(dtls == NULL || buf == NULL || len < 1)
return;
janus_ice_component *component = (janus_ice_component *)dtls->component;
Expand Down
8 changes: 6 additions & 2 deletions dtls.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ int janus_dtls_srtp_create_sctp(janus_dtls_srtp *dtls);
/*! \brief Handle an incoming DTLS message
* @param[in] dtls The janus_dtls_srtp instance to start the handshake on
* @param[in] buf The DTLS message data
* @param[in] len The DTLS message data lenght */
* @param[in] len The DTLS message data length */
void janus_dtls_srtp_incoming_msg(janus_dtls_srtp *dtls, char *buf, uint16_t len);
/*! \brief Send an alert on a janus_dtls_srtp instance
* @param[in] dtls The janus_dtls_srtp instance to send the alert on */
Expand All @@ -147,6 +147,10 @@ void janus_dtls_callback(const SSL *ssl, int where, int ret);
int janus_dtls_verify_callback(int preverify_ok, X509_STORE_CTX *ctx);

#ifdef HAVE_SCTP
/*! \brief Callback (called from the SCTP stack) that SCTP data (DataChannel) can be sent
* @param[in] dtls The janus_dtls_srtp instance to use */
void janus_dtls_sctp_data_ready(janus_dtls_srtp *dtls);

/*! \brief Callback (called from the ICE handle) to encapsulate in DTLS outgoing SCTP data (DataChannel)
* @param[in] dtls The janus_dtls_srtp instance to use
* @param[in] label The label of the data channel to use
Expand All @@ -168,7 +172,7 @@ int janus_dtls_send_sctp_data(janus_dtls_srtp *dtls, char *buf, int len);
* @param[in] textdata Whether the buffer is text (domstring) or binary data
* @param[in] buf The data buffer
* @param[in] len The data length */
void janus_dtls_notify_data(janus_dtls_srtp *dtls, char *label, gboolean textdata, char *buf, int len);
void janus_dtls_notify_sctp_data(janus_dtls_srtp *dtls, char *label, gboolean textdata, char *buf, int len);
#endif

/*! \brief DTLS retransmission timer
Expand Down
28 changes: 26 additions & 2 deletions ice.c
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,8 @@ static janus_ice_queued_packet
janus_ice_add_candidates,
janus_ice_dtls_handshake,
janus_ice_hangup_peerconnection,
janus_ice_detach_handle;
janus_ice_detach_handle,
janus_ice_data_ready;

/* Janus NACKed packet we're tracking (to avoid duplicates) */
typedef struct janus_ice_nacked_packet {
Expand Down Expand Up @@ -478,7 +479,8 @@ static void janus_ice_free_queued_packet(janus_ice_queued_packet *pkt) {
pkt == &janus_ice_add_candidates ||
pkt == &janus_ice_dtls_handshake ||
pkt == &janus_ice_hangup_peerconnection ||
pkt == &janus_ice_detach_handle) {
pkt == &janus_ice_detach_handle ||
pkt == &janus_ice_data_ready) {
return;
}
g_free(pkt->data);
Expand Down Expand Up @@ -4100,6 +4102,14 @@ static gboolean janus_ice_outgoing_traffic_handle(janus_ice_handle *handle, janu
session->session_id, handle->handle_id, "detached",
plugin ? plugin->get_package() : NULL, handle->opaque_id);
return G_SOURCE_REMOVE;
} else if(pkt == &janus_ice_data_ready) {
/* Data is writable on this PeerConnection, notify the plugin */
janus_plugin *plugin = (janus_plugin *)handle->app;
if(plugin != NULL && plugin->data_ready != NULL && handle->app_handle != NULL) {
JANUS_LOG(LOG_HUGE, "[%"SCNu64"] Telling the plugin about the data channel being ready (%s)\n",
handle->handle_id, plugin ? plugin->get_name() : "??");
plugin->data_ready(handle->app_handle);
}
}
if(!janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_READY)) {
janus_ice_free_queued_packet(pkt);
Expand Down Expand Up @@ -4740,6 +4750,20 @@ void janus_ice_relay_sctp(janus_ice_handle *handle, char *buffer, int length) {
#endif
}

void janus_ice_notify_data_ready(janus_ice_handle *handle) {
#ifdef HAVE_SCTP
if(!handle || handle->queued_packets == NULL)
return;
/* Queue this event */
#if GLIB_CHECK_VERSION(2, 46, 0)
g_async_queue_push_front(handle->queued_packets, &janus_ice_data_ready);
#else
g_async_queue_push(handle->queued_packets, &janus_ice_data_ready);
#endif
g_main_context_wakeup(handle->mainctx);
#endif
}

void janus_ice_dtls_handshake_done(janus_ice_handle *handle, janus_ice_component *component) {
if(!handle || !component)
return;
Expand Down
7 changes: 5 additions & 2 deletions ice.h
Original file line number Diff line number Diff line change
Expand Up @@ -630,13 +630,16 @@ void janus_ice_send_remb(janus_ice_handle *handle, uint32_t bitrate);
* @param[in] label The label of the data channel the message is from
* @param[in] textdata Whether the buffer is text (domstring) or binary data
* @param[in] buffer The message data (buffer)
* @param[in] length The buffer lenght */
* @param[in] length The buffer length */
void janus_ice_incoming_data(janus_ice_handle *handle, char *label, gboolean textdata, char *buffer, int length);
/*! \brief Core SCTP/DataChannel callback, called by the SCTP stack when when there's data to send.
* @param[in] handle The Janus ICE handle associated with the peer
* @param[in] buffer The message data (buffer)
* @param[in] length The buffer lenght */
* @param[in] length The buffer length */
void janus_ice_relay_sctp(janus_ice_handle *handle, char *buffer, int length);
/*! \brief Plugin SCTP/DataChannel callback, called by the SCTP stack when data can be written
* @param[in] handle The Janus ICE handle associated with the peer */
void janus_ice_notify_data_ready(janus_ice_handle *handle);
///@}


Expand Down
8 changes: 8 additions & 0 deletions plugins/duktape/echotest.js
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,14 @@ function incomingBinaryData(id, buf, len) {
relayBinaryData(id, buf, len);
}

function dataReady(id) {
// This callback is invoked when the datachannel first becomes
// available (meaning you should never send data before it has been
// invoked at least once), but also when the datachannel is ready to
// receive more data (buffers are empty), which means it can be used
// to throttle outgoing data and not send too much at a time.
}

function resumeScheduler() {
// This is the function responsible for resuming coroutines associated
// with whatever is relevant to the JS script, e.g., for this script,
Expand Down
58 changes: 55 additions & 3 deletions plugins/janus_duktape.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@
* with \c incomingTextData() or \c incomingBinaryData
* though, the performance impact of directly processing and manipulating
* RTP an RTCP packets is probably too high, and so their usage is currently
* discouraged. As an additional note, JavaScript scripts can also decide to
* discouraged. The \c dataReady() callback can be used to figure out when
* data can be sent. As an additional note, JavaScript scripts can also decide to
* implement the functions that return information about the plugin itself,
* namely \c getVersion() \c getVersionString() \c getDescription()
* \c getName() \c getAuthor() and \c getPackage(). If not implemented,
Expand Down Expand Up @@ -217,6 +218,7 @@ void janus_duktape_setup_media(janus_plugin_session *handle);
void janus_duktape_incoming_rtp(janus_plugin_session *handle, janus_plugin_rtp *packet);
void janus_duktape_incoming_rtcp(janus_plugin_session *handle, janus_plugin_rtcp *packet);
void janus_duktape_incoming_data(janus_plugin_session *handle, janus_plugin_data *packet);
void janus_duktape_data_ready(janus_plugin_session *handle);
void janus_duktape_slow_link(janus_plugin_session *handle, int uplink, int video);
void janus_duktape_hangup_media(janus_plugin_session *handle);
void janus_duktape_destroy_session(janus_plugin_session *handle, int *error);
Expand All @@ -243,6 +245,7 @@ static janus_plugin janus_duktape_plugin =
.incoming_rtp = janus_duktape_incoming_rtp,
.incoming_rtcp = janus_duktape_incoming_rtcp,
.incoming_data = janus_duktape_incoming_data,
.data_ready = janus_duktape_data_ready,
.slow_link = janus_duktape_slow_link,
.hangup_media = janus_duktape_hangup_media,
.destroy_session = janus_duktape_destroy_session,
Expand Down Expand Up @@ -289,6 +292,7 @@ static gboolean has_incoming_rtcp = FALSE;
static gboolean has_incoming_data_legacy = FALSE, /* Legacy callback */
has_incoming_text_data = FALSE,
has_incoming_binary_data = FALSE;
static gboolean has_data_ready = FALSE;
static gboolean has_slow_link = FALSE;
/* JavaScript C scheduler (for coroutines) */
static GThread *scheduler_thread = NULL;
Expand Down Expand Up @@ -1069,6 +1073,11 @@ static duk_ret_t janus_duktape_method_relaytextdata(duk_context *ctx) {
}
janus_refcount_increase(&session->ref);
janus_mutex_unlock(&duktape_sessions_mutex);
if(!g_atomic_int_get(&session->dataready)) {
janus_refcount_decrease(&session->ref);
duk_push_error_object(ctx, DUK_ERR_ERROR, "Datachannel not ready yet for session %"SCNu32, id);
return duk_throw(ctx);
}
/* Send the data */
janus_plugin_data data = { .label = NULL, .binary = FALSE, .buffer = (char *)payload, .length = len };
janus_core->relay_data(session->handle, &data);
Expand Down Expand Up @@ -1112,6 +1121,11 @@ static duk_ret_t janus_duktape_method_relaybinarydata(duk_context *ctx) {
}
janus_refcount_increase(&session->ref);
janus_mutex_unlock(&duktape_sessions_mutex);
if(!g_atomic_int_get(&session->dataready)) {
janus_refcount_decrease(&session->ref);
duk_push_error_object(ctx, DUK_ERR_ERROR, "Datachannel not ready yet for session %"SCNu32, id);
return duk_throw(ctx);
}
janus_plugin_data data = { .label = NULL, .binary = TRUE, .buffer = (char *)payload, .length = len };
janus_core->relay_data(session->handle, &data);
janus_refcount_decrease(&session->ref);
Expand Down Expand Up @@ -1467,6 +1481,9 @@ int janus_duktape_init(janus_callbacks *callback, const char *config_path) {
duk_get_global_string(duktape_ctx, "incomingBinaryData");
if(duk_is_function(duktape_ctx, duk_get_top(duktape_ctx)-1) != 0)
has_incoming_binary_data = TRUE;
duk_get_global_string(duktape_ctx, "dataReady");
if(duk_is_function(duktape_ctx, duk_get_top(duktape_ctx)-1) != 0)
has_data_ready = TRUE;
duk_get_global_string(duktape_ctx, "slowLink");
if(duk_is_function(duktape_ctx, duk_get_top(duktape_ctx)-1) != 0)
has_slow_link = TRUE;
Expand Down Expand Up @@ -2281,6 +2298,39 @@ void janus_duktape_incoming_data(janus_plugin_session *handle, janus_plugin_data
janus_mutex_unlock_nodebug(&session->recipients_mutex);
}

void janus_duktape_data_ready(janus_plugin_session *handle) {
if(handle == NULL || handle->stopped || g_atomic_int_get(&duktape_stopping) || !g_atomic_int_get(&duktape_initialized))
return;
janus_duktape_session *session = (janus_duktape_session *)handle->plugin_handle;
if(!session) {
JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");
return;
}
if(g_atomic_int_get(&session->destroyed) || g_atomic_int_get(&session->hangingup))
return;
if(g_atomic_int_compare_and_exchange(&session->dataready, 0, 1)) {
JANUS_LOG(LOG_INFO, "[%s-%p] Data channel available\n", JANUS_DUKTAPE_PACKAGE, handle);
}
/* Check if the JS script wants to receive this event */
if(has_data_ready) {
/* Yep, pass the event to the JS script and return */
janus_mutex_lock(&duktape_mutex);
duk_idx_t thr_idx = duk_push_thread(duktape_ctx);
duk_context *t = duk_get_context(duktape_ctx, thr_idx);
duk_get_global_string(t, "dataReady");
duk_push_number(t, session->id);
int res = duk_pcall(t, 1);
if(res != DUK_EXEC_SUCCESS) {
/* Something went wrong... */
JANUS_LOG(LOG_ERR, "Duktape error: %s\n", duk_safe_to_string(t, -1));
}
duk_pop(t);
duk_pop(duktape_ctx);
janus_mutex_unlock(&duktape_mutex);
return;
}
}

void janus_duktape_slow_link(janus_plugin_session *handle, int uplink, int video) {
if(handle == NULL || handle->stopped || g_atomic_int_get(&duktape_stopping) || !g_atomic_int_get(&duktape_initialized))
return;
Expand Down Expand Up @@ -2334,11 +2384,12 @@ void janus_duktape_hangup_media(janus_plugin_session *handle) {
janus_refcount_decrease(&session->ref);
return;
}
if(g_atomic_int_add(&session->hangingup, 1)) {
if(!g_atomic_int_compare_and_exchange(&session->hangingup, 0, 1)) {
janus_refcount_decrease(&session->ref);
return;
}
g_atomic_int_set(&session->started, 0);
g_atomic_int_set(&session->dataready, 0);

/* Reset the media properties */
session->accept_audio = FALSE;
Expand Down Expand Up @@ -2419,7 +2470,8 @@ static void janus_duktape_relay_data_packet(gpointer data, gpointer user_data) {
return;
}
janus_duktape_session *session = (janus_duktape_session *)data;
if(!session || !session->handle || !g_atomic_int_get(&session->started) || !session->accept_data) {
if(!session || !session->handle || !g_atomic_int_get(&session->started) ||
!session->accept_data || !g_atomic_int_get(&session->dataready)) {
return;
}
if(janus_core != NULL) {
Expand Down
1 change: 1 addition & 0 deletions plugins/janus_duktape_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ typedef struct janus_duktape_session {
janus_recorder *drc; /* The Janus recorder instance for data, if enabled */
janus_mutex rec_mutex; /* Mutex to protect the recorders from race conditions */
volatile gint started; /* Whether this session's PeerConnection is ready or not */
volatile gint dataready; /* Whether the data channel was established on this sessions's PeerConnection */
volatile gint hangingup; /* Whether this session's PeerConnection is hanging up */
volatile gint destroyed; /* Whether this session's been marked as destroyed */
/* If you need any additional property (e.g., for hooks you added in janus_duktape_extra.c) add them below this line */
Expand Down
9 changes: 9 additions & 0 deletions plugins/janus_echotest.c
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ void janus_echotest_setup_media(janus_plugin_session *handle);
void janus_echotest_incoming_rtp(janus_plugin_session *handle, janus_plugin_rtp *packet);
void janus_echotest_incoming_rtcp(janus_plugin_session *handle, janus_plugin_rtcp *packet);
void janus_echotest_incoming_data(janus_plugin_session *handle, janus_plugin_data *packet);
void janus_echotest_data_ready(janus_plugin_session *handle);
void janus_echotest_slow_link(janus_plugin_session *handle, int uplink, int video);
void janus_echotest_hangup_media(janus_plugin_session *handle);
void janus_echotest_destroy_session(janus_plugin_session *handle, int *error);
Expand All @@ -171,6 +172,7 @@ static janus_plugin janus_echotest_plugin =
.incoming_rtp = janus_echotest_incoming_rtp,
.incoming_rtcp = janus_echotest_incoming_rtcp,
.incoming_data = janus_echotest_incoming_data,
.data_ready = janus_echotest_data_ready,
.slow_link = janus_echotest_slow_link,
.hangup_media = janus_echotest_hangup_media,
.destroy_session = janus_echotest_destroy_session,
Expand Down Expand Up @@ -684,6 +686,13 @@ void janus_echotest_incoming_data(janus_plugin_session *handle, janus_plugin_dat
}
}

void janus_echotest_data_ready(janus_plugin_session *handle) {
if(handle == NULL || g_atomic_int_get(&handle->stopped) ||
g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized) || !gateway)
return;
/* Data channels are writable */
}

void janus_echotest_slow_link(janus_plugin_session *handle, int uplink, int video) {
/* The core is informing us that our peer got or sent too many NACKs, are we pushing media too hard? */
if(handle == NULL || g_atomic_int_get(&handle->stopped) || g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
Expand Down
Loading