Skip to content

Commit

Permalink
Fix rare deadlock in AudioBridge plugin when closing connections (#3387)
Browse files Browse the repository at this point in the history
  • Loading branch information
lminiero committed Jun 17, 2024
1 parent 92f537b commit 14f0f16
Showing 1 changed file with 11 additions and 15 deletions.
26 changes: 11 additions & 15 deletions src/plugins/janus_audiobridge.c
Original file line number Diff line number Diff line change
Expand Up @@ -4259,16 +4259,16 @@ static json_t *janus_audiobridge_process_synchronous_request(janus_audiobridge_s
goto prepare_response;
}

janus_mutex_lock(&participant->qmutex);
participant->muted = muted;
JANUS_LOG(LOG_VERB, "Setting muted property: %s (room %s, user %s)\n",
participant->muted ? "true" : "false", participant->room->room_id_str, participant->user_id_str);
if(participant->muted) {
/* Clear the queued packets waiting to be handled */
janus_mutex_lock(&participant->qmutex);
janus_audiobridge_participant_clear_jitter_buffer(participant);
janus_audiobridge_participant_clear_inbuf(participant);
janus_mutex_unlock(&participant->qmutex);
}
janus_mutex_unlock(&participant->qmutex);

json_t *list = json_array();
json_t *pl = json_object();
Expand Down Expand Up @@ -7036,16 +7036,16 @@ static void *janus_audiobridge_handler(void *data) {
}
if(muted || display || (participant->stereo && spatial) || denoise) {
if(muted) {
janus_mutex_lock(&participant->qmutex);
participant->muted = json_is_true(muted);
JANUS_LOG(LOG_VERB, "Setting muted property: %s (room %s, user %s)\n",
participant->muted ? "true" : "false", participant->room->room_id_str, participant->user_id_str);
if(participant->muted) {
/* Clear the queued packets waiting to be handled */
janus_mutex_lock(&participant->qmutex);
janus_audiobridge_participant_clear_jitter_buffer(participant);
janus_audiobridge_participant_clear_inbuf(participant);
janus_mutex_unlock(&participant->qmutex);
}
janus_mutex_unlock(&participant->qmutex);
}
if(display) {
char *old_display = participant->display;
Expand Down Expand Up @@ -8765,7 +8765,6 @@ static void *janus_audiobridge_participant_thread(void *data) {
/* Start with packets to decode and queue for the mixer */
now = janus_get_monotonic_time();
janus_mutex_lock(&participant->qmutex);
gboolean locked = TRUE;
/* Start by reading packets to decode from the jitter buffer on a clock */
if(now - before >= 18000) {
before += 20000;
Expand All @@ -8784,9 +8783,8 @@ static void *janus_audiobridge_participant_thread(void *data) {
} else {
/* Decode the audio packet */
bpkt = (janus_audiobridge_buffer_packet *)jbp.data;
janus_mutex_unlock(&participant->qmutex);
locked = FALSE;
if(!g_atomic_int_compare_and_exchange(&participant->decoding, 0, 1)) {
janus_mutex_unlock(&participant->qmutex);
/* This means we're cleaning up, so don't try to decode */
janus_audiobridge_buffer_packet_destroy(bpkt);
break;
Expand All @@ -8797,9 +8795,10 @@ static void *janus_audiobridge_participant_thread(void *data) {
int plen = 0;
const unsigned char *payload = (const unsigned char *)janus_rtp_payload(buffer, len, &plen);
if(!payload) {
g_atomic_int_set(&participant->decoding, 0);
JANUS_LOG(LOG_ERR, "[%s] Ops! got an error accessing the RTP payload\n",
participant->codec == JANUS_AUDIOCODEC_OPUS ? "Opus" : "G.711");
g_atomic_int_set(&participant->decoding, 0);
janus_mutex_unlock(&participant->qmutex);
janus_audiobridge_buffer_packet_destroy(bpkt);
break;
}
Expand Down Expand Up @@ -8833,9 +8832,7 @@ static void *janus_audiobridge_participant_thread(void *data) {
janus_audiobridge_participant_denoise(participant, (char *)pkt->data, pkt->length);
#endif
/* Queue the decoded redundant packet for the mixer */
janus_mutex_lock(&participant->qmutex);
participant->inbuf = g_list_append(participant->inbuf, pkt);
janus_mutex_unlock(&participant->qmutex);
/* Now we can process the next packet */
}
/* Decode the packet */
Expand All @@ -8854,8 +8851,9 @@ static void *janus_audiobridge_participant_thread(void *data) {
} else if(participant->codec == JANUS_AUDIOCODEC_PCMA || participant->codec == JANUS_AUDIOCODEC_PCMU) {
/* G.711 */
if(plen != 160) {
g_atomic_int_set(&participant->decoding, 0);
JANUS_LOG(LOG_WARN, "[G.711] Wrong packet size (expected 160, got %d), skipping audio packet\n", plen);
g_atomic_int_set(&participant->decoding, 0);
janus_mutex_unlock(&participant->qmutex);
janus_audiobridge_buffer_packet_destroy(bpkt);
break;
}
Expand Down Expand Up @@ -8889,13 +8887,12 @@ static void *janus_audiobridge_participant_thread(void *data) {
} else {
JANUS_LOG(LOG_ERR, "[G.711] Ops! got an error decoding the audio frame\n");
}
janus_mutex_unlock(&participant->qmutex);
g_free(pkt->data);
g_free(pkt);
break;
}
/* Queue the decoded packet for the mixer */
janus_mutex_lock(&participant->qmutex);
locked = TRUE;
/* Do not let queue-in grow too much */
guint count = g_list_length(participant->inbuf);
if(count > QUEUE_IN_MAX_PACKETS) {
Expand All @@ -8906,8 +8903,7 @@ static void *janus_audiobridge_participant_thread(void *data) {
}
}
}
if(locked)
janus_mutex_unlock(&participant->qmutex);
janus_mutex_unlock(&participant->qmutex);
/* Now check if there's packets to encode */
mixedpkt = g_async_queue_try_pop(participant->outbuf);
if(mixedpkt != NULL && g_atomic_int_get(&session->destroyed) == 0 && g_atomic_int_get(&session->started)) {
Expand Down

0 comments on commit 14f0f16

Please sign in to comment.