Skip to content

Commit

Permalink
customizable behavior for unknown rtcp ssrcs
Browse files Browse the repository at this point in the history
  • Loading branch information
scottlamb committed Nov 28, 2023
1 parent 4bf9ea0 commit f04bb24
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 22 deletions.
12 changes: 10 additions & 2 deletions benches/depacketize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,16 @@ fn h264_aac<F: FnMut(CodecItem)>(mut f: F) {
Timeline::new(Some(0), 90_000, None).unwrap(),
];
let mut rtps = [
InorderParser::new(None, Some(1)),
InorderParser::new(None, Some(1)),
InorderParser::new(
None,
Some(1),
retina::client::UnknownRtcpSsrcPolicy::Default,
),
InorderParser::new(
None,
Some(1),
retina::client::UnknownRtcpSsrcPolicy::Default,
),
];
let mut depacketizers = [
Depacketizer::new("audio", "mpeg4-generic", 12_000, NonZeroU16::new(2), Some("profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3;config=1490")).unwrap(),
Expand Down
7 changes: 6 additions & 1 deletion examples/client/src/mp4.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ pub struct Opts {
#[arg(default_value_t, long)]
initial_timestamp: retina::client::InitialTimestampPolicy,

/// Policy for handling unknown ssrcs in RTCP packets.
#[arg(default_value_t, long)]
unknown_rtcp_ssrc: retina::client::UnknownRtcpSsrcPolicy,

/// Don't attempt to include video streams.
#[arg(long)]
no_video: bool,
Expand Down Expand Up @@ -672,7 +676,8 @@ async fn write_mp4(
.play(
retina::client::PlayOptions::default()
.initial_timestamp(opts.initial_timestamp)
.enforce_timestamps_with_max_jump_secs(NonZeroU32::new(10).unwrap()),
.enforce_timestamps_with_max_jump_secs(NonZeroU32::new(10).unwrap())
.unknown_rtcp_ssrc(opts.unknown_rtcp_ssrc),
)
.await?
.demuxed()?;
Expand Down
62 changes: 60 additions & 2 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,52 @@ impl std::str::FromStr for InitialSequenceNumberPolicy {
}
}

/// Policy for handling unknown `ssrc` value in RTCP messages.
#[derive(Copy, Clone, Debug, Default)]
#[non_exhaustive]
pub enum UnknownRtcpSsrcPolicy {
/// Default policy: currently same as `AbortSession`.
#[default]
Default,

/// Abort the session on encountering an unknown `ssrc`.
AbortSession,

/// Drop RTCP packets with an unknown `ssrc`.
DropPackets,

/// Process the packets as if they had the expected `ssrc`.
ProcessPackets,
}

impl std::fmt::Display for UnknownRtcpSsrcPolicy {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.pad(match self {
UnknownRtcpSsrcPolicy::Default => "default",
UnknownRtcpSsrcPolicy::AbortSession => "abort-session",
UnknownRtcpSsrcPolicy::DropPackets => "drop-packets",
UnknownRtcpSsrcPolicy::ProcessPackets => "process-packets",
})
}
}

impl std::str::FromStr for UnknownRtcpSsrcPolicy {
type Err = Error;

fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(match s {
"default" => UnknownRtcpSsrcPolicy::Default,
"abort-session" => UnknownRtcpSsrcPolicy::AbortSession,
"drop-packets" => UnknownRtcpSsrcPolicy::DropPackets,
"process-packets" => UnknownRtcpSsrcPolicy::ProcessPackets,
_ => bail!(ErrorInt::InvalidArgument(format!(
"bad UnknownRtcpSsrcPolicy {s}; \
expected default, abort-session, drop-packets, or process-packets"
))),
})
}
}

/// Returns an appropriate keepalive interval for `session`.
///
/// This generally uses half the session timeout. However, it's capped in case
Expand Down Expand Up @@ -696,6 +742,7 @@ pub struct PlayOptions {
initial_timestamp: InitialTimestampPolicy,
initial_seq: InitialSequenceNumberPolicy,
enforce_timestamps_with_max_jump_secs: Option<NonZeroU32>,
unknown_rtcp_ssrc: UnknownRtcpSsrcPolicy,
}

impl PlayOptions {
Expand All @@ -714,6 +761,13 @@ impl PlayOptions {
}
}

pub fn unknown_rtcp_ssrc(self, unknown_rtcp_ssrc: UnknownRtcpSsrcPolicy) -> Self {
Self {
unknown_rtcp_ssrc,
..self
}
}

