Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send more packets together with nice_agent_send_messages_nonblocking #2295

Closed
wants to merge 10 commits into from
19 changes: 19 additions & 0 deletions conf/janus.jcfg.sample.in
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,25 @@ general: {
# As such, if you want to use this you should
# provision the correct value according to the
# available resources (e.g., CPUs available).
#disable_sendmmsg = true # By default, Janus tries to take advantage of
# sendmmsg to send multiple packets at the same
# time and optimize media delivery, by reducing
# the number of system calls. If you want to
# disable this behaviour and make it work more
# like Janus did previously, by sending one
# packet at the time, uncomment this property.
#allow_loop_indication = true # In case a static number of event loops is
# configured as explained above, by default
# new handles will be allocated on one loop or
# another by the Janus core itself. In some cases
# it may be helpful to manually tell the Janus
# core which loop a handle should be added to,
# e.g., to group viewers of the same stream on
# the same loop. This is possible via the Janus
# API when performing the 'attach' request, but
# only if allow_loop_indication is set to true;
# it's set to false by default to avoid abuses.
# Don't change if you don't know what you're doing!
#opaqueid_in_api = true # Opaque IDs set by applications are typically
# only passed to event handlers for correlation
# purposes, but not sent back to the user or
Expand Down
3 changes: 2 additions & 1 deletion html/janus.js
Original file line number Diff line number Diff line change
Expand Up @@ -1173,9 +1173,10 @@ function Janus(gatewayCallbacks) {
return;
}
var opaqueId = callbacks.opaqueId;
var loopIndex = callbacks.loopIndex;
var handleToken = callbacks.token ? callbacks.token : token;
var transaction = Janus.randomString(12);
var request = { "janus": "attach", "plugin": plugin, "opaque_id": opaqueId, "transaction": transaction };
var request = { "janus": "attach", "plugin": plugin, "opaque_id": opaqueId, "loop_index": loopIndex, "transaction": transaction };
if(handleToken)
request["token"] = handleToken;
if(apisecret)
Expand Down
196 changes: 165 additions & 31 deletions ice.c
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,17 @@ gboolean janus_is_opaqueid_in_api_enabled(void) {
return opaqueid_in_api;
}

/* Since #2295, we support sendmmsg to send multiple packets at the same time
* and optimize media delivery; this can disabled at startup, if needed */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this can disabled at startup

be seems to be missing before disabled.

static gboolean use_sendmmsg = TRUE;
void janus_disable_sendmmsg(void) {
JANUS_LOG(LOG_WARN, "Disabling sendmmsg mode, will send one packet at a time\n");
use_sendmmsg = FALSE;
}
gboolean janus_is_sendmmsg_enabled(void) {
return use_sendmmsg;
}

/* Only needed in case we're using static event loops spawned at startup (disabled by default) */
typedef struct janus_ice_static_event_loop {
int id;
Expand All @@ -155,6 +166,7 @@ typedef struct janus_ice_static_event_loop {
GThread *thread;
} janus_ice_static_event_loop;
static int static_event_loops = 0;
static gboolean allow_loop_indication = FALSE;
static GSList *event_loops = NULL, *current_loop = NULL;
static janus_mutex event_loops_mutex = JANUS_MUTEX_INITIALIZER;
static void *janus_ice_static_event_loop_thread(void *data) {
Expand All @@ -176,7 +188,7 @@ static void *janus_ice_static_event_loop_thread(void *data) {
int janus_ice_get_static_event_loops(void) {
return static_event_loops;
}
void janus_ice_set_static_event_loops(int loops) {
void janus_ice_set_static_event_loops(int loops, gboolean allow_api) {
if(loops == 0)
return;
else if(loops < 1) {
Expand Down Expand Up @@ -209,6 +221,9 @@ void janus_ice_set_static_event_loops(int loops) {
}
current_loop = event_loops;
JANUS_LOG(LOG_INFO, "Spawned %d static event loops (handles won't have a dedicated loop)\n", static_event_loops);
allow_loop_indication = allow_api;
JANUS_LOG(LOG_INFO, " -- Janus API %s be able to drive the loop choice for new handles\n",
allow_loop_indication ? "will" : "will NOT");
return;
}
void janus_ice_stop_static_event_loops(void) {
Expand Down Expand Up @@ -352,6 +367,9 @@ typedef struct janus_ice_queued_packet {
gboolean retransmission;
gboolean encrypted;
gint64 added;
/*! \brief Reference counter for this instance */
volatile int destroyed;
janus_refcount ref;
} janus_ice_queued_packet;
/* A few static, fake, messages we use as a trigger: e.g., to start a
* new DTLS handshake, hangup a PeerConnection or close a handle */
Expand Down Expand Up @@ -445,6 +463,13 @@ static GSource *janus_ice_outgoing_traffic_create(janus_ice_handle *handle, GDes
return source;
}

/* Helper method to send or prepare to send a packet: since we use
* nice_agent_send_messages_nonblocking, we may decide not so send a
* packet right away if there are messages waiting, and send them all
* together to take advantage of sendmmsg for performance reasons */
static gboolean janus_ice_send_or_store(janus_ice_handle *handle,
janus_ice_component *component, janus_ice_queued_packet *pkt);

/* Time, in seconds, that should pass with no media (audio or video) being
* received before Janus notifies you about this with a receiving=false */
#define DEFAULT_NO_MEDIA_TIMER 1
Expand Down Expand Up @@ -522,6 +547,12 @@ static void janus_ice_free_queued_packet(janus_ice_queued_packet *pkt) {
pkt == &janus_ice_data_ready) {
return;
}
if(!g_atomic_int_compare_and_exchange(&pkt->destroyed, 0, 1))
return;
janus_refcount_decrease(&pkt->ref);
}
static void janus_ice_queued_packet_free(const janus_refcount *pkt_ref) {
janus_ice_queued_packet *pkt = janus_refcount_containerof(pkt_ref, janus_ice_queued_packet, ref);
g_free(pkt->data);
g_free(pkt->label);
g_free(pkt->protocol);
Expand Down Expand Up @@ -1250,7 +1281,7 @@ janus_ice_handle *janus_ice_handle_create(void *core_session, const char *opaque
return handle;
}

gint janus_ice_handle_attach_plugin(void *core_session, janus_ice_handle *handle, janus_plugin *plugin) {
gint janus_ice_handle_attach_plugin(void *core_session, janus_ice_handle *handle, janus_plugin *plugin, int loop_index) {
if(core_session == NULL)
return JANUS_ERROR_SESSION_NOT_FOUND;
janus_session *session = (janus_session *)core_session;
Expand Down Expand Up @@ -1289,14 +1320,34 @@ gint janus_ice_handle_attach_plugin(void *core_session, janus_ice_handle *handle
handle->mainloop = g_main_loop_new(handle->mainctx, FALSE);
} else {
/* We're actually using static event loops, pick one from the list */
if(!allow_loop_indication && loop_index > -1) {
JANUS_LOG(LOG_WARN, "[%"SCNu64"] Manual allocation of event loops forbidden, ignoring provided loop index %d\n", handle->handle_id, loop_index);
}
janus_refcount_increase(&handle->ref);
janus_mutex_lock(&event_loops_mutex);
janus_ice_static_event_loop *loop = (janus_ice_static_event_loop *)current_loop->data;
handle->mainctx = loop->mainctx;
handle->mainloop = loop->mainloop;
current_loop = current_loop->next;
if(current_loop == NULL)
current_loop = event_loops;
gboolean automatic_selection = TRUE;
if(allow_loop_indication && loop_index != -1) {
/* The API can drive the selection and an index was provided, check if it exists */
janus_ice_static_event_loop *loop = g_slist_nth_data(event_loops, loop_index);
if(loop == NULL) {
JANUS_LOG(LOG_WARN, "[%"SCNu64"] Invalid loop index %d, picking event loop automatically\n", handle->handle_id, loop_index);
} else {
automatic_selection = FALSE;
handle->mainctx = loop->mainctx;
handle->mainloop = loop->mainloop;
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Manually added handle to loop #%d\n", handle->handle_id, loop->id);
}
}
if(automatic_selection) {
/* Pick an available loop automatically (round robin) */
janus_ice_static_event_loop *loop = (janus_ice_static_event_loop *)current_loop->data;
handle->mainctx = loop->mainctx;
handle->mainloop = loop->mainloop;
current_loop = current_loop->next;
if(current_loop == NULL)
current_loop = event_loops;
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Automatically added handle to loop #%d\n", handle->handle_id, loop->id);
}
janus_mutex_unlock(&event_loops_mutex);
}
handle->rtp_source = janus_ice_outgoing_traffic_create(handle, (GDestroyNotify)g_free);
Expand Down Expand Up @@ -1730,6 +1781,9 @@ static void janus_ice_component_free(const janus_refcount *component_ref) {
janus_seq_list_free(&component->last_seqs_video[1]);
if(component->last_seqs_video[2])
janus_seq_list_free(&component->last_seqs_video[2]);
int i = 0;
for(i=0; i<JANUS_MAX_PENDING_MESSAGES; i++)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: no need to initialize i twice.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense 😄 The paranoid in me tends to initialize everything, these days, as I've had my share of issues caused by a lack of that.

g_free(component->pending_messages[i].buffers);
g_free(component);
//~ janus_mutex_unlock(&handle->mutex);
}
Expand Down Expand Up @@ -3051,6 +3105,8 @@ static void janus_ice_cb_nice_recv(NiceAgent *agent, guint stream_id, guint comp
retransmits_cnt++;
/* Enqueue it */
janus_ice_queued_packet *pkt = g_malloc(sizeof(janus_ice_queued_packet));
g_atomic_int_set(&pkt->destroyed, 0);
janus_refcount_init(&pkt->ref, janus_ice_queued_packet_free);
pkt->data = g_malloc(p->length+SRTP_MAX_TAG_LEN);
memcpy(pkt->data, p->data, p->length);
pkt->length = p->length;
Expand Down Expand Up @@ -4362,10 +4418,7 @@ static gboolean janus_ice_outgoing_traffic_handle(janus_ice_handle *handle, janu
component->noerrorlog = FALSE;
if(pkt->encrypted) {
/* Already SRTCP */
int sent = nice_agent_send(handle->agent, stream->stream_id, component->component_id, pkt->length, (const gchar *)pkt->data);
if(sent < pkt->length) {
JANUS_LOG(LOG_ERR, "[%"SCNu64"] ... only sent %d bytes? (was %d)\n", handle->handle_id, sent, pkt->length);
}
janus_ice_send_or_store(handle, component, pkt);
} else {
/* Check if there's anything we need to do before sending */
uint32_t bitrate = janus_rtcp_get_remb(pkt->data, pkt->length);
Expand Down Expand Up @@ -4412,10 +4465,8 @@ static gboolean janus_ice_outgoing_traffic_handle(janus_ice_handle *handle, janu
JANUS_LOG(LOG_DBG, "[%"SCNu64"] ... SRTCP protect error... %s (len=%d-->%d)...\n", handle->handle_id, janus_srtp_error_str(res), pkt->length, protected);
} else {
/* Shoot! */
int sent = nice_agent_send(handle->agent, stream->stream_id, component->component_id, protected, pkt->data);
if(sent < protected) {
JANUS_LOG(LOG_ERR, "[%"SCNu64"] ... only sent %d bytes? (was %d)\n", handle->handle_id, sent, protected);
}
pkt->length = protected;
janus_ice_send_or_store(handle, component, pkt);
}
}
janus_ice_free_queued_packet(pkt);
Expand All @@ -4442,10 +4493,7 @@ static gboolean janus_ice_outgoing_traffic_handle(janus_ice_handle *handle, janu
/* Already RTP (probably a retransmission?) */
janus_rtp_header *header = (janus_rtp_header *)pkt->data;
JANUS_LOG(LOG_HUGE, "[%"SCNu64"] ... Retransmitting seq.nr %"SCNu16"\n\n", handle->handle_id, ntohs(header->seq_number));
int sent = nice_agent_send(handle->agent, stream->stream_id, component->component_id, pkt->length, (const gchar *)pkt->data);
if(sent < pkt->length) {
JANUS_LOG(LOG_ERR, "[%"SCNu64"] ... only sent %d bytes? (was %d)\n", handle->handle_id, sent, pkt->length);
}
janus_ice_send_or_store(handle, component, pkt);
} else {
/* Overwrite SSRC */
janus_rtp_header *header = (janus_rtp_header *)pkt->data;
Expand Down Expand Up @@ -4554,18 +4602,15 @@ static gboolean janus_ice_outgoing_traffic_handle(janus_ice_handle *handle, janu
janus_ice_free_rtp_packet(p);
} else {
/* Shoot! */
int sent = nice_agent_send(handle->agent, stream->stream_id, component->component_id, protected, pkt->data);
if(sent < protected) {
JANUS_LOG(LOG_ERR, "[%"SCNu64"] ... only sent %d bytes? (was %d)\n", handle->handle_id, sent, protected);
}
/* Update stats */
if(sent > 0) {
/* Update the RTCP context as well */
int length = pkt->length;
pkt->length = protected;
if(janus_ice_send_or_store(handle, component, pkt)) {
/* Update stats, and the RTCP context as well */
janus_rtp_header *header = (janus_rtp_header *)pkt->data;
guint32 timestamp = ntohl(header->timestamp);
if(pkt->type == JANUS_ICE_PACKET_AUDIO) {
component->out_stats.audio.packets++;
component->out_stats.audio.bytes += pkt->length;
component->out_stats.audio.bytes += length;
/* Last second outgoing audio */
gint64 now = janus_get_monotonic_time();
if(component->out_stats.audio.updated == 0)
Expand All @@ -4576,7 +4621,7 @@ static gboolean janus_ice_outgoing_traffic_handle(janus_ice_handle *handle, janu
component->out_stats.audio.bytes_lastsec_temp = 0;
component->out_stats.audio.updated = now;
}
component->out_stats.audio.bytes_lastsec_temp += pkt->length;
component->out_stats.audio.bytes_lastsec_temp += length;
struct timeval tv;
gettimeofday(&tv, NULL);
if(stream->audio_last_ntp_ts == 0 || (gint32)(timestamp - stream->audio_last_rtp_ts) > 0) {
Expand All @@ -4596,7 +4641,7 @@ static gboolean janus_ice_outgoing_traffic_handle(janus_ice_handle *handle, janu
rtcp_ctx->tb = clock_rate;
} else if(pkt->type == JANUS_ICE_PACKET_VIDEO) {
component->out_stats.video[0].packets++;
component->out_stats.video[0].bytes += pkt->length;
component->out_stats.video[0].bytes += length;
/* Last second outgoing video */
gint64 now = janus_get_monotonic_time();
if(component->out_stats.video[0].updated == 0)
Expand All @@ -4607,7 +4652,7 @@ static gboolean janus_ice_outgoing_traffic_handle(janus_ice_handle *handle, janu
component->out_stats.video[0].bytes_lastsec_temp = 0;
component->out_stats.video[0].updated = now;
}
component->out_stats.video[0].bytes_lastsec_temp += pkt->length;
component->out_stats.video[0].bytes_lastsec_temp += length;
struct timeval tv;
gettimeofday(&tv, NULL);
if(stream->video_last_ntp_ts == 0 || (gint32)(timestamp - stream->video_last_rtp_ts) > 0) {
Expand Down Expand Up @@ -4819,6 +4864,8 @@ void janus_ice_relay_rtp(janus_ice_handle *handle, janus_plugin_rtp *packet) {
}
/* Queue this packet */
janus_ice_queued_packet *pkt = g_malloc(sizeof(janus_ice_queued_packet));
g_atomic_int_set(&pkt->destroyed, 0);
janus_refcount_init(&pkt->ref, janus_ice_queued_packet_free);
pkt->data = g_malloc(totlen + SRTP_MAX_TAG_LEN);
/* RTP header first */
memcpy(pkt->data, packet->buffer, RTP_HEADER_SIZE);
Expand Down Expand Up @@ -4870,6 +4917,8 @@ void janus_ice_relay_rtcp_internal(janus_ice_handle *handle, janus_plugin_rtcp *
}
/* Queue this packet */
janus_ice_queued_packet *pkt = g_malloc(sizeof(janus_ice_queued_packet));
g_atomic_int_set(&pkt->destroyed, 0);
janus_refcount_init(&pkt->ref, janus_ice_queued_packet_free);
pkt->data = g_malloc(rtcp_len+SRTP_MAX_TAG_LEN+4);
memcpy(pkt->data, rtcp_buf, rtcp_len);
pkt->length = rtcp_len;
Expand Down Expand Up @@ -4936,6 +4985,8 @@ void janus_ice_relay_data(janus_ice_handle *handle, janus_plugin_data *packet) {
return;
/* Queue this packet */
janus_ice_queued_packet *pkt = g_malloc(sizeof(janus_ice_queued_packet));
g_atomic_int_set(&pkt->destroyed, 0);
janus_refcount_init(&pkt->ref, janus_ice_queued_packet_free);
pkt->data = g_malloc(packet->length);
memcpy(pkt->data, packet->buffer, packet->length);
pkt->length = packet->length;
Expand All @@ -4956,6 +5007,8 @@ void janus_ice_relay_sctp(janus_ice_handle *handle, char *buffer, int length) {
return;
/* Queue this packet */
janus_ice_queued_packet *pkt = g_malloc(sizeof(janus_ice_queued_packet));
g_atomic_int_set(&pkt->destroyed, 0);
janus_refcount_init(&pkt->ref, janus_ice_queued_packet_free);
pkt->data = g_malloc(length);
memcpy(pkt->data, buffer, length);
pkt->length = length;
Expand Down Expand Up @@ -5053,3 +5106,84 @@ void janus_ice_dtls_handshake_done(janus_ice_handle *handle, janus_ice_component
session->session_id, handle->handle_id, handle->opaque_id, info);
}
}

/* The following static methods help with sending multiple messages together */
static int janus_ice_send(janus_ice_handle *handle, janus_ice_component *component) {
if(!handle || !component || component->pending_messages_num == 0)
return -1;
JANUS_LOG(LOG_HUGE, "[%"SCNu64"] Sending %u packets we stored so far\n",
handle->handle_id, component->pending_messages_num);
GError *error = NULL;
int sent = nice_agent_send_messages_nonblocking(handle->agent,
component->stream_id, component->component_id,
&component->pending_messages[0], component->pending_messages_num, NULL, &error);
if(sent == 0) {
JANUS_LOG(LOG_WARN, "[%"SCNu64"] No packets sent, should try again?\n", handle->handle_id);
} else if(sent < 0) {
JANUS_LOG(LOG_ERR, "[%"SCNu64"] Error sending message: %d (%s)\n", handle->handle_id,
error->code, error->message ? error->message : "??");
}
return sent;
}
static gboolean janus_ice_send_or_store(janus_ice_handle *handle,
janus_ice_component *component, janus_ice_queued_packet *pkt) {
if(!handle || !component || !pkt)
return FALSE;
if(!use_sendmmsg) {
/* We're not using sendmmsg, use nice_agent_send right away */
int sent = nice_agent_send(handle->agent, component->stream_id,
component->component_id, pkt->length, (const gchar *)pkt->data);
if(sent < pkt->length) {
JANUS_LOG(LOG_ERR, "[%"SCNu64"] ... only sent %d bytes? (was %d)\n", handle->handle_id, sent, pkt->length);
}
return sent == pkt->length;
}
/* First of all, let's set this packet in the first available NiceOutputMessage */
if(component->pending_messages_num == JANUS_MAX_PENDING_MESSAGES) {
JANUS_LOG(LOG_WARN, "[%"SCNu64"] Too many packets pending? Dumping this packet...\n", handle->handle_id);
return FALSE;
}
component->pending_messages[component->pending_messages_num].n_buffers = 1;
if(component->pending_messages[component->pending_messages_num].buffers == NULL)
component->pending_messages[component->pending_messages_num].buffers = g_malloc(sizeof(GOutputVector));
component->pending_messages[component->pending_messages_num].buffers->buffer = pkt->data;
component->pending_messages[component->pending_messages_num].buffers->size = pkt->length;
component->pending_messages_data[component->pending_messages_num] = pkt;
janus_refcount_increase(&pkt->ref);
component->pending_messages_num++;
/* If there are messages in the queue and we have room, let's wait */
if(component->pending_messages_num < JANUS_MAX_PENDING_MESSAGES &&
g_async_queue_length(handle->queued_packets) > 0) {
JANUS_LOG(LOG_HUGE, "[%"SCNu64"] Storing packet, will deliver it later\n", handle->handle_id);
return TRUE;
}
/* We're ready to send the packets we stored so far: if janus_ice_send
* returns zero, it means we have to try again, so that's why we loop */
int sent = 0;
do {
sent = janus_ice_send(handle, component);
} while(sent == 0);
/* Update the indexes and clean resources if we sent some or all of the packets */
if(sent > 0) {
JANUS_LOG(LOG_HUGE, "[%"SCNu64"] Sent %d/%u packets, updating array\n",
handle->handle_id, sent, component->pending_messages_num);
int i=0;
for(i=0; i<component->pending_messages_num; i++) {
if (i < sent) {
/* We sent these packets, free them */
pkt = (janus_ice_queued_packet *)component->pending_messages_data[i];
if(pkt != NULL) {
janus_refcount_decrease(&pkt->ref);
}
} else {
/* We didn't send all messages, shift the arrays */
component->pending_messages[i-sent].buffers->buffer = component->pending_messages[i].buffers->buffer;
component->pending_messages[i-sent].buffers->size = component->pending_messages[i].buffers->size;
component->pending_messages_data[i-sent] = component->pending_messages_data[i];
}
}
component->pending_messages_num -= sent;

}
return sent > 0;
}
Loading