WARNING: THIS SITE IS A MIRROR OF GITHUB.COM / IT CANNOT LOGIN OR REGISTER ACCOUNTS / THE CONTENTS ARE PROVIDED AS-IS / THIS SITE ASSUMES NO RESPONSIBILITY FOR ANY DISPLAYED CONTENT OR LINKS / IF YOU FOUND SOMETHING MAY NOT GOOD FOR EVERYONE, CONTACT ADMIN AT ilovescratch@foxmail.com
Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/body/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,10 @@ impl HttpBody for Body {
ping.record_non_data();
Poll::Ready(Ok(t))
}
Err(e) => Poll::Ready(Err(crate::Error::new_h2(e))),
Err(e) => match e.reason() {
Some(h2::Reason::NO_ERROR) | Some(h2::Reason::CANCEL) => Poll::Ready(Ok(None)),
reason => Poll::Ready(Err(crate::Error::new_h2(e))),
},
},
Kind::Chan {
ref mut trailers_rx,
Expand Down
114 changes: 111 additions & 3 deletions tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3213,9 +3213,10 @@ mod conn {
}

#[tokio::test]
async fn http2_responds_before_consuming_request_body() {
async fn http2_responds_before_consuming_request_body_no_trailers() {
// Test that a early-response from server works correctly (request body wasn't fully consumed).
// https://github.com/hyperium/hyper/issues/2872
use hyper::body::HttpBody;
use hyper::service::service_fn;

let _ = pretty_env_logger::try_init();
Expand Down Expand Up @@ -3260,11 +3261,118 @@ mod conn {
let resp = client.send_request(req).await.expect("send_request");
assert!(resp.status().is_success());

let body = concat(resp.into_body())
let mut body = resp.into_body();

let data = body
.data()
.await
.expect("response has body")
.expect("get response body with no error");
assert_eq!(data.as_ref(), b"No bread for you!");

let trailers = body
.trailers()
.await
.expect("get response trailers with no error");
assert!(trailers.is_none());
}

#[tokio::test]
async fn http2_responds_before_consuming_request_body_with_trailers() {
// Test that a early-response from server works correctly (request body wasn't fully consumed).
// https://github.com/hyperium/hyper/issues/2872
use hyper::body::{HttpBody, SizeHint};
use hyper::header::{HeaderMap, HeaderValue};
use hyper::service::service_fn;

let _ = pretty_env_logger::try_init();

/// An `HttpBody` implementation whose `is_end_stream()` will
/// return `true` after sending trailers.
pub struct TrailersBody(Option<HeaderMap>);

impl HttpBody for TrailersBody {
type Data = bytes::Bytes;
type Error = hyper::Error;

fn poll_data(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
Poll::Ready(None)
}

fn poll_trailers(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
Poll::Ready(Ok(self.0.take()))
}

fn is_end_stream(&self) -> bool {
self.0.is_none()
}

fn size_hint(&self) -> SizeHint {
SizeHint::with_exact(0)
}
}

let listener = TkTcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))
.await
.unwrap();
let addr = listener.local_addr().unwrap();

// Spawn an HTTP2 server that responds before reading the whole request body.
// It's normal case to decline the request due to headers or size of the body.
tokio::spawn(async move {
let sock = listener.accept().await.unwrap().0;
hyper::server::conn::Http::new()
.http2_only(true)
.serve_connection(
sock,
service_fn(|_req| async move {
let mut trailers = HeaderMap::new();
trailers.insert("grpc", HeaderValue::from_static("0"));
let body = TrailersBody(Some(trailers));
Ok::<_, hyper::Error>(http::Response::new(body))
}),
)
.await
.expect("serve_connection");
});

let io = tcp_connect(&addr).await.expect("tcp connect");
let (mut client, conn) = conn::Builder::new()
.http2_only(true)
.handshake::<_, Body>(io)
.await
.expect("http handshake");

tokio::spawn(async move {
conn.await.expect("client conn shouldn't error");
});

// Use a channel to keep request stream open
let (tx, body) = hyper::Body::channel();
let req = Request::post("/a").body(body).unwrap();
let resp = client.send_request(req).await.expect("send_request");
assert!(resp.status().is_success());

let mut body = resp.into_body();

let data = body.data().await;
assert!(data.is_none());

let trailers = body
.trailers()
.await
.expect("get response trailers with no error")
.expect("trailers HeaderMaps is present");

assert_eq!(body.as_ref(), b"No bread for you!");
assert_eq!(trailers.len(), 1);
assert_eq!(trailers.get("grpc").unwrap(), "0");
drop(tx);
}

#[tokio::test]
Expand Down