Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Playout with seek #2482

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
24b5360
Replay data channel recordings
Dec 2, 2020
8e31a7a
Support for seek
Dec 3, 2020
d36e483
Corrections based on pull request feedback
Dec 3, 2020
aa2a5d2
Now correctly handles data packet header
Dec 3, 2020
3d7b7d6
Removed unnecessary code.
Dec 3, 2020
8b76b17
Removed unnecessary code.
Dec 3, 2020
217856b
Merge pull request #1 from ricardo-salgado-tekever/play_datarecording
ricardo-salgado-tekever Dec 3, 2020
524fe7a
Minor corrections.
Dec 3, 2020
d83f407
Comment style fix.
Dec 3, 2020
99969b6
get_frames now correctly parses data recording
Dec 3, 2020
7906ba8
Merge branch 'play_datarecording' into play_with_seek
Dec 3, 2020
56f8422
Preparing for UI change for seek.
Dec 3, 2020
9e80c7d
Merged with play_data_recordings
Dec 3, 2020
04011a6
Allows seeking to new position while in playout
Dec 4, 2020
d05f09b
First data packet now plays on correct timestamp
Dec 4, 2020
837ab6d
Merge branch 'play_datarecording' into play_with_seek
Dec 4, 2020
da6ea82
Minor change to data recording header read.
Dec 4, 2020
c434e37
Revert "Preparing for UI change for seek."
Dec 3, 2020
31731a5
Added cleanup code to avoid memory leaks.
Dec 7, 2020
2887690
Minor cosmetic change to match project read usage
Dec 7, 2020
247a077
Merge branch 'play_datarecording' into play_with_seek
Dec 7, 2020
168a9e3
Merge remote-tracking branch 'origin/master' into play_with_seek
Dec 7, 2020
ba39af1
Refactoring of seek location find.
Dec 7, 2020
9d40f2f
Cosmetic change for code style.
Dec 7, 2020
e45ebab
Reverted change for testing.
Dec 7, 2020
662369f
Merge remote-tracking branch 'origin/record-binarydata' into play_wit…
Dec 7, 2020
d522b2a
Seek to next available keyframe.
Dec 7, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion html/janus.js
Original file line number Diff line number Diff line change
Expand Up @@ -1521,7 +1521,7 @@ function Janus(gatewayCallbacks) {
return;
}
var onDataChannelMessage = function(event) {
Janus.log('Received message on data channel:', event);
Janus.debug('Received message on data channel:', event);
var label = event.target.label;
pluginHandle.ondata(event.data, label);
};
Expand Down
205 changes: 202 additions & 3 deletions plugins/janus_recordplay.c
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,10 @@ static struct janus_json_parameter play_parameters[] = {
{"restart", JANUS_JSON_BOOL, 0}
};

static struct janus_json_parameter seek_parameters[] = {
{"timestamp", JSON_INTEGER, JANUS_JSON_PARAM_REQUIRED | JANUS_JSON_PARAM_POSITIVE}
};

/* Useful stuff */
static volatile gint initialized = 0, stopping = 0;
static gboolean notify_events = TRUE;
Expand Down Expand Up @@ -456,10 +460,17 @@ typedef struct janus_recordplay_session {
volatile gint hangingup;
volatile gint destroyed;
janus_refcount ref;
GAsyncQueue *seek_requests;
} janus_recordplay_session;
static GHashTable *sessions;
static janus_mutex sessions_mutex = JANUS_MUTEX_INITIALIZER;

typedef struct janus_recordplay_seek_request {
janus_recordplay_frame_packet *audioseekframe;
janus_recordplay_frame_packet *videoseekframe;
janus_recordplay_frame_packet *dataseekpacket;
} janus_recordplay_seek_request;

