diff --git a/src/body/body.rs b/src/body/body.rs index 7df87404f6..866fc95657 100644 --- a/src/body/body.rs +++ b/src/body/body.rs @@ -389,7 +389,10 @@ impl HttpBody for Body { ping.record_non_data(); Poll::Ready(Ok(t)) } - Err(e) => Poll::Ready(Err(crate::Error::new_h2(e))), + Err(e) => match e.reason() { + Some(h2::Reason::NO_ERROR) | Some(h2::Reason::CANCEL) => Poll::Ready(Ok(None)), + reason => Poll::Ready(Err(crate::Error::new_h2(e))), + }, }, Kind::Chan { ref mut trailers_rx, diff --git a/tests/client.rs b/tests/client.rs index a5fc79da8c..9761ed1bee 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -3213,9 +3213,10 @@ mod conn { } #[tokio::test] - async fn http2_responds_before_consuming_request_body() { + async fn http2_responds_before_consuming_request_body_no_trailers() { // Test that a early-response from server works correctly (request body wasn't fully consumed). // https://github.com/hyperium/hyper/issues/2872 + use hyper::body::HttpBody; use hyper::service::service_fn; let _ = pretty_env_logger::try_init(); @@ -3260,11 +3261,118 @@ mod conn { let resp = client.send_request(req).await.expect("send_request"); assert!(resp.status().is_success()); - let body = concat(resp.into_body()) + let mut body = resp.into_body(); + + let data = body + .data() .await + .expect("response has body") .expect("get response body with no error"); + assert_eq!(data.as_ref(), b"No bread for you!"); + + let trailers = body + .trailers() + .await + .expect("get response trailers with no error"); + assert!(trailers.is_none()); + } + + #[tokio::test] + async fn http2_responds_before_consuming_request_body_with_trailers() { + // Test that a early-response from server works correctly (request body wasn't fully consumed). + // https://github.com/hyperium/hyper/issues/2872 + use hyper::body::{HttpBody, SizeHint}; + use hyper::header::{HeaderMap, HeaderValue}; + use hyper::service::service_fn; + + let _ = pretty_env_logger::try_init(); + + /// An `HttpBody` implementation whose `is_end_stream()` will + /// return `true` after sending trailers. + pub struct TrailersBody(Option); + + impl HttpBody for TrailersBody { + type Data = bytes::Bytes; + type Error = hyper::Error; + + fn poll_data( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll>> { + Poll::Ready(None) + } + + fn poll_trailers( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll, Self::Error>> { + Poll::Ready(Ok(self.0.take())) + } + + fn is_end_stream(&self) -> bool { + self.0.is_none() + } + + fn size_hint(&self) -> SizeHint { + SizeHint::with_exact(0) + } + } + + let listener = TkTcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0))) + .await + .unwrap(); + let addr = listener.local_addr().unwrap(); + + // Spawn an HTTP2 server that responds before reading the whole request body. + // It's normal case to decline the request due to headers or size of the body. + tokio::spawn(async move { + let sock = listener.accept().await.unwrap().0; + hyper::server::conn::Http::new() + .http2_only(true) + .serve_connection( + sock, + service_fn(|_req| async move { + let mut trailers = HeaderMap::new(); + trailers.insert("grpc", HeaderValue::from_static("0")); + let body = TrailersBody(Some(trailers)); + Ok::<_, hyper::Error>(http::Response::new(body)) + }), + ) + .await + .expect("serve_connection"); + }); + + let io = tcp_connect(&addr).await.expect("tcp connect"); + let (mut client, conn) = conn::Builder::new() + .http2_only(true) + .handshake::<_, Body>(io) + .await + .expect("http handshake"); + + tokio::spawn(async move { + conn.await.expect("client conn shouldn't error"); + }); + + // Use a channel to keep request stream open + let (tx, body) = hyper::Body::channel(); + let req = Request::post("/a").body(body).unwrap(); + let resp = client.send_request(req).await.expect("send_request"); + assert!(resp.status().is_success()); + + let mut body = resp.into_body(); + + let data = body.data().await; + assert!(data.is_none()); + + let trailers = body + .trailers() + .await + .expect("get response trailers with no error") + .expect("trailers HeaderMaps is present"); - assert_eq!(body.as_ref(), b"No bread for you!"); + assert_eq!(trailers.len(), 1); + assert_eq!(trailers.get("grpc").unwrap(), "0"); + drop(tx); } #[tokio::test]