Skip to content

Commit

Permalink
Add optioin to bind device or address
Browse files Browse the repository at this point in the history
  • Loading branch information
yujincheng08 committed Feb 14, 2024
1 parent acd8fc4 commit a2abdf6
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 11 deletions.
18 changes: 15 additions & 3 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::io;
use std::mem::MaybeUninit;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::num::NonZeroU32;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::task::Poll;
use std::{fmt::Debug, num::NonZeroU16, pin::Pin};
Expand All @@ -25,6 +26,7 @@ use url::Url;

use crate::client::parse::SessionHeader;
use crate::codec::CodecItem;
use crate::tokio::Bind;
use crate::{
Error, ErrorInt, RtspMessageContext, StreamContext, StreamContextInner, TcpStreamContext,
UdpStreamContext,
Expand Down Expand Up @@ -505,6 +507,7 @@ pub struct SessionOptions {
teardown: TeardownPolicy,
unassigned_channel_data: UnassignedChannelDataPolicy,
session_id: SessionIdPolicy,
bind: Option<String>,
}

/// Policy for handling data received on unassigned RTSP interleaved channels.
Expand Down Expand Up @@ -713,6 +716,11 @@ impl SessionOptions {
self.session_id = policy;
self
}

pub fn bind(mut self, bind: String) -> Self {
self.bind = Some(bind);
self
}
}

/// Per-stream options decided for `SETUP` time, for future expansion.
Expand Down Expand Up @@ -1198,11 +1206,15 @@ enum SessionFlag {
}

impl RtspConnection {
async fn connect(url: &Url) -> Result<Self, Error> {
async fn connect(url: &Url, bind: Option<&str>) -> Result<Self, Error> {
let host =
RtspConnection::validate_url(url).map_err(|e| wrap!(ErrorInt::InvalidArgument(e)))?;
let port = url.port().unwrap_or(554);
let inner = crate::tokio::Connection::connect(host, port)
let bind = match bind {
None => None,
Some(bind) => Some(Bind::from_str(bind)?),
};
let inner = crate::tokio::Connection::connect(host, port, bind)
.await
.map_err(|e| wrap!(ErrorInt::ConnectError(e)))?;
Ok(Self {
Expand Down Expand Up @@ -1487,7 +1499,7 @@ impl Session<Described> {
///
/// Expects to be called from a tokio runtime.
pub async fn describe(url: Url, options: SessionOptions) -> Result<Self, Error> {
let conn = RtspConnection::connect(&url).await?;
let conn = RtspConnection::connect(&url, options.bind.as_deref()).await?;
Self::describe_with_conn(conn, options, url).await
}

Expand Down
2 changes: 1 addition & 1 deletion src/client/teardown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ pub(super) async fn teardown_loop_forever(
.as_mut()
.reset(tokio::time::Instant::now() + timeout);
let attempt = async {
let conn = RtspConnection::connect(&url).await?;
let conn = RtspConnection::connect(&url, options.bind.as_deref()).await?;
attempt(&mut req, tool.as_ref(), options, &mut requested_auth, conn).await
};
tokio::select! {
Expand Down
94 changes: 87 additions & 7 deletions src/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
use bytes::{Buf, BufMut, Bytes, BytesMut};
use futures::{Sink, SinkExt, Stream, StreamExt};
use rtsp_types::{Data, Message};
use std::convert::TryFrom;
use std::net::IpAddr;
use std::str::FromStr;
use std::time::Instant;
use tokio::net::{TcpStream, UdpSocket};
use std::{convert::TryFrom, net::SocketAddr};
use tokio::net::{lookup_host, TcpSocket, TcpStream, UdpSocket};
use tokio_util::codec::Framed;
use url::Host;

Expand All @@ -21,13 +23,91 @@ use super::{ConnectionContext, ReceivedMessage, WallTime};
/// A RTSP connection which implements `Stream`, `Sink`, and `Unpin`.
pub(crate) struct Connection(Framed<TcpStream, Codec>);

pub(crate) enum Bind {
Device(String),
Ip(IpAddr),
}

impl Bind {
fn may_bind_socket(socket: &TcpSocket, bind: &Option<Bind>) -> Result<(), std::io::Error> {
match bind {
Some(Bind::Ip(addr)) => socket.bind(SocketAddr::new(*addr, 0))?,
#[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
Some(Bind::Device(ref dev)) => socket.bind_device(Some(dev.as_bytes()))?,
_ => {}
}
Ok(())
}
}

impl FromStr for Bind {
type Err = Error;

fn from_str(bind: &str) -> Result<Self, Self::Err> {
if let Ok(addr) = IpAddr::from_str(bind) {
Ok(Bind::Ip(addr))
} else {
#[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
{
Ok(Bind::Device(bind.to_string()))
}
#[cfg(not(any(target_os = "android", target_os = "fuchsia", target_os = "linux")))]
{
Err(wrap!(ErrorInt::InvalidArgument(format!(
"Invalid bind address: {}",
bind
))))
}
}
}
}

impl Connection {
pub(crate) async fn connect(host: Host<&str>, port: u16) -> Result<Self, std::io::Error> {
pub(crate) async fn connect(
host: Host<&str>,
port: u16,
bind: Option<Bind>,
) -> Result<Self, std::io::Error> {
let stream = match host {
Host::Domain(h) => TcpStream::connect((h, port)).await,
Host::Ipv4(h) => TcpStream::connect((h, port)).await,
Host::Ipv6(h) => TcpStream::connect((h, port)).await,
}?;
Host::Domain(h) => {
let mut last_err = None;
let mut stream = None;
for h in lookup_host((h, port)).await? {
let socket = match h {
SocketAddr::V4(_) => TcpSocket::new_v4()?,
SocketAddr::V6(_) => TcpSocket::new_v6()?,
};
Bind::may_bind_socket(&socket, &bind)?;
match socket.connect(h).await {
Ok(s) => {
stream = Some(s);
break;
}
Err(e) => last_err = Some(e),
}
}
if let Some(stream) = stream {
stream
} else {
return Err(last_err.unwrap_or_else(|| {
std::io::Error::new(
std::io::ErrorKind::AddrNotAvailable,
"No addresses found",
)
}));
}
}
Host::Ipv4(h) => {
let socket = TcpSocket::new_v4()?;
Bind::may_bind_socket(&socket, &bind)?;
socket.connect(SocketAddr::new(h.into(), port)).await?
}
Host::Ipv6(h) => {
let socket = TcpSocket::new_v6()?;
Bind::may_bind_socket(&socket, &bind)?;
socket.connect(SocketAddr::new(h.into(), port)).await?
}
};
Self::from_stream(stream)
}

Expand Down

0 comments on commit a2abdf6

Please sign in to comment.