Skip to content

Commit

Permalink
Added support for data channel subprotocol (#2157)
Browse files Browse the repository at this point in the history
  • Loading branch information
lminiero committed May 19, 2020
1 parent c4a1e4c commit 9fe2894
Show file tree
Hide file tree
Showing 14 changed files with 145 additions and 57 deletions.
8 changes: 4 additions & 4 deletions dtls.c
Original file line number Diff line number Diff line change
Expand Up @@ -1052,10 +1052,10 @@ void janus_dtls_sctp_data_ready(janus_dtls_srtp *dtls) {
janus_ice_notify_data_ready(handle);
}

void janus_dtls_wrap_sctp_data(janus_dtls_srtp *dtls, char *label, gboolean textdata, char *buf, int len) {
void janus_dtls_wrap_sctp_data(janus_dtls_srtp *dtls, char *label, char *protocol, gboolean textdata, char *buf, int len) {
if(dtls == NULL || !dtls->ready || dtls->sctp == NULL || buf == NULL || len < 1)
return;
janus_sctp_send_data(dtls->sctp, label, textdata, buf, len);
janus_sctp_send_data(dtls->sctp, label, protocol, textdata, buf, len);
}

int janus_dtls_send_sctp_data(janus_dtls_srtp *dtls, char *buf, int len) {
Expand All @@ -1069,7 +1069,7 @@ int janus_dtls_send_sctp_data(janus_dtls_srtp *dtls, char *buf, int len) {
return res;
}

void janus_dtls_notify_sctp_data(janus_dtls_srtp *dtls, char *label, gboolean textdata, char *buf, int len) {
void janus_dtls_notify_sctp_data(janus_dtls_srtp *dtls, char *label, char *protocol, gboolean textdata, char *buf, int len) {
if(dtls == NULL || buf == NULL || len < 1)
return;
janus_ice_component *component = (janus_ice_component *)dtls->component;
Expand All @@ -1087,7 +1087,7 @@ void janus_dtls_notify_sctp_data(janus_dtls_srtp *dtls, char *label, gboolean te
JANUS_LOG(LOG_ERR, "No handle...\n");
return;
}
janus_ice_incoming_data(handle, label, textdata, buf, len);
janus_ice_incoming_data(handle, label, protocol, textdata, buf, len);
}
#endif

Expand Down
6 changes: 4 additions & 2 deletions dtls.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,11 @@ void janus_dtls_sctp_data_ready(janus_dtls_srtp *dtls);
/*! \brief Callback (called from the ICE handle) to encapsulate in DTLS outgoing SCTP data (DataChannel)
* @param[in] dtls The janus_dtls_srtp instance to use
* @param[in] label The label of the data channel to use
* @param[in] protocol The protocol of the data channel to use
* @param[in] textdata Whether the buffer is text (domstring) or binary data
* @param[in] buf The data buffer to encapsulate
* @param[in] len The data length */
void janus_dtls_wrap_sctp_data(janus_dtls_srtp *dtls, char *label, gboolean textdata, char *buf, int len);
void janus_dtls_wrap_sctp_data(janus_dtls_srtp *dtls, char *label, char *protocol, gboolean textdata, char *buf, int len);

/*! \brief Callback (called from the SCTP stack) to encapsulate in DTLS outgoing SCTP data (DataChannel)
* @param[in] dtls The janus_dtls_srtp instance to use
Expand All @@ -169,10 +170,11 @@ int janus_dtls_send_sctp_data(janus_dtls_srtp *dtls, char *buf, int len);
/*! \brief Callback to be notified about incoming SCTP data (DataChannel) to forward to the handle
* @param[in] dtls The janus_dtls_srtp instance to use
* @param[in] label The label of the data channel the message is from
* @param[in] protocol The protocol of the data channel the message is from
* @param[in] textdata Whether the buffer is text (domstring) or binary data
* @param[in] buf The data buffer
* @param[in] len The data length */
void janus_dtls_notify_sctp_data(janus_dtls_srtp *dtls, char *label, gboolean textdata, char *buf, int len);
void janus_dtls_notify_sctp_data(janus_dtls_srtp *dtls, char *label, char *protocol, gboolean textdata, char *buf, int len);
#endif

/*! \brief DTLS retransmission timer
Expand Down
18 changes: 11 additions & 7 deletions html/janus.js
Original file line number Diff line number Diff line change
Expand Up @@ -1420,7 +1420,7 @@ function Janus(gatewayCallbacks) {
}

// Private method to create a data channel
function createDataChannel(handleId, dclabel, incoming, pendingData) {
function createDataChannel(handleId, dclabel, dcprotocol, incoming, pendingData) {
var pluginHandle = pluginHandles[handleId];
if(!pluginHandle || !pluginHandle.webrtcStuff) {
Janus.warn("Invalid handle");
Expand All @@ -1435,6 +1435,7 @@ function Janus(gatewayCallbacks) {
var onDataChannelStateChange = function(event) {
Janus.log('Received state change on data channel:', event);
var label = event.target.label;
var protocol = event.target.protocol;
var dcState = config.dataChannel[label] ? config.dataChannel[label].readyState : "null";
Janus.log('State change on <' + label + '> data channel: ' + dcState);
if(dcState === 'open') {
Expand All @@ -1449,7 +1450,7 @@ function Janus(gatewayCallbacks) {
config.dataChannel[label].pending = [];
}
// Notify the open data channel
pluginHandle.ondataopen(label);
pluginHandle.ondataopen(label, protocol);
}
};
var onDataChannelError = function(error) {
Expand All @@ -1458,7 +1459,10 @@ function Janus(gatewayCallbacks) {
};
if(!incoming) {
// FIXME Add options (ordered, maxRetransmits, etc.)
config.dataChannel[dclabel] = config.pc.createDataChannel(dclabel, {ordered: true});
var dcoptions = { ordered: true };
if(dcprotocol)
dcoptions.protocol = dcprotocol;
config.dataChannel[dclabel] = config.pc.createDataChannel(dclabel, dcoptions);
} else {
// The channel was created by Janus
config.dataChannel[dclabel] = incoming;
Expand Down Expand Up @@ -1493,7 +1497,7 @@ function Janus(gatewayCallbacks) {
var label = callbacks.label ? callbacks.label : Janus.dataChanDefaultLabel;
if(!config.dataChannel[label]) {
// Create new data channel and wait for it to open
createDataChannel(handleId, label, false, data);
createDataChannel(handleId, label, callbacks.protocol, false, data, callbacks.protocol);
callbacks.success();
return;
}
Expand Down Expand Up @@ -1839,11 +1843,11 @@ function Janus(gatewayCallbacks) {
}
// Any data channel to create?
if(isDataEnabled(media) && !config.dataChannel[Janus.dataChanDefaultLabel]) {
Janus.log("Creating data channel");
createDataChannel(handleId, Janus.dataChanDefaultLabel, false);
Janus.log("Creating default data channel");
createDataChannel(handleId, Janus.dataChanDefaultLabel, null, false);
config.pc.ondatachannel = function(event) {
Janus.log("Data channel created by Janus:", event);
createDataChannel(handleId, event.channel.label, event.channel);
createDataChannel(handleId, event.channel.label, event.channel.protocol, event.channel);
};
}
// If there's a new local stream, let's notify the application
Expand Down
12 changes: 10 additions & 2 deletions ice.c
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ uint16_t rtp_range_max = 0;
typedef struct janus_ice_queued_packet {
char *data;
char *label;
char *protocol;
gint length;
gint type;
gboolean control;
Expand Down Expand Up @@ -487,6 +488,7 @@ static void janus_ice_free_queued_packet(janus_ice_queued_packet *pkt) {
}
g_free(pkt->data);
g_free(pkt->label);
g_free(pkt->protocol);
g_free(pkt);
}

Expand Down Expand Up @@ -2921,6 +2923,7 @@ static void janus_ice_cb_nice_recv(NiceAgent *agent, guint stream_id, guint comp
pkt->control = FALSE;
pkt->retransmission = TRUE;
pkt->label = NULL;
pkt->protocol = NULL;
pkt->added = janus_get_monotonic_time();
/* What to send and how depends on whether we're doing RFC4588 or not */
if(!video || !janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_RFC4588_RTX)) {
Expand Down Expand Up @@ -3005,7 +3008,7 @@ static void janus_ice_cb_nice_recv(NiceAgent *agent, guint stream_id, guint comp
}
}

void janus_ice_incoming_data(janus_ice_handle *handle, char *label, gboolean textdata, char *buffer, int length) {
void janus_ice_incoming_data(janus_ice_handle *handle, char *label, char *protocol, gboolean textdata, char *buffer, int length) {
if(handle == NULL || buffer == NULL || length <= 0)
return;
janus_plugin_data data = { .label = label, .binary = !textdata, .buffer = buffer, .length = length };
Expand Down Expand Up @@ -4498,7 +4501,8 @@ static gboolean janus_ice_outgoing_traffic_handle(janus_ice_handle *handle, janu
}
component->noerrorlog = FALSE;
/* TODO Support binary data */
janus_dtls_wrap_sctp_data(component->dtls, pkt->label, pkt->type == JANUS_ICE_PACKET_TEXT, pkt->data, pkt->length);
janus_dtls_wrap_sctp_data(component->dtls, pkt->label, pkt->protocol,
pkt->type == JANUS_ICE_PACKET_TEXT, pkt->data, pkt->length);
#endif
} else if(pkt->type == JANUS_ICE_PACKET_SCTP) {
/* SCTP data to push */
Expand Down Expand Up @@ -4649,6 +4653,7 @@ void janus_ice_relay_rtp(janus_ice_handle *handle, janus_plugin_rtp *packet) {
pkt->encrypted = FALSE;
pkt->retransmission = FALSE;
pkt->label = NULL;
pkt->protocol = NULL;
pkt->added = janus_get_monotonic_time();
janus_ice_queue_packet(handle, pkt);
/* Restore the extension flag to what the plugin set it to */
Expand Down Expand Up @@ -4692,6 +4697,7 @@ void janus_ice_relay_rtcp_internal(janus_ice_handle *handle, janus_plugin_rtcp *
pkt->encrypted = FALSE;
pkt->retransmission = FALSE;
pkt->label = NULL;
pkt->protocol = NULL;
pkt->added = janus_get_monotonic_time();
janus_ice_queue_packet(handle, pkt);
if(rtcp_buf != packet->buffer) {
Expand Down Expand Up @@ -4757,6 +4763,7 @@ void janus_ice_relay_data(janus_ice_handle *handle, janus_plugin_data *packet) {
pkt->encrypted = FALSE;
pkt->retransmission = FALSE;
pkt->label = packet->label ? g_strdup(packet->label) : NULL;
pkt->protocol = packet->protocol ? g_strdup(packet->protocol) : NULL;
pkt->added = janus_get_monotonic_time();
janus_ice_queue_packet(handle, pkt);
}
Expand All @@ -4776,6 +4783,7 @@ void janus_ice_relay_sctp(janus_ice_handle *handle, char *buffer, int length) {
pkt->encrypted = FALSE;
pkt->retransmission = FALSE;
pkt->label = NULL;
pkt->protocol = NULL;
pkt->added = janus_get_monotonic_time();
janus_ice_queue_packet(handle, pkt);
#endif
Expand Down
3 changes: 2 additions & 1 deletion ice.h
Original file line number Diff line number Diff line change
Expand Up @@ -632,10 +632,11 @@ void janus_ice_send_remb(janus_ice_handle *handle, uint32_t bitrate);
/*! \brief Plugin SCTP/DataChannel callback, called by the SCTP stack when when there's data for a plugin
* @param[in] handle The Janus ICE handle associated with the peer
* @param[in] label The label of the data channel the message is from
* @param[in] protocol The protocol of the data channel to use
* @param[in] textdata Whether the buffer is text (domstring) or binary data
* @param[in] buffer The message data (buffer)
* @param[in] length The buffer length */
void janus_ice_incoming_data(janus_ice_handle *handle, char *label, gboolean textdata, char *buffer, int length);
void janus_ice_incoming_data(janus_ice_handle *handle, char *label, char *protocol, gboolean textdata, char *buffer, int length);
/*! \brief Core SCTP/DataChannel callback, called by the SCTP stack when when there's data to send.
* @param[in] handle The Janus ICE handle associated with the peer
* @param[in] buffer The message data (buffer)
Expand Down
25 changes: 21 additions & 4 deletions plugins/janus_duktape.c
Original file line number Diff line number Diff line change
Expand Up @@ -1079,7 +1079,13 @@ static duk_ret_t janus_duktape_method_relaytextdata(duk_context *ctx) {
return duk_throw(ctx);
}
/* Send the data */
janus_plugin_data data = { .label = NULL, .binary = FALSE, .buffer = (char *)payload, .length = len };
janus_plugin_data data = {
.label = NULL,
.protocol = NULL,
.binary = TRUE,
.buffer = (char *)payload,
.length = len
};
janus_core->relay_data(session->handle, &data);
janus_refcount_decrease(&session->ref);
duk_push_int(ctx, 0);
Expand Down Expand Up @@ -1126,7 +1132,13 @@ static duk_ret_t janus_duktape_method_relaybinarydata(duk_context *ctx) {
duk_push_error_object(ctx, DUK_ERR_ERROR, "Datachannel not ready yet for session %"SCNu32, id);
return duk_throw(ctx);
}
janus_plugin_data data = { .label = NULL, .binary = TRUE, .buffer = (char *)payload, .length = len };
janus_plugin_data data = {
.label = NULL,
.protocol = NULL,
.binary = TRUE,
.buffer = (char *)payload,
.length = len
};
janus_core->relay_data(session->handle, &data);
janus_refcount_decrease(&session->ref);
duk_push_int(ctx, 0);
Expand Down Expand Up @@ -2488,8 +2500,13 @@ static void janus_duktape_relay_data_packet(gpointer data, gpointer user_data) {
if(janus_core != NULL) {
JANUS_LOG(LOG_VERB, "Forwarding %s DataChannel message (%d bytes) to session %"SCNu32"\n",
packet->textdata ? "text" : "binary", packet->length, session->id);
janus_plugin_data data = { .label = NULL, .binary = !packet->textdata,
.buffer = (char *)packet->data, .length = packet->length };
janus_plugin_data data = {
.label = NULL,
.protocol = NULL,
.binary = !packet->textdata,
.buffer = (char *)packet->data,
.length = packet->length
};
janus_core->relay_data(session->handle, &data);
}
return;
Expand Down
25 changes: 21 additions & 4 deletions plugins/janus_lua.c
Original file line number Diff line number Diff line change
Expand Up @@ -955,7 +955,13 @@ static int janus_lua_method_relaytextdata(lua_State *s) {
return 1;
}
/* Send the data */
janus_plugin_data data = { .label = NULL, .binary = FALSE, .buffer = (char *)payload, .length = len };
janus_plugin_data data = {
.label = NULL,
.protocol = NULL,
.binary = FALSE,
.buffer = (char *)payload,
.length = len
};
janus_core->relay_data(session->handle, &data);
janus_refcount_decrease(&session->ref);
lua_pushnumber(s, 0);
Expand Down Expand Up @@ -996,7 +1002,13 @@ static int janus_lua_method_relaybinarydata(lua_State *s) {
return 1;
}
/* Send the data */
janus_plugin_data data = { .label = NULL, .binary = TRUE, .buffer = (char *)payload, .length = len };
janus_plugin_data data = {
.label = NULL,
.protocol = NULL,
.binary = TRUE,
.buffer = (char *)payload,
.length = len
};
janus_core->relay_data(session->handle, &data);
janus_refcount_decrease(&session->ref);
lua_pushnumber(s, 0);
Expand Down Expand Up @@ -2158,8 +2170,13 @@ static void janus_lua_relay_data_packet(gpointer data, gpointer user_data) {
if(janus_core != NULL) {
JANUS_LOG(LOG_VERB, "Forwarding %s DataChannel message (%d bytes) to session %"SCNu32"\n",
packet->textdata ? "text" : "binary", packet->length, session->id);
janus_plugin_data data = { .label = NULL, .binary = !packet->textdata,
.buffer = (char *)packet->data, .length = packet->length };
janus_plugin_data data = {
.label = NULL,
.protocol = NULL,
.binary = !packet->textdata,
.buffer = (char *)packet->data,
.length = packet->length
};
janus_core->relay_data(session->handle, &data);
}
return;
Expand Down
9 changes: 7 additions & 2 deletions plugins/janus_streaming.c
Original file line number Diff line number Diff line change
Expand Up @@ -8217,8 +8217,13 @@ static void janus_streaming_relay_rtp_packet(gpointer data, gpointer user_data)
if(!session->data)
return;
if(gateway != NULL && packet->data != NULL && g_atomic_int_get(&session->dataready)) {
janus_plugin_data data = { .label = NULL, .binary = !packet->textdata,
.buffer = (char *)packet->data, .length = packet->length };
janus_plugin_data data = {
.label = NULL,
.protocol = NULL,
.binary = !packet->textdata,
.buffer = (char *)packet->data,
.length = packet->length
};
gateway->relay_data(session->handle, &data);
}
}
Expand Down
18 changes: 9 additions & 9 deletions plugins/janus_textroom.c
Original file line number Diff line number Diff line change
Expand Up @@ -1524,7 +1524,7 @@ janus_plugin_result *janus_textroom_handle_incoming_request(janus_plugin_session
janus_textroom_participant *top = g_hash_table_lookup(textroom->participants, to);
if(top) {
janus_refcount_increase(&top->ref);
janus_plugin_data data = { .label = NULL, .binary = FALSE, .buffer = msg_text, .length = strlen(msg_text) };
janus_plugin_data data = { .label = NULL, .protocol = NULL, .binary = FALSE, .buffer = msg_text, .length = strlen(msg_text) };
gateway->relay_data(top->session->handle, &data);
janus_refcount_decrease(&top->ref);
json_object_set_new(sent, to, json_true());
Expand All @@ -1544,7 +1544,7 @@ janus_plugin_result *janus_textroom_handle_incoming_request(janus_plugin_session
janus_textroom_participant *top = g_hash_table_lookup(textroom->participants, to);
if(top) {
janus_refcount_increase(&top->ref);
janus_plugin_data data = { .label = NULL, .binary = FALSE, .buffer = msg_text, .length = strlen(msg_text) };
janus_plugin_data data = { .label = NULL, .protocol = NULL, .binary = FALSE, .buffer = msg_text, .length = strlen(msg_text) };
gateway->relay_data(top->session->handle, &data);
janus_refcount_decrease(&top->ref);
json_object_set_new(sent, to, json_true());
Expand All @@ -1565,7 +1565,7 @@ janus_plugin_result *janus_textroom_handle_incoming_request(janus_plugin_session
janus_textroom_participant *top = value;
JANUS_LOG(LOG_VERB, " >> To %s in %s: %s\n", top->username, room_id_str, message);
janus_refcount_increase(&top->ref);
janus_plugin_data data = { .label = NULL, .binary = FALSE, .buffer = msg_text, .length = strlen(msg_text) };
janus_plugin_data data = { .label = NULL, .protocol = NULL, .binary = FALSE, .buffer = msg_text, .length = strlen(msg_text) };
gateway->relay_data(top->session->handle, &data);
janus_refcount_decrease(&top->ref);
}
Expand Down Expand Up @@ -1714,7 +1714,7 @@ janus_plugin_result *janus_textroom_handle_incoming_request(janus_plugin_session
json_object_set_new(event, "display", json_string(display_text));
char *event_text = json_dumps(event, json_format);
json_decref(event);
janus_plugin_data data = { .label = NULL, .binary = FALSE, .buffer = event_text, .length = strlen(event_text) };
janus_plugin_data data = { .label = NULL, .protocol = NULL, .binary = FALSE, .buffer = event_text, .length = strlen(event_text) };
gateway->relay_data(handle, &data);
/* Broadcast */
GHashTableIter iter;
Expand Down Expand Up @@ -1818,7 +1818,7 @@ janus_plugin_result *janus_textroom_handle_incoming_request(janus_plugin_session
json_object_set_new(event, "username", json_string(participant->username));
char *event_text = json_dumps(event, json_format);
json_decref(event);
janus_plugin_data data = { .label = NULL, .binary = FALSE, .buffer = event_text, .length = strlen(event_text) };
janus_plugin_data data = { .label = NULL, .protocol = NULL, .binary = FALSE, .buffer = event_text, .length = strlen(event_text) };
gateway->relay_data(handle, &data);
/* Broadcast */
GHashTableIter iter;
Expand Down Expand Up @@ -2140,7 +2140,7 @@ janus_plugin_result *janus_textroom_handle_incoming_request(janus_plugin_session
while(g_hash_table_iter_next(&iter, NULL, &value)) {
janus_textroom_participant *top = value;
JANUS_LOG(LOG_VERB, " >> To %s in %s\n", top->username, room_id_str);
janus_plugin_data data = { .label = NULL, .binary = FALSE, .buffer = event_text, .length = strlen(event_text) };
janus_plugin_data data = { .label = NULL, .protocol = NULL, .binary = FALSE, .buffer = event_text, .length = strlen(event_text) };
gateway->relay_data(top->session->handle, &data);
}
free(event_text);
Expand Down Expand Up @@ -2242,7 +2242,7 @@ janus_plugin_result *janus_textroom_handle_incoming_request(janus_plugin_session
janus_textroom_participant *top = value;
JANUS_LOG(LOG_VERB, " >> To %s in %s: %s\n", top->username, room_id_str, message);
janus_refcount_increase(&top->ref);
janus_plugin_data data = { .label = NULL, .binary = FALSE, .buffer = msg_text, .length = strlen(msg_text) };
janus_plugin_data data = { .label = NULL, .protocol = NULL, .binary = FALSE, .buffer = msg_text, .length = strlen(msg_text) };
gateway->relay_data(top->session->handle, &data);
janus_refcount_decrease(&top->ref);
}
Expand Down Expand Up @@ -2738,7 +2738,7 @@ janus_plugin_result *janus_textroom_handle_incoming_request(janus_plugin_session
json_object_set_new(event, "room", string_ids ? json_string(textroom->room_id_str) : json_integer(textroom->room_id));
char *event_text = json_dumps(event, json_format);
json_decref(event);
janus_plugin_data data = { .label = NULL, .binary = FALSE, .buffer = event_text, .length = strlen(event_text) };
janus_plugin_data data = { .label = NULL, .protocol = NULL, .binary = FALSE, .buffer = event_text, .length = strlen(event_text) };
gateway->relay_data(handle, &data);
/* Broadcast */
GHashTableIter iter;
Expand Down Expand Up @@ -2803,7 +2803,7 @@ janus_plugin_result *janus_textroom_handle_incoming_request(janus_plugin_session
/* Reply via data channels */
char *reply_text = json_dumps(reply, json_format);
json_decref(reply);
janus_plugin_data data = { .label = NULL, .binary = FALSE, .buffer = reply_text, .length = strlen(reply_text) };
janus_plugin_data data = { .label = NULL, .protocol = NULL, .binary = FALSE, .buffer = reply_text, .length = strlen(reply_text) };
gateway->relay_data(handle, &data);
free(reply_text);
} else {
Expand Down
Loading

0 comments on commit 9fe2894

Please sign in to comment.