Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added changes to pause & resume recorder #918

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 75 additions & 3 deletions plugins/janus_videoroom.c
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,7 @@ typedef struct janus_videoroom_publisher {
gint64 fir_latest; /* Time of latest sent FIR (to avoid flooding) */
gint fir_seq; /* FIR sequence number */
gboolean recording_active; /* Whether this publisher has to be recorded or not */
gboolean recording_paused; /* Whether this publisher recording has been paused or not */
gchar *recording_base; /* Base name for the recording (e.g., /path/to/filename, will generate /path/to/filename-audio.mjr and/or /path/to/filename-video.mjr */
janus_recorder *arc; /* The Janus recorder instance for this publisher's audio, if enabled */
janus_recorder *vrc; /* The Janus recorder instance for this publisher's video, if enabled */
Expand Down Expand Up @@ -602,7 +603,6 @@ static void janus_videoroom_publisher_free(const janus_refcount *p_ref) {
static void janus_videoroom_session_dereference(janus_videoroom_session *session) {
janus_refcount_decrease(&session->ref);
}

static void janus_videoroom_session_destroy(janus_videoroom_session *session) {
if(session && g_atomic_int_compare_and_exchange(&session->destroyed, 0, 1))
janus_refcount_decrease(&session->ref);
Expand Down Expand Up @@ -680,6 +680,8 @@ static void janus_videoroom_message_free(janus_videoroom_message *msg) {
#define JANUS_VIDEOROOM_ERROR_NOT_PUBLISHED 435
#define JANUS_VIDEOROOM_ERROR_ID_EXISTS 436
#define JANUS_VIDEOROOM_ERROR_INVALID_SDP 437
#define JANUS_VIDEOROOM_ERROR_NO_RECORDER 438
#define JANUS_VIDEOROOM_ERROR_INVALID_RECORDER 439


static guint32 janus_videoroom_rtp_forwarder_add_helper(janus_videoroom_publisher *p,
Expand Down Expand Up @@ -1176,6 +1178,7 @@ json_t *janus_videoroom_query_session(janus_plugin_session *handle) {
json_object_set_new(recording, "video", json_string(participant->vrc->filename));
if(participant->drc && participant->drc->filename)
json_object_set_new(recording, "data", json_string(participant->drc->filename));
json_object_set_new(recording, "state", json_string(participant->recording_paused ?"Paused":"Active"));
json_object_set_new(info, "recording", recording);
}
g_clear_pointer(&participant, janus_videoroom_publisher_dereference);
Expand Down Expand Up @@ -2222,7 +2225,7 @@ struct janus_plugin_result *janus_videoroom_handle_message(janus_plugin_session
} else if(!strcasecmp(request_text, "join") || !strcasecmp(request_text, "joinandconfigure")
|| !strcasecmp(request_text, "configure") || !strcasecmp(request_text, "publish") || !strcasecmp(request_text, "unpublish")
|| !strcasecmp(request_text, "start") || !strcasecmp(request_text, "pause") || !strcasecmp(request_text, "switch")
|| !strcasecmp(request_text, "stop") || !strcasecmp(request_text, "leave")) {
|| !strcasecmp(request_text, "stop") || !strcasecmp(request_text, "keyframe") || !strcasecmp(request_text, "leave")) {
/* These messages are handled asynchronously */

janus_videoroom_message *msg = g_malloc0(sizeof(janus_videoroom_message));
Expand Down Expand Up @@ -2699,7 +2702,24 @@ static void janus_videoroom_recorder_create(janus_videoroom_publisher *participa
}
}
}

static int janus_videoroom_recorder_pause(janus_videoroom_publisher *participant, gboolean paused) {
int err = JANUS_VIDEOROOM_ERROR_NO_RECORDER;
if(participant->drc) {
err = paused ? janus_recorder_pause(participant->drc) : janus_recorder_resume(participant->drc);
}
if(participant->arc) {
err = paused ? janus_recorder_pause(participant->arc) : janus_recorder_resume(participant->arc);
}
if(participant->vrc) {
JANUS_LOG(LOG_INFO, "%s video recording file: %s\n", paused?"Pause":"Resume", participant->vrc->filename ? participant->vrc->filename : "");
err = paused ? janus_recorder_pause(participant->vrc) : janus_recorder_resume(participant->vrc);
}
if(err && err != JANUS_VIDEOROOM_ERROR_NO_RECORDER) {
JANUS_LOG(LOG_ERR, "%s event on invalid recoder ? err: %d le: %s\n", paused?"Pause":"Resume", err);
err = JANUS_VIDEOROOM_ERROR_INVALID_RECORDER;
}
return err;
}
static void janus_videoroom_recorder_close(janus_videoroom_publisher *participant) {
if(participant->arc) {
janus_recorder_close(participant->arc);
Expand Down Expand Up @@ -2758,6 +2778,8 @@ void janus_videoroom_hangup_media(janus_plugin_session *handle) {
participant->remb_latest = 0;
participant->fir_latest = 0;
participant->fir_seq = 0;
participant->recording_paused = FALSE;
participant->recording_active = FALSE;
while(participant->subscribers) {
janus_videoroom_subscriber *s = (janus_videoroom_subscriber *)participant->subscribers->data;
if(s) {
Expand Down Expand Up @@ -2964,6 +2986,7 @@ static void *janus_videoroom_handler(void *data) {
publisher->video_active = FALSE;
publisher->data_active = FALSE;
publisher->recording_active = FALSE;
publisher->recording_paused = FALSE;
publisher->recording_base = NULL;
publisher->arc = NULL;
publisher->vrc = NULL;
Expand Down Expand Up @@ -3259,6 +3282,7 @@ static void *janus_videoroom_handler(void *data) {
json_t *data = json_object_get(root, "data");
json_t *bitrate = json_object_get(root, "bitrate");
json_t *record = json_object_get(root, "record");
json_t *record_pause = json_object_get(root, "record_pause");
json_t *recfile = json_object_get(root, "filename");
json_t *display = json_object_get(root, "display");
json_t *refresh = json_object_get(root, "refresh");
Expand Down Expand Up @@ -3349,6 +3373,28 @@ static void *janus_videoroom_handler(void *data) {
}
}
}
if(record_pause) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no check here on whether arc, vrc or drc exist. Trying to pause/resume a non existing recording should return an error, while in this case you're always succeeding no matter what.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is important, because you may set a recording to paused and then start it, and the fact that both succeeded may make you think that you started the recording as paused, and you can resume it later, which is not the case.

if(participant->recording_paused != json_is_true(record_pause)) {
JANUS_LOG(LOG_VERB, "Recording video, PAUSE changed %d to %d \n", participant->recording_paused, json_is_true(record_pause));
participant->recording_paused = json_is_true(record_pause);
error_code = janus_videoroom_recorder_pause(participant, participant->recording_paused);
if(!participant->recording_paused && strstr(participant->sdp, "m=video")) {
/* Send a FIR */
char buf[20];
memset(buf, 0, 20);
janus_rtcp_fir((char *)&buf, 20, &participant->fir_seq);
JANUS_LOG(LOG_VERB, "Recording RESUME video, sending FIR to %"SCNu64" (%s)\n",
participant->user_id, participant->display ? participant->display : "??");
gateway->relay_rtcp(participant->session->handle, 1, buf, 20);
/* Send a PLI too, just in case... */
memset(buf, 0, 12);
janus_rtcp_pli((char *)&buf, 12);
JANUS_LOG(LOG_VERB, "Recording RESUME video, sending PLI to %"SCNu64" (%s)\n",
participant->user_id, participant->display ? participant->display : "??");
gateway->relay_rtcp(participant->session->handle, 1, buf, 12);
}
}
}
janus_mutex_unlock(&participant->rec_mutex);
if(display) {
janus_mutex_lock(&participant->room->mutex);
Expand All @@ -3375,6 +3421,9 @@ static void *janus_videoroom_handler(void *data) {
event = json_object();
json_object_set_new(event, "videoroom", json_string("event"));
json_object_set_new(event, "room", json_integer(participant->room->room_id));
if(error_code) {
/* TODO Handle error in recorder pause/resume */
}
json_object_set_new(event, "configured", json_string("ok"));
/* Also notify event handlers */
if(notify_events && gateway->events_is_enabled()) {
Expand All @@ -3394,10 +3443,33 @@ static void *janus_videoroom_handler(void *data) {
json_object_set_new(recording, "video", json_string(participant->vrc->filename));
if(participant->drc && participant->drc->filename)
json_object_set_new(recording, "data", json_string(participant->drc->filename));
json_object_set_new(recording, "state", json_string(participant->recording_paused ?"Paused":"Active"));
json_object_set_new(info, "recording", recording);
}
gateway->notify_event(&janus_videoroom_plugin, session->handle, info);
}
} else if(!strcasecmp(request_text, "keyframe")) {
event = json_object();
json_object_set_new(event, "videoroom", json_string("event"));
json_object_set_new(event, "room", json_integer(participant->room->room_id));
if(strstr(participant->sdp, "m=video")) {
/* Send a FIR */
char buf[20];
memset(buf, 0, 20);
janus_rtcp_fir((char *)&buf, 20, &participant->fir_seq);
JANUS_LOG(LOG_VERB, "Video keyframe , sending FIR to %"SCNu64" (%s)\n",
participant->user_id, participant->display ? participant->display : "??");
gateway->relay_rtcp(participant->session->handle, 1, buf, 20);
/* Send a PLI too, just in case... */
memset(buf, 0, 12);
janus_rtcp_pli((char *)&buf, 12);
JANUS_LOG(LOG_VERB, "Video keyframe, sending PLI to %"SCNu64" (%s)\n",
participant->user_id, participant->display ? participant->display : "??");
gateway->relay_rtcp(participant->session->handle, 1, buf, 12);
json_object_set_new(event, "keyframe", json_string("requested"));
} else
json_object_set_new(event, "keyframe", json_string("not_required"));

} else if(!strcasecmp(request_text, "unpublish")) {
/* This participant wants to unpublish */
if(!participant->sdp) {
Expand Down
71 changes: 68 additions & 3 deletions record.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "record.h"
#include "debug.h"
#include "utils.h"
#include "rtp.h"

#define htonll(x) ((1==htonl(1)) ? (x) : ((gint64)htonl((x) & 0xFFFFFFFF) << 32) | htonl((x) >> 32))
#define ntohll(x) ((1==ntohl(1)) ? (x) : ((gint64)ntohl((x) & 0xFFFFFFFF) << 32) | ntohl((x) >> 32))
Expand Down Expand Up @@ -92,6 +93,11 @@ janus_recorder *janus_recorder_create(const char *dir, const char *codec, const
rc->file = NULL;
rc->codec = g_strdup(codec);
rc->created = janus_get_real_time();
rc->paused = 0;
rc->paused_ts = 0;
rc->offset_ts = 0;
rc->paused_seq = 0;
rc->offset_seq = 0;
if(dir != NULL) {
/* Check if this directory exists, and create it if needed */
struct stat s;
Expand Down Expand Up @@ -166,6 +172,7 @@ janus_recorder *janus_recorder_create(const char *dir, const char *codec, const
}

int janus_recorder_save_frame(janus_recorder *recorder, char *buffer, uint length) {
rtp_header *rtp = (rtp_header *)buffer;
if(!recorder)
return -1;
janus_mutex_lock_nodebug(&recorder->mutex);
Expand Down Expand Up @@ -205,17 +212,51 @@ int janus_recorder_save_frame(janus_recorder *recorder, char *buffer, uint lengt
/* Done */
recorder->header = 1;
}
if(recorder->paused) {
if(!recorder->paused_ts) {
if(recorder->type == JANUS_RECORDER_DATA) {
recorder->paused_ts = janus_get_real_time();
} else {
recorder->paused_ts = ntohl(rtp->timestamp);
recorder->paused_seq = ntohs(rtp->seq_number);
}
JANUS_LOG(LOG_INFO, "Pause TS: %ld Seq: %u, Offset TS: %ld Seq: %u \n",
recorder->paused_ts, recorder->paused_seq, recorder->offset_ts, recorder->offset_seq);
}
janus_mutex_unlock_nodebug(&recorder->mutex);
return 0;
}
if(recorder->paused_ts) {
if(recorder->type == JANUS_RECORDER_DATA) {
recorder->offset_ts += janus_get_real_time() - recorder->paused_ts;
} else {
recorder->offset_ts = (uint32_t) recorder->offset_ts + (ntohl(rtp->timestamp) - (uint32_t)recorder->paused_ts);
JANUS_LOG(LOG_INFO, "Resume TS: %u Pause TS: %ld Offset TS: %ld \n", rtp->timestamp, recorder->paused_ts, recorder->offset_ts);
}
recorder->paused_ts = 0;
}
if(recorder->paused_seq && (recorder->type != JANUS_RECORDER_DATA)) {
recorder->offset_seq = recorder->offset_seq + (ntohs(rtp->seq_number) - recorder->paused_seq);
JANUS_LOG(LOG_INFO, "Resume Seq: %u Pause Seq: %hd Offset: %hd \n", ntohs(rtp->seq_number), recorder->paused_seq, recorder->offset_seq);
recorder->paused_seq = 0;
}
/* Write frame header */
fwrite(frame_header, sizeof(char), strlen(frame_header), recorder->file);
uint16_t header_bytes = htons(recorder->type == JANUS_RECORDER_DATA ? (length+sizeof(gint64)) : length);
fwrite(&header_bytes, sizeof(uint16_t), 1, recorder->file);
/* Save packet on file */
int temp = 0, tot = length;
if(recorder->type == JANUS_RECORDER_DATA) {
/* If it's data, then we need to prepend timing related info, as it's not there by itself */
gint64 now = htonll(janus_get_real_time());
gint64 now = htonll(janus_get_real_time() - recorder->offset_ts);
fwrite(&now, sizeof(gint64), 1, recorder->file);
} else {
uint32_t ts = rtp->timestamp;
rtp->timestamp = htonl( ntohl(rtp->timestamp) - recorder->offset_ts);
fwrite(buffer, sizeof(char), 12, recorder->file);
tot -= 12;
rtp->timestamp = ts;
}
/* Save packet on file */
int temp = 0, tot = length;
while(tot > 0) {
temp = fwrite(buffer+length-tot, sizeof(char), tot, recorder->file);
if(temp <= 0) {
Expand All @@ -230,6 +271,30 @@ int janus_recorder_save_frame(janus_recorder *recorder, char *buffer, uint lengt
return 0;
}

int janus_recorder_pause(janus_recorder *recorder) {
if(!recorder || !recorder->writable)
return -1;
janus_mutex_lock_nodebug(&recorder->mutex);
recorder->paused = 1;
/*
//TODO Write some PAUSE Header with Timestamp, to synchronize between multiple streams
fwrite(pause_header, sizeof(char), strlen(frame_header), recorder->file);
uint16_t header_bytes = htons(0);
fwrite(&header_bytes, sizeof(uint16_t), 1, recorder->file);
*/
janus_mutex_unlock_nodebug(&recorder->mutex);
return 0;
}

int janus_recorder_resume(janus_recorder *recorder) {
if(!recorder || !recorder->writable)
return -1;
janus_mutex_lock_nodebug(&recorder->mutex);
recorder->paused = 0;
janus_mutex_unlock_nodebug(&recorder->mutex);
return 0;
}

int janus_recorder_close(janus_recorder *recorder) {
if(!recorder || !recorder->writable)
return -1;
Expand Down
14 changes: 14 additions & 0 deletions record.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ typedef struct janus_recorder {
gint64 created;
/*! \brief Media this instance is recording */
janus_recorder_medium type;
/*! \brief RTP Timestaps for pausing and resuming the recording */
gint64 paused_ts, offset_ts;
/*! \brief RTP Sequences for pausing and resuming the recording */
uint16_t paused_seq, offset_seq;
/*! \brief Whether the recording has been paused or not */
int paused:1;
/*! \brief Whether the info header for this recorder instance has already been written or not */
int header:1;
/*! \brief Whether this recorder instance can be used for writing or not */
Expand Down Expand Up @@ -77,6 +83,14 @@ janus_recorder *janus_recorder_create(const char *dir, const char *codec, const
* @param[in] length The frame data length
* @returns 0 in case of success, a negative integer otherwise */
int janus_recorder_save_frame(janus_recorder *recorder, char *buffer, uint length);
/*! \brief Pause the recorder
* @param[in] recorder The janus_recorder instance
* @returns 0 in case of success, a negative integer otherwise */
int janus_recorder_pause(janus_recorder *recorder);
/*! \brief Resume the recorder
* @param[in] recorder The janus_recorder instance
* @returns 0 in case of success, a negative integer otherwise */
int janus_recorder_resume(janus_recorder *recorder);
/*! \brief Close the recorder
* @param[in] recorder The janus_recorder instance to close
* @returns 0 in case of success, a negative integer otherwise */
Expand Down