/// If the `RTP-Time` specifies `seq=0` or `seq=1`, ignore it.
///
/// `ignore_zero_seq(true)` is an outdated spelling of
Expand Down Expand Up @@ -1845,7 +1899,11 @@ impl Session<Described> {
description,
})
})?,
rtp_handler: rtp::InorderParser::new(ssrc, initial_seq),
rtp_handler: rtp::InorderParser::new(
ssrc,
initial_seq,
policy.unknown_rtcp_ssrc,
),
ctx,
udp_sockets,
};
Expand Down Expand Up @@ -1875,7 +1933,7 @@ fn note_stale_live555_data(
let known_to_have_live555_tcp_bug = tool.map(Tool::has_live555_tcp_bug).unwrap_or(false);
if !known_to_have_live555_tcp_bug {
log::warn!(
"saw unexpected RTSP packet at. This is presumed to be due to a bug in old
"saw unexpected RTSP packet. This is presumed to be due to a bug in old
live555 servers' TCP handling, though tool attribute {tool:?} does not refer to a \
known-buggy version. Consider switching to UDP.\n\n\
conn: {conn_ctx:?}\n\
Expand Down
63 changes: 46 additions & 17 deletions src/client/rtp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//! RTP and RTCP handling; see [RFC 3550](https://datatracker.ietf.org/doc/html/rfc3550).

use bytes::Bytes;
use log::debug;
use log::{debug, warn};

use crate::client::PacketItem;
use crate::rtcp::ReceivedCompoundPacket;
Expand All @@ -14,7 +14,7 @@ use crate::{
StreamContextInner,
};

use super::{SessionOptions, Timeline};
use super::{SessionOptions, Timeline, UnknownRtcpSsrcPolicy};

/// Describes how Retina formed its initial expectation for the stream's `ssrc` or `seq`.
#[derive(Copy, Clone, Debug)]
Expand Down Expand Up @@ -74,6 +74,9 @@ pub struct InorderParser {

/// Total RTCP packets seen in this stream.
seen_rtcp_packets: u64,

unknown_rtcp_session: UnknownRtcpSsrcPolicy,
seen_unknown_rtcp_session: bool,
}

fn note_stale_live555_data_if_tcp(
Expand All @@ -99,7 +102,11 @@ fn note_stale_live555_data_if_tcp(
}

impl InorderParser {
pub fn new(ssrc: Option<u32>, next_seq: Option<u16>) -> Self {
pub fn new(
ssrc: Option<u32>,
next_seq: Option<u16>,
unknown_rtcp_session: UnknownRtcpSsrcPolicy,
) -> Self {
Self {
ssrc: ssrc.map(|ssrc| Ssrc {
init: InitialExpectation::PlayResponseHeader,
Expand All @@ -111,6 +118,8 @@ impl InorderParser {
}),
seen_rtp_packets: 0,
seen_rtcp_packets: 0,
unknown_rtcp_session,
seen_unknown_rtcp_session: false,
}
}

Expand Down Expand Up @@ -254,18 +263,38 @@ impl InorderParser {

let ssrc = sr.ssrc();
if matches!(self.ssrc, Some(s) if s.ssrc != ssrc) {
note_stale_live555_data_if_tcp(
tool,
session_options,
conn_ctx,
stream_ctx,
pkt_ctx,
);
return Err(format!(
"Expected ssrc={:08x?}, got RTCP SR ssrc={:08x}",
self.ssrc, ssrc
));
} else if self.ssrc.is_none() {
match self.unknown_rtcp_session {
UnknownRtcpSsrcPolicy::Default | UnknownRtcpSsrcPolicy::AbortSession => {
note_stale_live555_data_if_tcp(
tool,
session_options,
conn_ctx,
stream_ctx,
pkt_ctx,
);
return Err(format!(
"Expected ssrc={:08x?}, got RTCP SR ssrc={:08x}",
self.ssrc, ssrc
));
}
UnknownRtcpSsrcPolicy::DropPackets => {
if !self.seen_unknown_rtcp_session {
warn!(
"saw unknown rtcp ssrc {ssrc}; rtp session has ssrc {s:?}",
s = self.ssrc
);
self.seen_unknown_rtcp_session = true;
}
return Ok(None);
}
UnknownRtcpSsrcPolicy::ProcessPackets => {}
}
} else if self.ssrc.is_none()
&& !matches!(
self.unknown_rtcp_session,
UnknownRtcpSsrcPolicy::ProcessPackets
)
{
self.ssrc = Some(Ssrc {
init: InitialExpectation::RtcpPacket,
ssrc,
Expand Down Expand Up @@ -297,7 +326,7 @@ mod tests {
#[test]
fn geovision_pt50_packet() {
let mut timeline = Timeline::new(None, 90_000, None).unwrap();
let mut parser = InorderParser::new(Some(0xd25614e), None);
let mut parser = InorderParser::new(Some(0xd25614e), None, UnknownRtcpSsrcPolicy::Default);
let stream_ctx = StreamContext::dummy();

// Normal packet.
Expand Down Expand Up @@ -352,7 +381,7 @@ mod tests {
#[test]
fn out_of_order() {
let mut timeline = Timeline::new(None, 90_000, None).unwrap();
let mut parser = InorderParser::new(Some(0xd25614e), None);
let mut parser = InorderParser::new(Some(0xd25614e), None, UnknownRtcpSsrcPolicy::Default);
let stream_ctx = StreamContext(StreamContextInner::Udp(UdpStreamContext {
local_ip: IpAddr::V4(Ipv4Addr::UNSPECIFIED),
peer_ip: IpAddr::V4(Ipv4Addr::UNSPECIFIED),
Expand Down

0 comments on commit f04bb24

Please sign in to comment.