diff --git a/src/plugins/janus_audiobridge.c b/src/plugins/janus_audiobridge.c index aa59b7f055..5af9139b43 100644 --- a/src/plugins/janus_audiobridge.c +++ b/src/plugins/janus_audiobridge.c @@ -1729,9 +1729,9 @@ typedef struct janus_audiobridge_participant { OpusDecoder *decoder; /* Opus decoder instance */ gboolean fec; /* Opus FEC status */ int expected_loss; /* Percentage of expected loss, to configure libopus FEC behaviour (default=0, no FEC even if negotiated) */ - uint16_t expected_seq; /* Expected sequence number */ uint16_t probation; /* Used to determine new ssrc validity */ uint32_t last_timestamp; /* Last in seq timestamp */ + uint16_t last_seq; /* Last sequence number */ gboolean reset; /* Whether or not the Opus context must be reset, without re-joining the room */ GThread *thread; /* Encoding thread for this participant */ gboolean mjr_active; /* Whether this participant has to be recorded to an mjr file or not */ @@ -2189,6 +2189,7 @@ static int janus_audiobridge_resample(int16_t *input, int input_num, int input_r /* Jitter Buffer and queue-in settings */ #define JITTER_BUFFER_MIN_PACKETS 2 #define JITTER_BUFFER_MAX_PACKETS 50 +#define JITTER_BUFFER_MAX_GAP_SIZE 20 #define JITTER_BUFFER_CHECK_USECS 1*G_USEC_PER_SEC #define QUEUE_IN_MAX_PACKETS 4 @@ -6603,9 +6604,9 @@ static void *janus_audiobridge_handler(void *data) { participant->decoder = NULL; participant->reset = FALSE; participant->fec = FALSE; - participant->expected_seq = 0; participant->probation = 0; participant->last_timestamp = 0; + participant->last_seq = 0; janus_mutex_init(&participant->qmutex); participant->arc = NULL; janus_audiobridge_plainrtp_media_cleanup(&participant->plainrtp_media); @@ -8747,8 +8748,9 @@ static void *janus_audiobridge_participant_thread(void *data) { int jitter_ticks = 0; janus_rtp_header *rtp = NULL; gint64 now = janus_get_monotonic_time(), before = now; - gboolean first = TRUE, use_fec = FALSE; + gboolean first = TRUE; int ret = 0; + int lost_packets_gap = 0; /* Start working: check both the incoming queue (to decode and queue) and the outgoing one (to encode and send) */ while(!g_atomic_int_get(&stopping) && g_atomic_int_get(&session->destroyed) == 0) { @@ -8779,8 +8781,54 @@ static void *janus_audiobridge_participant_thread(void *data) { jitter_buffer_tick(participant->jitter); janus_mutex_unlock(&participant->qmutex); if(ret != JITTER_BUFFER_OK) { - /* No packet in the jitter buffer? Move on the talking detection, if needed */ - janus_audiobridge_participant_istalking(session, participant, NULL, NULL); + if(!first && participant->codec == JANUS_AUDIOCODEC_OPUS && lost_packets_gap <= JITTER_BUFFER_MAX_GAP_SIZE && !participant->muted) { + lost_packets_gap += 1; + + if(!g_atomic_int_compare_and_exchange(&participant->decoding, 0, 1)) { + /* This means we're cleaning up, so don't try to decode */ + janus_audiobridge_buffer_packet_destroy(bpkt); + break; + } + + int32_t output_samples; + opus_decoder_ctl(participant->decoder, OPUS_GET_LAST_PACKET_DURATION(&output_samples)); + + pkt = g_malloc(sizeof(janus_audiobridge_rtp_relay_packet)); + pkt->data = g_malloc0(BUFFER_SAMPLES * sizeof(opus_int16)); + pkt->ssrc = 0; + pkt->timestamp = participant->last_timestamp + OPUS_SAMPLES; + pkt->seq_number = participant->last_seq + 1; + /* This is a redundant packet, so we can't parse any extension info */ + pkt->silence = FALSE; + janus_audiobridge_participant_istalking(session, participant, NULL, NULL); + pkt->length = opus_decode(participant->decoder, NULL, 0, (opus_int16 *)pkt->data, output_samples, 0); +#ifdef HAVE_RNNOISE + /* Check if we need to denoise this packet */ + if(participant->denoise) + janus_audiobridge_participant_denoise(participant, (char *)pkt->data, pkt->length); +#endif + /* Update the details */ + participant->last_seq = pkt->seq_number; + participant->last_timestamp = pkt->timestamp; + g_atomic_int_set(&participant->decoding, 0); + if(pkt->length < 0) { + JANUS_LOG(LOG_ERR, "[Opus] Ops! got an error decoding the Opus frame: %d (%s)\n", pkt->length, opus_strerror(pkt->length)); + g_free(pkt->data); + g_free(pkt); + break; + } + /* Queue the decoded packet for the mixer */ + /* Do not let queue-in grow too much */ + guint count = g_list_length(participant->inbuf); + if((int) count > QUEUE_IN_MAX_PACKETS) { + JANUS_LOG(LOG_WARN, "Participant queue-in contains too many packets, clearing now (count=%u)\n", count); + janus_audiobridge_participant_clear_inbuf(participant); + } + participant->inbuf = g_list_append(participant->inbuf, pkt); + } else { + /* No packet in the jitter buffer? Move on the talking detection, if needed */ + janus_audiobridge_participant_istalking(session, participant, NULL, NULL); + } } else { /* Decode the audio packet */ bpkt = (janus_audiobridge_buffer_packet *)jbp.data; @@ -8802,42 +8850,8 @@ static void *janus_audiobridge_participant_thread(void *data) { break; } rtp = (janus_rtp_header *)buffer; - /* If this is Opus, check if there's a packet gap we should fix with FEC */ - use_fec = FALSE; - /* FIXME Disable inbound FEC due to potential deadlocks on qmutex */ - /* TODO replacing FEC with PLC will eliminate the issue */ - //if(!first && participant->codec == JANUS_AUDIOCODEC_OPUS && participant->fec) { - // if(ntohs(rtp->seq_number) == (participant->expected_seq + 1)) { - // /* Lost a packet here? Use FEC to recover */ - // use_fec = TRUE; - // } - //} first = FALSE; - if(use_fec) { - /* There was a gap, try to get decode from redundant info first */ - pkt = g_malloc(sizeof(janus_audiobridge_rtp_relay_packet)); - pkt->data = g_malloc0(BUFFER_SAMPLES*sizeof(opus_int16)); - pkt->ssrc = 0; - pkt->timestamp = participant->last_timestamp + 960; /* FIXME */ - pkt->seq_number = participant->expected_seq; /* FIXME */ - /* This is a redundant packet, so we can't parse any extension info */ - pkt->silence = FALSE; - /* Decode the lost packet using fec=1 */ - int32_t output_samples; - opus_decoder_ctl(participant->decoder, OPUS_GET_LAST_PACKET_DURATION(&output_samples)); - pkt->length = opus_decode(participant->decoder, payload, plen, (opus_int16 *)pkt->data, output_samples, 1); -#ifdef HAVE_RNNOISE - /* Check if we need to denoise this packet */ - if(participant->denoise) - janus_audiobridge_participant_denoise(participant, (char *)pkt->data, pkt->length); -#endif - /* Queue the decoded redundant packet for the mixer */ - /* FIXME This mutex lock can race with hangup_media_internal */ - janus_mutex_lock(&participant->qmutex); - participant->inbuf = g_list_append(participant->inbuf, pkt); - janus_mutex_unlock(&participant->qmutex); - /* Now we can process the next packet */ - } + lost_packets_gap = 0; /* Decode the packet */ pkt = g_malloc(sizeof(janus_audiobridge_rtp_relay_packet)); pkt->data = g_malloc0(BUFFER_SAMPLES*sizeof(opus_int16)); @@ -8882,8 +8896,8 @@ static void *janus_audiobridge_participant_thread(void *data) { /* Get rid of the buffered packet */ janus_audiobridge_buffer_packet_destroy(bpkt); /* Update the details */ + participant->last_seq = pkt->seq_number; participant->last_timestamp = pkt->timestamp; - participant->expected_seq = pkt->seq_number + 1; g_atomic_int_set(&participant->decoding, 0); if(pkt->length < 0) { if(participant->codec == JANUS_AUDIOCODEC_OPUS) {