static void janus_recordplay_session_destroy(janus_recordplay_session *session) {
if(session && g_atomic_int_compare_and_exchange(&session->destroyed, 0, 1))
janus_refcount_decrease(&session->ref);
Expand Down Expand Up @@ -926,6 +937,14 @@ void janus_recordplay_destroy_session(janus_plugin_session *handle, int *error)
*error = -2;
return;
}
if(session->seek_requests) {
janus_recordplay_seek_request *seek_request;
while((seek_request =
g_async_queue_try_pop(session->seek_requests))) {
g_free(seek_request);
}
g_free(session->seek_requests);
}
JANUS_LOG(LOG_VERB, "Removing Record&Play session...\n");
janus_recordplay_hangup_media_internal(handle);
g_hash_table_remove(sessions, handle);
Expand Down Expand Up @@ -1079,6 +1098,7 @@ struct janus_plugin_result *janus_recordplay_handle_message(janus_plugin_session
json_object_set_new(response, "settings", settings);
goto plugin_response;
} else if(!strcasecmp(request_text, "record") || !strcasecmp(request_text, "play")
|| !strcasecmp(request_text, "seek")
|| !strcasecmp(request_text, "start") || !strcasecmp(request_text, "stop")) {
/* These messages are handled asynchronously */
janus_recordplay_message *msg = g_malloc(sizeof(janus_recordplay_message));
Expand Down Expand Up @@ -1456,6 +1476,58 @@ static void janus_recordplay_hangup_media_internal(janus_plugin_session *handle)
g_atomic_int_set(&session->hangingup, 0);
}

static janus_recordplay_seek_request *janus_recordplay_generate_seek_request(janus_recordplay_session *session, int seek_timestamp) {
if(!session)
return NULL;
/* Find correct frame for audio, video, data */
janus_recordplay_frame_packet *audio = session->aframes, *video = session->vframes, *data = session->dframes;
u_int64_t ts = (u_int64_t)seek_timestamp;

int audio_pt = session->recording? session->recording->audio_pt : 0;
int akhz = 48;
if(audio_pt == 0 || audio_pt == 8 || audio_pt == 9)
akhz = 8;
int vkhz = 90;

if(audio) {
u_int64_t audio_start_ts = audio->ts;
while(audio) {
if(audio->next && (((audio->next->ts - audio_start_ts)*1000/akhz) >= ts))
break;
audio = audio->next;
}
}
if(video) {
u_int64_t video_start_ts = video->ts;
while(video) {
if(video->next && (((video->next->ts - video_start_ts)*1000/vkhz) >= ts))
break;
video = video->next;
}
}
if(data) {
u_int64_t data_start_ts = data->ts;
while(data) {
if(data->next && ((data->next->ts - data_start_ts) >= ts))
break;
data = data->next;
}
}

if (!audio && !video) {
JANUS_LOG(LOG_ERR, "Seek location not found, can't seek\n");
return NULL;
}

/* Found seek location */
janus_recordplay_seek_request *seek_request = g_malloc(sizeof(janus_recordplay_frame_packet));
seek_request->audioseekframe = audio;
seek_request->videoseekframe = video;
seek_request->dataseekpacket = data;

return seek_request;
}

