Skip to content

Commit

Permalink
Allow building an HTTP/1 connection with prebuffered bytes
Browse files Browse the repository at this point in the history
Fixes #3704.
  • Loading branch information
nox committed Jul 16, 2024
1 parent 5a13041 commit df9edf4
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 13 deletions.
2 changes: 1 addition & 1 deletion src/client/conn/http1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ impl Builder {
trace!("client handshake HTTP/1");

let (tx, rx) = dispatch::channel();
let mut conn = proto::Conn::new(io);
let mut conn = proto::Conn::new_buffered(Bytes::new(), io);
conn.set_h1_parser_config(opts.h1_parser_config);
if let Some(writev) = opts.h1_writev {
if writev {
Expand Down
4 changes: 2 additions & 2 deletions src/proto/h1/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ where
B: Buf,
T: Http1Transaction,
{
pub(crate) fn new(io: I) -> Conn<I, B, T> {
pub(crate) fn new_buffered(buffered: Bytes, io: I) -> Conn<I, B, T> {
Conn {
io: Buffered::new(io),
io: Buffered::new_buffered(buffered, io),
state: State {
allow_half_close: false,
cached_headers: None,
Expand Down
15 changes: 12 additions & 3 deletions src/proto/h1/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -719,7 +719,10 @@ mod tests {
// Block at 0 for now, but we will release this response before
// the request is ready to write later...
let (mut tx, rx) = crate::client::dispatch::channel();
let conn = Conn::<_, bytes::Bytes, ClientTransaction>::new(Compat::new(io));
let conn = Conn::<_, bytes::Bytes, ClientTransaction>::new_buffered(
Default::default(),
Compat::new(io),
);
let mut dispatcher = Dispatcher::new(Client::new(rx), conn);

// First poll is needed to allow tx to send...
Expand Down Expand Up @@ -756,7 +759,10 @@ mod tests {
.build_with_handle();

let (mut tx, rx) = crate::client::dispatch::channel();
let mut conn = Conn::<_, bytes::Bytes, ClientTransaction>::new(Compat::new(io));
let mut conn = Conn::<_, bytes::Bytes, ClientTransaction>::new_buffered(
Default::default(),
Compat::new(io),
);
conn.set_write_strategy_queue();

let dispatcher = Dispatcher::new(Client::new(rx), conn);
Expand Down Expand Up @@ -787,7 +793,10 @@ mod tests {
.build();

let (mut tx, rx) = crate::client::dispatch::channel();
let conn = Conn::<_, bytes::Bytes, ClientTransaction>::new(Compat::new(io));
let conn = Conn::<_, bytes::Bytes, ClientTransaction>::new_buffered(
Default::default(),
Compat::new(io),
);
let mut dispatcher = tokio_test::task::spawn(Dispatcher::new(Client::new(rx), conn));

// First poll is needed to allow tx to send...
Expand Down
18 changes: 12 additions & 6 deletions src/proto/h1/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ where
T: Read + Write + Unpin,
B: Buf,
{
pub(crate) fn new(io: T) -> Buffered<T, B> {
pub(crate) fn new_buffered(buffered: Bytes, io: T) -> Buffered<T, B> {
let strategy = if io.is_write_vectored() {
WriteStrategy::Queue
} else {
Expand All @@ -66,7 +66,9 @@ where
flush_pipeline: false,
io,
read_blocked: false,
read_buf: BytesMut::with_capacity(0),
// FIXME: This should use `From<Bytes>` which has not been released yet.
// https://github.com/tokio-rs/bytes/commit/fa1daac3ae1dcb07dffe3a41a041dffd6edf177b
read_buf: BytesMut::from(&*buffered),
read_buf_strategy: ReadStrategy::default(),
write_buf,
}
Expand Down Expand Up @@ -684,7 +686,8 @@ mod tests {
.wait(Duration::from_secs(1))
.build();

let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(Compat::new(mock));
let mut buffered =
Buffered::<_, Cursor<Vec<u8>>>::new_buffered(Bytes::new(), Compat::new(mock));

// We expect a `parse` to be not ready, and so can't await it directly.
// Rather, this `poll_fn` will wrap the `Poll` result.
Expand Down Expand Up @@ -826,7 +829,8 @@ mod tests {
#[cfg(debug_assertions)] // needs to trigger a debug_assert
fn write_buf_requires_non_empty_bufs() {
let mock = Mock::new().build();
let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(Compat::new(mock));
let mut buffered =
Buffered::<_, Cursor<Vec<u8>>>::new_buffered(Bytes::new(), Compat::new(mock));

buffered.buffer(Cursor::new(Vec::new()));
}
Expand Down Expand Up @@ -861,7 +865,8 @@ mod tests {

let mock = Mock::new().write(b"hello world, it's hyper!").build();

let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(Compat::new(mock));
let mut buffered =
Buffered::<_, Cursor<Vec<u8>>>::new_buffered(Bytes::new(), Compat::new(mock));
buffered.write_buf.set_strategy(WriteStrategy::Flatten);

buffered.headers_buf().extend(b"hello ");
Expand Down Expand Up @@ -920,7 +925,8 @@ mod tests {
.write(b"hyper!")
.build();

let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(Compat::new(mock));
let mut buffered =
Buffered::<_, Cursor<Vec<u8>>>::new_buffered(Bytes::new(), Compat::new(mock));
buffered.write_buf.set_strategy(WriteStrategy::Queue);

// we have 4 buffers, and vec IO disabled, but explicitly said
Expand Down
22 changes: 21 additions & 1 deletion src/server/conn/http1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,27 @@ impl Builder {
<S::ResBody as Body>::Error: Into<Box<dyn StdError + Send + Sync>>,
I: Read + Write + Unpin,
{
let mut conn = proto::Conn::new(io);
self.serve_buffered_connection(Bytes::new(), io, service)
}

/// Bind a connection together with a [`Service`](crate::service::Service)
/// with a prebuffered chunk of bytes.
///
/// See [`Self::serve_connection`] for more info.
pub fn serve_buffered_connection<I, S>(
&self,
buffered: Bytes,
io: I,
service: S,
) -> Connection<I, S>
where
S: HttpService<IncomingBody>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
S::ResBody: 'static,
<S::ResBody as Body>::Error: Into<Box<dyn StdError + Send + Sync>>,
I: Read + Write + Unpin,
{
let mut conn = proto::Conn::new_buffered(buffered, io);
conn.set_timer(self.timer.clone());
if !self.h1_keep_alive {
conn.disable_keep_alive();
Expand Down

0 comments on commit df9edf4

Please sign in to comment.