/* Thread to handle incoming messages */
static void *janus_recordplay_handler(void *data) {
JANUS_LOG(LOG_VERB, "Joining Record&Play handler thread\n");
Expand Down Expand Up @@ -1866,14 +1938,14 @@ static void *janus_recordplay_handler(void *data) {
JANUS_LOG(LOG_WARN, "Error opening audio recording, trying to go on anyway\n");
warning = "Broken audio file, playing video only";
}
}
} else session->aframes = NULL;
if(rec->vrc_file) {
session->vframes = janus_recordplay_get_frames(recordings_path, rec->vrc_file);
if(session->vframes == NULL) {
JANUS_LOG(LOG_WARN, "Error opening video recording, trying to go on anyway\n");
warning = "Broken video file, playing audio only";
}
}
} else session->vframes = NULL;
if(rec->drc_file) {
session->dframes = janus_recordplay_get_frames(recordings_path, rec->drc_file);
if(session->dframes == NULL) {
Expand All @@ -1885,11 +1957,24 @@ static void *janus_recordplay_handler(void *data) {
error_code = JANUS_RECORDPLAY_ERROR_INVALID_RECORDING;
g_snprintf(error_cause, 512, "Error opening recording files");
goto error;
}
}
session->recording = rec;
session->recorder = FALSE;
rec->viewers = g_list_append(rec->viewers, session);
e2ee = rec->e2ee;

json_t *timestamp = json_object_get(root, "start_at");
if(timestamp) {
/* Got a seek request, validate and extract timestamp to seek */
guint64 ts = json_integer_value(timestamp);

/* Search seek location and send seek request to */
janus_recordplay_seek_request *seek_request = janus_recordplay_generate_seek_request(session, ts);
if (!session->seek_requests)
session->seek_requests = g_async_queue_new();
if(seek_request)
g_async_queue_push(session->seek_requests, seek_request);
}
/* Send this viewer the prepared offer */
sdp = g_strdup(rec->offer);
playdone:
Expand All @@ -1910,6 +1995,44 @@ static void *janus_recordplay_handler(void *data) {
json_object_set_new(info, "data", session->dframes ? json_true() : json_false());
gateway->notify_event(&janus_recordplay_plugin, session->handle, info);
}
} else if(!strcasecmp(request_text, "seek")) {
if(!session->aframes && !session->vframes) {
JANUS_LOG(LOG_ERR, "Not a playout session, can't seek\n");
error_code = JANUS_RECORDPLAY_ERROR_INVALID_STATE;
g_snprintf(error_cause, 512, "Not a playout session, can't seek");
goto error;
}
/* Got a seek request, validate and extract timestamp to seek */
JANUS_VALIDATE_JSON_OBJECT(root, seek_parameters,
error_code, error_cause, TRUE,
JANUS_RECORDPLAY_ERROR_MISSING_ELEMENT, JANUS_RECORDPLAY_ERROR_INVALID_ELEMENT);
if(error_code != 0)
goto error;
json_t *timestamp = json_object_get(root, "timestamp");
guint64 ts = json_integer_value(timestamp);

/* Search seek location and send seek request to */
janus_recordplay_seek_request *seek_request = janus_recordplay_generate_seek_request(session, ts);
if (!session->seek_requests)
session->seek_requests = g_async_queue_new();
if(seek_request)
g_async_queue_push(session->seek_requests, seek_request);
else {
error_code = JANUS_RECORDPLAY_ERROR_INVALID_ELEMENT;
g_snprintf(error_cause, 512, "Seek location not found, can't seek");
goto error;
}
/* Done! */
result = json_object();
json_object_set_new(result, "status", json_string("playing"));
/* Also notify event handlers */
if(notify_events && gateway->events_is_enabled()) {
json_t *info = json_object();
json_object_set_new(info, "event", json_string("playing"));
json_object_set_new(info, "id", json_integer(session->recording->id));
gateway->notify_event(&janus_recordplay_plugin, session->handle, info);
}

} else if(!strcasecmp(request_text, "start")) {
if(!session->aframes && !session->vframes) {
JANUS_LOG(LOG_ERR, "Not a playout session, can't start\n");
Expand Down Expand Up @@ -2527,6 +2650,46 @@ janus_recordplay_frame_packet *janus_recordplay_get_frames(const char *dir, cons
return list;
}

static janus_recordplay_frame_packet *janus_recordplay_find_next_keyframe(janus_videocodec codec, FILE *vfile, janus_recordplay_frame_packet *frame, u_int64_t *keyframe_offset) {
*keyframe_offset = 0;
janus_recordplay_frame_packet *video = frame;
gboolean (*is_keyframe)(const char *buffer, int len);
if(codec == JANUS_VIDEOCODEC_VP8)
is_keyframe = &janus_vp8_is_keyframe;
else if(codec == JANUS_VIDEOCODEC_VP9)
is_keyframe = &janus_vp9_is_keyframe;
else if(codec == JANUS_VIDEOCODEC_H264)
is_keyframe = &janus_h264_is_keyframe;
else if(codec == JANUS_VIDEOCODEC_AV1)
is_keyframe = &janus_av1_is_keyframe;
else if(codec == JANUS_VIDEOCODEC_H265)
is_keyframe = &janus_h265_is_keyframe;
else
/* Don't know how to identify keyframe */
return frame;

char *buffer = g_malloc0(1500);
while(video) {
fseek(vfile, video->offset, SEEK_SET);
int bytes = fread(buffer, sizeof(char), video->len, vfile);
if(bytes != video->len)
JANUS_LOG(LOG_WARN, "Didn't manage to read all the bytes we needed (%d < %d)...\n", bytes, video->len);
janus_plugin_rtp packet = { .video = TRUE, .buffer = (char *)buffer, .length = bytes };
int plen = 0;
char *payload = janus_rtp_payload(packet.buffer, packet.length, &plen);
if((*is_keyframe)(payload, plen))
break;
*keyframe_offset += video->ts - frame->ts;
video = video->next;
}

free(buffer);
if(video)
return video;
else
return frame;
}

static void *janus_recordplay_playout_thread(void *sessiondata) {
janus_recordplay_session *session = (janus_recordplay_session *)sessiondata;
if(!session) {
Expand Down Expand Up @@ -2645,6 +2808,38 @@ static void *janus_recordplay_playout_thread(void *sessiondata) {
asent = FALSE;
vsent = FALSE;
dsent = FALSE;
if(session->seek_requests) {
janus_recordplay_seek_request *seek_request =
g_async_queue_try_pop(session->seek_requests);
if(seek_request) {
u_int64_t key_frame_offset = 0;
if(seek_request->videoseekframe) {
seek_request->videoseekframe = janus_recordplay_find_next_keyframe(rec->vcodec, vfile, seek_request->videoseekframe, &key_frame_offset);
session->context.v_seq_reset = TRUE;
session->context.v_base_ts_prev += (video->ts - session->context.v_base_ts);
session->context.v_base_ts = seek_request->videoseekframe->ts;
video = seek_request->videoseekframe;
}
if(seek_request->audioseekframe) {
u_int64_t audioseek_ts = seek_request->audioseekframe->ts;
while(seek_request->audioseekframe
&& (key_frame_offset > (seek_request->audioseekframe->ts - audioseek_ts)))
seek_request->audioseekframe = seek_request->audioseekframe->next;
session->context.a_seq_reset = TRUE;
session->context.a_base_ts_prev += (audio->ts - session->context.a_base_ts);
session->context.a_base_ts = seek_request->audioseekframe->ts;
audio = seek_request->audioseekframe;
}
if(seek_request->dataseekpacket) {
u_int64_t dataseek_ts = seek_request->dataseekpacket->ts;
while(seek_request->dataseekpacket
&& (key_frame_offset > (seek_request->dataseekpacket->ts - dataseek_ts)))
seek_request->dataseekpacket = seek_request->dataseekpacket->next;
data = seek_request->dataseekpacket;
}
g_free(seek_request);
}
}
if(audio) {
if(audio == session->aframes) {
/* First packet, send now */
Expand All @@ -2655,6 +2850,7 @@ static void *janus_recordplay_playout_thread(void *sessiondata) {
/* Update payload type */
janus_rtp_header *rtp = (janus_rtp_header *)buffer;
rtp->type = audio_pt;
janus_rtp_header_update(rtp, &session->context, FALSE, 0);
janus_plugin_rtp prtp = { .video = FALSE, .buffer = (char *)buffer, .length = bytes };
janus_plugin_rtp_extensions_reset(&prtp.extensions);
gateway->relay_rtp(session->handle, &prtp);
Expand Down Expand Up @@ -2697,6 +2893,7 @@ static void *janus_recordplay_playout_thread(void *sessiondata) {
/* Update payload type */
janus_rtp_header *rtp = (janus_rtp_header *)buffer;
rtp->type = audio_pt;
janus_rtp_header_update(rtp, &session->context, FALSE, 0);
janus_plugin_rtp prtp = { .video = FALSE, .buffer = (char *)buffer, .length = bytes };
janus_plugin_rtp_extensions_reset(&prtp.extensions);
gateway->relay_rtp(session->handle, &prtp);
Expand All @@ -2717,6 +2914,7 @@ static void *janus_recordplay_playout_thread(void *sessiondata) {
/* Update payload type */
janus_rtp_header *rtp = (janus_rtp_header *)buffer;
rtp->type = video_pt;
janus_rtp_header_update(rtp, &session->context, TRUE, 0);
janus_plugin_rtp prtp = { .video = TRUE, .buffer = (char *)buffer, .length = bytes };
janus_plugin_rtp_extensions_reset(&prtp.extensions);
gateway->relay_rtp(session->handle, &prtp);
Expand Down Expand Up @@ -2763,6 +2961,7 @@ static void *janus_recordplay_playout_thread(void *sessiondata) {
/* Update payload type */
janus_rtp_header *rtp = (janus_rtp_header *)buffer;
rtp->type = video_pt;
janus_rtp_header_update(rtp, &session->context, TRUE, 0);
janus_plugin_rtp prtp = { .video = TRUE, .buffer = (char *)buffer, .length = bytes };
janus_plugin_rtp_extensions_reset(&prtp.extensions);
gateway->relay_rtp(session->handle, &prtp);
Expand Down