From c7877ac4213b2f859831366f5a35b353e0dc9f66 Mon Sep 17 00:00:00 2001 From: Damien Neil Date: Mon, 18 Mar 2024 13:24:51 -0700 Subject: [PATCH] http2: convert the remaining clientTester tests to testClientConn Change-Id: Ia7f213346baff48504fef6dfdc112575a5459f35 Reviewed-on: https://go-review.googlesource.com/c/net/+/572380 Reviewed-by: Jonathan Amsterdam LUCI-TryBot-Result: Go LUCI --- http2/clientconn_test.go | 74 ++ http2/transport_test.go | 1651 +++++++++++--------------------------- 2 files changed, 535 insertions(+), 1190 deletions(-) diff --git a/http2/clientconn_test.go b/http2/clientconn_test.go index 73ceefd7b..4237b1436 100644 --- a/http2/clientconn_test.go +++ b/http2/clientconn_test.go @@ -14,6 +14,7 @@ import ( "net" "net/http" "reflect" + "slices" "testing" "time" @@ -209,6 +210,71 @@ func (tc *testClientConn) wantFrameType(want FrameType) { } } +// wantUnorderedFrames reads frames from the conn until every condition in want has been satisfied. +// +// want is a list of func(*SomeFrame) bool. +// wantUnorderedFrames will call each func with frames of the appropriate type +// until the func returns true. +// It calls t.Fatal if an unexpected frame is received (no func has that frame type, +// or all funcs with that type have returned true), or if the conn runs out of frames +// with unsatisfied funcs. +// +// Example: +// +// // Read a SETTINGS frame, and any number of DATA frames for a stream. +// // The SETTINGS frame may appear anywhere in the sequence. +// // The last DATA frame must indicate the end of the stream. +// tc.wantUnorderedFrames( +// func(f *SettingsFrame) bool { +// return true +// }, +// func(f *DataFrame) bool { +// return f.StreamEnded() +// }, +// ) +func (tc *testClientConn) wantUnorderedFrames(want ...any) { + tc.t.Helper() + want = slices.Clone(want) + seen := 0 +frame: + for seen < len(want) && !tc.t.Failed() { + fr := tc.readFrame() + if fr == nil { + break + } + for i, f := range want { + if f == nil { + continue + } + typ := reflect.TypeOf(f) + if typ.Kind() != reflect.Func || + typ.NumIn() != 1 || + typ.NumOut() != 1 || + typ.Out(0) != reflect.TypeOf(true) { + tc.t.Fatalf("expected func(*SomeFrame) bool, got %T", f) + } + if typ.In(0) == reflect.TypeOf(fr) { + out := reflect.ValueOf(f).Call([]reflect.Value{reflect.ValueOf(fr)}) + if out[0].Bool() { + want[i] = nil + seen++ + } + continue frame + } + } + tc.t.Errorf("got unexpected frame type %T", fr) + } + if seen < len(want) { + for _, f := range want { + if f == nil { + continue + } + tc.t.Errorf("did not see expected frame: %v", reflect.TypeOf(f).In(0)) + } + tc.t.Fatalf("did not see %v expected frame types", len(want)-seen) + } +} + type wantHeader struct { streamID uint32 endStream bool @@ -401,6 +467,14 @@ func (tc *testClientConn) writeData(streamID uint32, endStream bool, data []byte tc.sync() } +func (tc *testClientConn) writeDataPadded(streamID uint32, endStream bool, data, pad []byte) { + tc.t.Helper() + if err := tc.fr.WriteDataPadded(streamID, endStream, data, pad); err != nil { + tc.t.Fatal(err) + } + tc.sync() +} + // makeHeaderBlockFragment encodes headers in a form suitable for inclusion // in a HEADERS or CONTINUATION frame. // diff --git a/http2/transport_test.go b/http2/transport_test.go index 18d4db3ed..855c107ef 100644 --- a/http2/transport_test.go +++ b/http2/transport_test.go @@ -2724,122 +2724,75 @@ func testTransportUsesGoAwayDebugError(t *testing.T, failMidBody bool) { } func testTransportReturnsUnusedFlowControl(t *testing.T, oneDataFrame bool) { - ct := newClientTester(t) - - ct.client = func() error { - req, _ := http.NewRequest("GET", "https://dummy.tld/", nil) - res, err := ct.tr.RoundTrip(req) - if err != nil { - return err - } + tc := newTestClientConn(t) + tc.greet() - if n, err := res.Body.Read(make([]byte, 1)); err != nil || n != 1 { - return fmt.Errorf("body read = %v, %v; want 1, nil", n, err) - } - res.Body.Close() // leaving 4999 bytes unread + req, _ := http.NewRequest("GET", "https://dummy.tld/", nil) + rt := tc.roundTrip(req) - return nil + tc.wantFrameType(FrameHeaders) + tc.writeHeaders(HeadersFrameParam{ + StreamID: rt.streamID(), + EndHeaders: true, + EndStream: false, + BlockFragment: tc.makeHeaderBlockFragment( + ":status", "200", + "content-length", "5000", + ), + }) + initialInflow := tc.inflowWindow(0) + + // Two cases: + // - Send one DATA frame with 5000 bytes. + // - Send two DATA frames with 1 and 4999 bytes each. + // + // In both cases, the client should consume one byte of data, + // refund that byte, then refund the following 4999 bytes. + // + // In the second case, the server waits for the client to reset the + // stream before sending the second DATA frame. This tests the case + // where the client receives a DATA frame after it has reset the stream. + const streamNotEnded = false + if oneDataFrame { + tc.writeData(rt.streamID(), streamNotEnded, make([]byte, 5000)) + } else { + tc.writeData(rt.streamID(), streamNotEnded, make([]byte, 1)) } - ct.server = func() error { - ct.greet() - - var hf *HeadersFrame - for { - f, err := ct.fr.ReadFrame() - if err != nil { - return fmt.Errorf("ReadFrame while waiting for Headers: %v", err) - } - switch f.(type) { - case *WindowUpdateFrame, *SettingsFrame: - continue - } - var ok bool - hf, ok = f.(*HeadersFrame) - if !ok { - return fmt.Errorf("Got %T; want HeadersFrame", f) - } - break - } - var buf bytes.Buffer - enc := hpack.NewEncoder(&buf) - enc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"}) - enc.WriteField(hpack.HeaderField{Name: "content-length", Value: "5000"}) - ct.fr.WriteHeaders(HeadersFrameParam{ - StreamID: hf.StreamID, - EndHeaders: true, - EndStream: false, - BlockFragment: buf.Bytes(), - }) - initialInflow := ct.inflowWindow(0) - - // Two cases: - // - Send one DATA frame with 5000 bytes. - // - Send two DATA frames with 1 and 4999 bytes each. - // - // In both cases, the client should consume one byte of data, - // refund that byte, then refund the following 4999 bytes. - // - // In the second case, the server waits for the client to reset the - // stream before sending the second DATA frame. This tests the case - // where the client receives a DATA frame after it has reset the stream. - if oneDataFrame { - ct.fr.WriteData(hf.StreamID, false /* don't end stream */, make([]byte, 5000)) - } else { - ct.fr.WriteData(hf.StreamID, false /* don't end stream */, make([]byte, 1)) - } + res := rt.response() + if n, err := res.Body.Read(make([]byte, 1)); err != nil || n != 1 { + t.Fatalf("body read = %v, %v; want 1, nil", n, err) + } + res.Body.Close() // leaving 4999 bytes unread + tc.sync() - wantRST := true - wantWUF := true - if !oneDataFrame { - wantWUF = false // flow control update is small, and will not be sent - } - for wantRST || wantWUF { - f, err := ct.readNonSettingsFrame() - if err != nil { - return err + sentAdditionalData := false + tc.wantUnorderedFrames( + func(f *RSTStreamFrame) bool { + if f.ErrCode != ErrCodeCancel { + t.Fatalf("Expected a RSTStreamFrame with code cancel; got %v", summarizeFrame(f)) } - switch f := f.(type) { - case *RSTStreamFrame: - if !wantRST { - return fmt.Errorf("Unexpected frame: %v", summarizeFrame(f)) - } - if f.ErrCode != ErrCodeCancel { - return fmt.Errorf("Expected a RSTStreamFrame with code cancel; got %v", summarizeFrame(f)) - } - wantRST = false - case *WindowUpdateFrame: - if !wantWUF { - return fmt.Errorf("Unexpected frame: %v", summarizeFrame(f)) - } - if f.Increment != 5000 { - return fmt.Errorf("Expected WindowUpdateFrames for 5000 bytes; got %v", summarizeFrame(f)) - } - wantWUF = false - default: - return fmt.Errorf("Unexpected frame: %v", summarizeFrame(f)) + if !oneDataFrame { + // Send the remaining data now. + tc.writeData(rt.streamID(), streamNotEnded, make([]byte, 4999)) + sentAdditionalData = true } - } - if !oneDataFrame { - ct.fr.WriteData(hf.StreamID, false /* don't end stream */, make([]byte, 4999)) - f, err := ct.readNonSettingsFrame() - if err != nil { - return err + return true + }, + func(f *WindowUpdateFrame) bool { + if !oneDataFrame && !sentAdditionalData { + t.Fatalf("Got WindowUpdateFrame, don't expect one yet") } - wuf, ok := f.(*WindowUpdateFrame) - if !ok || wuf.Increment != 5000 { - return fmt.Errorf("want WindowUpdateFrame for 5000 bytes; got %v", summarizeFrame(f)) + if f.Increment != 5000 { + t.Fatalf("Expected WindowUpdateFrames for 5000 bytes; got %v", summarizeFrame(f)) } - } - if err := ct.writeReadPing(); err != nil { - return err - } - if got, want := ct.inflowWindow(0), initialInflow; got != want { - return fmt.Errorf("connection flow tokens = %v, want %v", got, want) - } - return nil + return true + }, + ) + + if got, want := tc.inflowWindow(0), initialInflow; got != want { + t.Fatalf("connection flow tokens = %v, want %v", got, want) } - ct.run() } // See golang.org/issue/16481 @@ -2855,199 +2808,124 @@ func TestTransportReturnsUnusedFlowControlMultipleWrites(t *testing.T) { // Issue 16612: adjust flow control on open streams when transport // receives SETTINGS with INITIAL_WINDOW_SIZE from server. func TestTransportAdjustsFlowControl(t *testing.T) { - ct := newClientTester(t) - clientDone := make(chan struct{}) - const bodySize = 1 << 20 - ct.client = func() error { - defer ct.cc.(*net.TCPConn).CloseWrite() - if runtime.GOOS == "plan9" { - // CloseWrite not supported on Plan 9; Issue 17906 - defer ct.cc.(*net.TCPConn).Close() - } - defer close(clientDone) + tc := newTestClientConn(t) + tc.wantFrameType(FrameSettings) + tc.wantFrameType(FrameWindowUpdate) + // Don't write our SETTINGS yet. - req, _ := http.NewRequest("POST", "https://dummy.tld/", struct{ io.Reader }{io.LimitReader(neverEnding('A'), bodySize)}) - res, err := ct.tr.RoundTrip(req) - if err != nil { - return err + body := tc.newRequestBody() + body.writeBytes(bodySize) + body.closeWithError(io.EOF) + + req, _ := http.NewRequest("POST", "https://dummy.tld/", body) + rt := tc.roundTrip(req) + + tc.wantFrameType(FrameHeaders) + + gotBytes := int64(0) + for { + f := testClientConnReadFrame[*DataFrame](tc) + gotBytes += int64(len(f.Data())) + // After we've got half the client's initial flow control window's worth + // of request body data, give it just enough flow control to finish. + if gotBytes >= initialWindowSize/2 { + break } - res.Body.Close() - return nil } - ct.server = func() error { - _, err := io.ReadFull(ct.sc, make([]byte, len(ClientPreface))) - if err != nil { - return fmt.Errorf("reading client preface: %v", err) - } - var gotBytes int64 - var sentSettings bool - for { - f, err := ct.fr.ReadFrame() - if err != nil { - select { - case <-clientDone: - return nil - default: - return fmt.Errorf("ReadFrame while waiting for Headers: %v", err) - } - } - switch f := f.(type) { - case *DataFrame: - gotBytes += int64(len(f.Data())) - // After we've got half the client's - // initial flow control window's worth - // of request body data, give it just - // enough flow control to finish. - if gotBytes >= initialWindowSize/2 && !sentSettings { - sentSettings = true - - ct.fr.WriteSettings(Setting{ID: SettingInitialWindowSize, Val: bodySize}) - ct.fr.WriteWindowUpdate(0, bodySize) - ct.fr.WriteSettingsAck() - } + tc.writeSettings(Setting{ID: SettingInitialWindowSize, Val: bodySize}) + tc.writeWindowUpdate(0, bodySize) + tc.writeSettingsAck() - if f.StreamEnded() { - var buf bytes.Buffer - enc := hpack.NewEncoder(&buf) - enc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"}) - ct.fr.WriteHeaders(HeadersFrameParam{ - StreamID: f.StreamID, - EndHeaders: true, - EndStream: true, - BlockFragment: buf.Bytes(), - }) - } - } - } + tc.wantUnorderedFrames( + func(f *SettingsFrame) bool { return true }, + func(f *DataFrame) bool { + gotBytes += int64(len(f.Data())) + return f.StreamEnded() + }, + ) + + if gotBytes != bodySize { + t.Fatalf("server received %v bytes of body, want %v", gotBytes, bodySize) } - ct.run() + + tc.writeHeaders(HeadersFrameParam{ + StreamID: rt.streamID(), + EndHeaders: true, + EndStream: true, + BlockFragment: tc.makeHeaderBlockFragment( + ":status", "200", + ), + }) + rt.wantStatus(200) } // See golang.org/issue/16556 func TestTransportReturnsDataPaddingFlowControl(t *testing.T) { - ct := newClientTester(t) + tc := newTestClientConn(t) + tc.greet() - unblockClient := make(chan bool, 1) + req, _ := http.NewRequest("GET", "https://dummy.tld/", nil) + rt := tc.roundTrip(req) - ct.client = func() error { - req, _ := http.NewRequest("GET", "https://dummy.tld/", nil) - res, err := ct.tr.RoundTrip(req) - if err != nil { - return err - } - defer res.Body.Close() - <-unblockClient - return nil - } - ct.server = func() error { - ct.greet() + tc.wantFrameType(FrameHeaders) + tc.writeHeaders(HeadersFrameParam{ + StreamID: rt.streamID(), + EndHeaders: true, + EndStream: false, + BlockFragment: tc.makeHeaderBlockFragment( + ":status", "200", + "content-length", "5000", + ), + }) - var hf *HeadersFrame - for { - f, err := ct.fr.ReadFrame() - if err != nil { - return fmt.Errorf("ReadFrame while waiting for Headers: %v", err) - } - switch f.(type) { - case *WindowUpdateFrame, *SettingsFrame: - continue - } - var ok bool - hf, ok = f.(*HeadersFrame) - if !ok { - return fmt.Errorf("Got %T; want HeadersFrame", f) - } - break - } + initialConnWindow := tc.inflowWindow(0) + initialStreamWindow := tc.inflowWindow(rt.streamID()) - initialConnWindow := ct.inflowWindow(0) + pad := make([]byte, 5) + tc.writeDataPadded(rt.streamID(), false, make([]byte, 5000), pad) - var buf bytes.Buffer - enc := hpack.NewEncoder(&buf) - enc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"}) - enc.WriteField(hpack.HeaderField{Name: "content-length", Value: "5000"}) - ct.fr.WriteHeaders(HeadersFrameParam{ - StreamID: hf.StreamID, - EndHeaders: true, - EndStream: false, - BlockFragment: buf.Bytes(), - }) - initialStreamWindow := ct.inflowWindow(hf.StreamID) - pad := make([]byte, 5) - ct.fr.WriteDataPadded(hf.StreamID, false, make([]byte, 5000), pad) // without ending stream - if err := ct.writeReadPing(); err != nil { - return err - } - // Padding flow control should have been returned. - if got, want := ct.inflowWindow(0), initialConnWindow-5000; got != want { - t.Errorf("conn inflow window = %v, want %v", got, want) - } - if got, want := ct.inflowWindow(hf.StreamID), initialStreamWindow-5000; got != want { - t.Errorf("stream inflow window = %v, want %v", got, want) - } - unblockClient <- true - return nil + // Padding flow control should have been returned. + if got, want := tc.inflowWindow(0), initialConnWindow-5000; got != want { + t.Errorf("conn inflow window = %v, want %v", got, want) + } + if got, want := tc.inflowWindow(rt.streamID()), initialStreamWindow-5000; got != want { + t.Errorf("stream inflow window = %v, want %v", got, want) } - ct.run() } // golang.org/issue/16572 -- RoundTrip shouldn't hang when it gets a // StreamError as a result of the response HEADERS func TestTransportReturnsErrorOnBadResponseHeaders(t *testing.T) { - ct := newClientTester(t) + tc := newTestClientConn(t) + tc.greet() - ct.client = func() error { - req, _ := http.NewRequest("GET", "https://dummy.tld/", nil) - res, err := ct.tr.RoundTrip(req) - if err == nil { - res.Body.Close() - return errors.New("unexpected successful GET") - } - want := StreamError{1, ErrCodeProtocol, headerFieldNameError(" content-type")} - if !reflect.DeepEqual(want, err) { - t.Errorf("RoundTrip error = %#v; want %#v", err, want) - } - return nil - } - ct.server = func() error { - ct.greet() + req, _ := http.NewRequest("GET", "https://dummy.tld/", nil) + rt := tc.roundTrip(req) - hf, err := ct.firstHeaders() - if err != nil { - return err - } - - var buf bytes.Buffer - enc := hpack.NewEncoder(&buf) - enc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"}) - enc.WriteField(hpack.HeaderField{Name: " content-type", Value: "bogus"}) // bogus spaces - ct.fr.WriteHeaders(HeadersFrameParam{ - StreamID: hf.StreamID, - EndHeaders: true, - EndStream: false, - BlockFragment: buf.Bytes(), - }) + tc.wantFrameType(FrameHeaders) + tc.writeHeaders(HeadersFrameParam{ + StreamID: rt.streamID(), + EndHeaders: true, + EndStream: false, + BlockFragment: tc.makeHeaderBlockFragment( + ":status", "200", + " content-type", "bogus", + ), + }) - for { - fr, err := ct.readFrame() - if err != nil { - return fmt.Errorf("error waiting for RST_STREAM from client: %v", err) - } - if _, ok := fr.(*SettingsFrame); ok { - continue - } - if rst, ok := fr.(*RSTStreamFrame); !ok || rst.StreamID != 1 || rst.ErrCode != ErrCodeProtocol { - t.Errorf("Frame = %v; want RST_STREAM for stream 1 with ErrCodeProtocol", summarizeFrame(fr)) - } - break - } + err := rt.err() + want := StreamError{1, ErrCodeProtocol, headerFieldNameError(" content-type")} + if !reflect.DeepEqual(err, want) { + t.Fatalf("RoundTrip error = %#v; want %#v", err, want) + } - return nil + fr := testClientConnReadFrame[*RSTStreamFrame](tc) + if fr.StreamID != 1 || fr.ErrCode != ErrCodeProtocol { + t.Errorf("Frame = %v; want RST_STREAM for stream 1 with ErrCodeProtocol", summarizeFrame(fr)) } - ct.run() } // byteAndEOFReader returns is in an io.Reader which reads one byte @@ -3461,261 +3339,84 @@ func TestTransportPingWhenReadingPingDisabled(t *testing.T) { } } -func testTransportPingWhenReading(t *testing.T, readIdleTimeout, deadline time.Duration, expectedPingCount int) { - var pingCount int - ct := newClientTester(t) - ct.tr.ReadIdleTimeout = readIdleTimeout - - ctx, cancel := context.WithTimeout(context.Background(), deadline) - defer cancel() - ct.client = func() error { - defer ct.cc.(*net.TCPConn).CloseWrite() - if runtime.GOOS == "plan9" { - // CloseWrite not supported on Plan 9; Issue 17906 - defer ct.cc.(*net.TCPConn).Close() - } - req, _ := http.NewRequestWithContext(ctx, "GET", "https://dummy.tld/", nil) - res, err := ct.tr.RoundTrip(req) - if err != nil { - return fmt.Errorf("RoundTrip: %v", err) - } - defer res.Body.Close() - if res.StatusCode != 200 { - return fmt.Errorf("status code = %v; want %v", res.StatusCode, 200) - } - _, err = ioutil.ReadAll(res.Body) - if expectedPingCount == 0 && errors.Is(ctx.Err(), context.DeadlineExceeded) { - return nil - } +func TestTransportRetryAfterGOAWAY(t *testing.T) { + tt := newTestTransport(t) - cancel() - return err - } + req, _ := http.NewRequest("GET", "https://dummy.tld/", nil) + rt := tt.roundTrip(req) - ct.server = func() error { - ct.greet() - var buf bytes.Buffer - enc := hpack.NewEncoder(&buf) - var streamID uint32 - for { - f, err := ct.fr.ReadFrame() - if err != nil { - select { - case <-ctx.Done(): - // If the client's done, it - // will have reported any - // errors on its side. - return nil - default: - return err - } - } - switch f := f.(type) { - case *WindowUpdateFrame, *SettingsFrame: - case *HeadersFrame: - if !f.HeadersEnded() { - return fmt.Errorf("headers should have END_HEADERS be ended: %v", f) - } - enc.WriteField(hpack.HeaderField{Name: ":status", Value: strconv.Itoa(200)}) - ct.fr.WriteHeaders(HeadersFrameParam{ - StreamID: f.StreamID, - EndHeaders: true, - EndStream: false, - BlockFragment: buf.Bytes(), - }) - streamID = f.StreamID - case *PingFrame: - pingCount++ - if pingCount == expectedPingCount { - if err := ct.fr.WriteData(streamID, true, []byte("hello, this is last server data frame")); err != nil { - return err - } - } - if err := ct.fr.WritePing(true, f.Data); err != nil { - return err - } - case *RSTStreamFrame: - default: - return fmt.Errorf("Unexpected client frame %v", f) - } - } + // First attempt: Server sends a GOAWAY. + tc := tt.getConn() + tc.wantFrameType(FrameSettings) + tc.wantFrameType(FrameWindowUpdate) + tc.wantHeaders(wantHeader{ + streamID: 1, + endStream: true, + }) + tc.writeSettings() + tc.writeGoAway(0 /*max id*/, ErrCodeNo, nil) + if rt.done() { + t.Fatalf("after GOAWAY, RoundTrip is done; want it to be retrying") } - ct.run() -} - -func testClientMultipleDials(t *testing.T, client func(*Transport), server func(int, *clientTester)) { - ln := newLocalListener(t) - defer ln.Close() - var ( - mu sync.Mutex - count int - conns []net.Conn - ) - var wg sync.WaitGroup - tr := &Transport{ - TLSClientConfig: tlsConfigInsecure, - } - tr.DialTLS = func(network, addr string, cfg *tls.Config) (net.Conn, error) { - mu.Lock() - defer mu.Unlock() - count++ - cc, err := net.Dial("tcp", ln.Addr().String()) - if err != nil { - return nil, fmt.Errorf("dial error: %v", err) - } - conns = append(conns, cc) - sc, err := ln.Accept() - if err != nil { - return nil, fmt.Errorf("accept error: %v", err) - } - conns = append(conns, sc) - ct := &clientTester{ - t: t, - tr: tr, - cc: cc, - sc: sc, - fr: NewFramer(sc, sc), - } - wg.Add(1) - go func(count int) { - defer wg.Done() - server(count, ct) - }(count) - return cc, nil - } + // Second attempt succeeds on a new connection. + tc = tt.getConn() + tc.wantFrameType(FrameSettings) + tc.wantFrameType(FrameWindowUpdate) + tc.wantHeaders(wantHeader{ + streamID: 1, + endStream: true, + }) + tc.writeSettings() + tc.writeHeaders(HeadersFrameParam{ + StreamID: 1, + EndHeaders: true, + EndStream: true, + BlockFragment: tc.makeHeaderBlockFragment( + ":status", "200", + ), + }) - client(tr) - tr.CloseIdleConnections() - ln.Close() - for _, c := range conns { - c.Close() - } - wg.Wait() + rt.wantStatus(200) } -func TestTransportRetryAfterGOAWAY(t *testing.T) { - client := func(tr *Transport) { - req, _ := http.NewRequest("GET", "https://dummy.tld/", nil) - res, err := tr.RoundTrip(req) - if res != nil { - res.Body.Close() - if got := res.Header.Get("Foo"); got != "bar" { - err = fmt.Errorf("foo header = %q; want bar", got) - } - } - if err != nil { - t.Errorf("RoundTrip: %v", err) - } - } - - server := func(count int, ct *clientTester) { - switch count { - case 1: - ct.greet() - hf, err := ct.firstHeaders() - if err != nil { - t.Errorf("server1 failed reading HEADERS: %v", err) - return - } - t.Logf("server1 got %v", hf) - if err := ct.fr.WriteGoAway(0 /*max id*/, ErrCodeNo, nil); err != nil { - t.Errorf("server1 failed writing GOAWAY: %v", err) - return - } - case 2: - ct.greet() - hf, err := ct.firstHeaders() - if err != nil { - t.Errorf("server2 failed reading HEADERS: %v", err) - return - } - t.Logf("server2 got %v", hf) - - var buf bytes.Buffer - enc := hpack.NewEncoder(&buf) - enc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"}) - enc.WriteField(hpack.HeaderField{Name: "foo", Value: "bar"}) - err = ct.fr.WriteHeaders(HeadersFrameParam{ - StreamID: hf.StreamID, - EndHeaders: true, - EndStream: false, - BlockFragment: buf.Bytes(), - }) - if err != nil { - t.Errorf("server2 failed writing response HEADERS: %v", err) - } - default: - t.Errorf("unexpected number of dials") - return - } - } +func TestTransportRetryAfterRefusedStream(t *testing.T) { + tt := newTestTransport(t) - testClientMultipleDials(t, client, server) -} + req, _ := http.NewRequest("GET", "https://dummy.tld/", nil) + rt := tt.roundTrip(req) -func TestTransportRetryAfterRefusedStream(t *testing.T) { - clientDone := make(chan struct{}) - client := func(tr *Transport) { - defer close(clientDone) - req, _ := http.NewRequest("GET", "https://dummy.tld/", nil) - resp, err := tr.RoundTrip(req) - if err != nil { - t.Errorf("RoundTrip: %v", err) - return - } - resp.Body.Close() - if resp.StatusCode != 204 { - t.Errorf("Status = %v; want 204", resp.StatusCode) - return - } + // First attempt: Server sends a RST_STREAM. + tc := tt.getConn() + tc.wantFrameType(FrameSettings) + tc.wantFrameType(FrameWindowUpdate) + tc.wantHeaders(wantHeader{ + streamID: 1, + endStream: true, + }) + tc.writeSettings() + tc.wantFrameType(FrameSettings) // settings ACK + tc.writeRSTStream(1, ErrCodeRefusedStream) + if rt.done() { + t.Fatalf("after RST_STREAM, RoundTrip is done; want it to be retrying") } - server := func(_ int, ct *clientTester) { - ct.greet() - var buf bytes.Buffer - enc := hpack.NewEncoder(&buf) - var count int - for { - f, err := ct.fr.ReadFrame() - if err != nil { - select { - case <-clientDone: - // If the client's done, it - // will have reported any - // errors on its side. - default: - t.Error(err) - } - return - } - switch f := f.(type) { - case *WindowUpdateFrame, *SettingsFrame: - case *HeadersFrame: - if !f.HeadersEnded() { - t.Errorf("headers should have END_HEADERS be ended: %v", f) - return - } - count++ - if count == 1 { - ct.fr.WriteRSTStream(f.StreamID, ErrCodeRefusedStream) - } else { - enc.WriteField(hpack.HeaderField{Name: ":status", Value: "204"}) - ct.fr.WriteHeaders(HeadersFrameParam{ - StreamID: f.StreamID, - EndHeaders: true, - EndStream: true, - BlockFragment: buf.Bytes(), - }) - } - default: - t.Errorf("Unexpected client frame %v", f) - return - } - } - } + // Second attempt succeeds on the same connection. + tc.wantHeaders(wantHeader{ + streamID: 3, + endStream: true, + }) + tc.writeSettings() + tc.writeHeaders(HeadersFrameParam{ + StreamID: 3, + EndHeaders: true, + EndStream: true, + BlockFragment: tc.makeHeaderBlockFragment( + ":status", "204", + ), + }) - testClientMultipleDials(t, client, server) + rt.wantStatus(204) } func TestTransportRetryHasLimit(t *testing.T) { @@ -3765,67 +3466,34 @@ func TestTransportRetryHasLimit(t *testing.T) { } func TestTransportResponseDataBeforeHeaders(t *testing.T) { - // This test use not valid response format. - // Discarding logger output to not spam tests output. - log.SetOutput(ioutil.Discard) - defer log.SetOutput(os.Stderr) + // Discard log output complaining about protocol error. + log.SetOutput(io.Discard) + t.Cleanup(func() { log.SetOutput(os.Stderr) }) // after other cleanup is done - ct := newClientTester(t) - ct.client = func() error { - defer ct.cc.(*net.TCPConn).CloseWrite() - if runtime.GOOS == "plan9" { - // CloseWrite not supported on Plan 9; Issue 17906 - defer ct.cc.(*net.TCPConn).Close() - } - req := httptest.NewRequest("GET", "https://dummy.tld/", nil) - // First request is normal to ensure the check is per stream and not per connection. - _, err := ct.tr.RoundTrip(req) - if err != nil { - return fmt.Errorf("RoundTrip expected no error, got: %v", err) - } - // Second request returns a DATA frame with no HEADERS. - resp, err := ct.tr.RoundTrip(req) - if err == nil { - return fmt.Errorf("RoundTrip expected error, got response: %+v", resp) - } - if err, ok := err.(StreamError); !ok || err.Code != ErrCodeProtocol { - return fmt.Errorf("expected stream PROTOCOL_ERROR, got: %v", err) - } - return nil - } - ct.server = func() error { - ct.greet() - for { - f, err := ct.fr.ReadFrame() - if err == io.EOF { - return nil - } else if err != nil { - return err - } - switch f := f.(type) { - case *WindowUpdateFrame, *SettingsFrame, *RSTStreamFrame: - case *HeadersFrame: - switch f.StreamID { - case 1: - // Send a valid response to first request. - var buf bytes.Buffer - enc := hpack.NewEncoder(&buf) - enc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"}) - ct.fr.WriteHeaders(HeadersFrameParam{ - StreamID: f.StreamID, - EndHeaders: true, - EndStream: true, - BlockFragment: buf.Bytes(), - }) - case 3: - ct.fr.WriteData(f.StreamID, true, []byte("payload")) - } - default: - return fmt.Errorf("Unexpected client frame %v", f) - } - } + tc := newTestClientConn(t) + tc.greet() + + // First request is normal to ensure the check is per stream and not per connection. + req, _ := http.NewRequest("GET", "https://dummy.tld/", nil) + rt1 := tc.roundTrip(req) + tc.wantFrameType(FrameHeaders) + tc.writeHeaders(HeadersFrameParam{ + StreamID: rt1.streamID(), + EndHeaders: true, + EndStream: true, + BlockFragment: tc.makeHeaderBlockFragment( + ":status", "200", + ), + }) + rt1.wantStatus(200) + + // Second request returns a DATA frame with no HEADERS. + rt2 := tc.roundTrip(req) + tc.wantFrameType(FrameHeaders) + tc.writeData(rt2.streamID(), true, []byte("payload")) + if err, ok := rt2.err().(StreamError); !ok || err.Code != ErrCodeProtocol { + t.Fatalf("expected stream PROTOCOL_ERROR, got: %v", err) } - ct.run() } func TestTransportMaxFrameReadSize(t *testing.T) { @@ -3839,30 +3507,17 @@ func TestTransportMaxFrameReadSize(t *testing.T) { maxReadFrameSize: 1024, want: minMaxFrameSize, }} { - ct := newClientTester(t) - ct.tr.MaxReadFrameSize = test.maxReadFrameSize - ct.client = func() error { - req, _ := http.NewRequest("GET", "https://dummy.tld/", http.NoBody) - ct.tr.RoundTrip(req) - return nil - } - ct.server = func() error { - defer ct.cc.(*net.TCPConn).Close() - ct.greet() - var got uint32 - ct.settings.ForeachSetting(func(s Setting) error { - switch s.ID { - case SettingMaxFrameSize: - got = s.Val - } - return nil - }) - if got != test.want { - t.Errorf("Transport.MaxReadFrameSize = %v; server got %v, want %v", test.maxReadFrameSize, got, test.want) - } - return nil + tc := newTestClientConn(t, func(tr *Transport) { + tr.MaxReadFrameSize = test.maxReadFrameSize + }) + + fr := testClientConnReadFrame[*SettingsFrame](tc) + got, ok := fr.Value(SettingMaxFrameSize) + if !ok { + t.Errorf("Transport.MaxReadFrameSize = %v; server got no setting, want %v", test.maxReadFrameSize, test.want) + } else if got != test.want { + t.Errorf("Transport.MaxReadFrameSize = %v; server got %v, want %v", test.maxReadFrameSize, got, test.want) } - ct.run() } } @@ -3902,337 +3557,126 @@ func TestTransportRequestsLowServerLimit(t *testing.T) { t.Errorf("StatusCode = %v; want %v", got, want) } if res != nil && res.Body != nil { - res.Body.Close() - } - } - - if connCount != 1 { - t.Errorf("created %v connections for %v requests, want 1", connCount, reqCount) - } -} - -// tests Transport.StrictMaxConcurrentStreams -func TestTransportRequestsStallAtServerLimit(t *testing.T) { - const maxConcurrent = 2 - - greet := make(chan struct{}) // server sends initial SETTINGS frame - gotRequest := make(chan struct{}) // server received a request - clientDone := make(chan struct{}) - cancelClientRequest := make(chan struct{}) - - // Collect errors from goroutines. - var wg sync.WaitGroup - errs := make(chan error, 100) - defer func() { - wg.Wait() - close(errs) - for err := range errs { - t.Error(err) - } - }() - - // We will send maxConcurrent+2 requests. This checker goroutine waits for the - // following stages: - // 1. The first maxConcurrent requests are received by the server. - // 2. The client will cancel the next request - // 3. The server is unblocked so it can service the first maxConcurrent requests - // 4. The client will send the final request - wg.Add(1) - unblockClient := make(chan struct{}) - clientRequestCancelled := make(chan struct{}) - unblockServer := make(chan struct{}) - go func() { - defer wg.Done() - // Stage 1. - for k := 0; k < maxConcurrent; k++ { - <-gotRequest - } - // Stage 2. - close(unblockClient) - <-clientRequestCancelled - // Stage 3: give some time for the final RoundTrip call to be scheduled and - // verify that the final request is not sent. - time.Sleep(50 * time.Millisecond) - select { - case <-gotRequest: - errs <- errors.New("last request did not stall") - close(unblockServer) - return - default: - } - close(unblockServer) - // Stage 4. - <-gotRequest - }() - - ct := newClientTester(t) - ct.tr.StrictMaxConcurrentStreams = true - ct.client = func() error { - var wg sync.WaitGroup - defer func() { - wg.Wait() - close(clientDone) - ct.cc.(*net.TCPConn).CloseWrite() - if runtime.GOOS == "plan9" { - // CloseWrite not supported on Plan 9; Issue 17906 - ct.cc.(*net.TCPConn).Close() - } - }() - for k := 0; k < maxConcurrent+2; k++ { - wg.Add(1) - go func(k int) { - defer wg.Done() - // Don't send the second request until after receiving SETTINGS from the server - // to avoid a race where we use the default SettingMaxConcurrentStreams, which - // is much larger than maxConcurrent. We have to send the first request before - // waiting because the first request triggers the dial and greet. - if k > 0 { - <-greet - } - // Block until maxConcurrent requests are sent before sending any more. - if k >= maxConcurrent { - <-unblockClient - } - body := newStaticCloseChecker("") - req, _ := http.NewRequest("GET", fmt.Sprintf("https://dummy.tld/%d", k), body) - if k == maxConcurrent { - // This request will be canceled. - req.Cancel = cancelClientRequest - close(cancelClientRequest) - _, err := ct.tr.RoundTrip(req) - close(clientRequestCancelled) - if err == nil { - errs <- fmt.Errorf("RoundTrip(%d) should have failed due to cancel", k) - return - } - } else { - resp, err := ct.tr.RoundTrip(req) - if err != nil { - errs <- fmt.Errorf("RoundTrip(%d): %v", k, err) - return - } - ioutil.ReadAll(resp.Body) - resp.Body.Close() - if resp.StatusCode != 204 { - errs <- fmt.Errorf("Status = %v; want 204", resp.StatusCode) - return - } - } - if err := body.isClosed(); err != nil { - errs <- fmt.Errorf("RoundTrip(%d): %v", k, err) - } - }(k) + res.Body.Close() } - return nil } - ct.server = func() error { - var wg sync.WaitGroup - defer wg.Wait() + if connCount != 1 { + t.Errorf("created %v connections for %v requests, want 1", connCount, reqCount) + } +} - ct.greet(Setting{SettingMaxConcurrentStreams, maxConcurrent}) +// tests Transport.StrictMaxConcurrentStreams +func TestTransportRequestsStallAtServerLimit(t *testing.T) { + const maxConcurrent = 2 - // Server write loop. - var buf bytes.Buffer - enc := hpack.NewEncoder(&buf) - writeResp := make(chan uint32, maxConcurrent+1) + tc := newTestClientConn(t, func(tr *Transport) { + tr.StrictMaxConcurrentStreams = true + }) + tc.greet(Setting{SettingMaxConcurrentStreams, maxConcurrent}) - wg.Add(1) - go func() { - defer wg.Done() - <-unblockServer - for id := range writeResp { - buf.Reset() - enc.WriteField(hpack.HeaderField{Name: ":status", Value: "204"}) - ct.fr.WriteHeaders(HeadersFrameParam{ - StreamID: id, - EndHeaders: true, - EndStream: true, - BlockFragment: buf.Bytes(), - }) - } - }() + cancelClientRequest := make(chan struct{}) - // Server read loop. - var nreq int - for { - f, err := ct.fr.ReadFrame() - if err != nil { - select { - case <-clientDone: - // If the client's done, it will have reported any errors on its side. - return nil - default: - return err - } - } - switch f := f.(type) { - case *WindowUpdateFrame: - case *SettingsFrame: - // Wait for the client SETTINGS ack until ending the greet. - close(greet) - case *HeadersFrame: - if !f.HeadersEnded() { - return fmt.Errorf("headers should have END_HEADERS be ended: %v", f) - } - gotRequest <- struct{}{} - nreq++ - writeResp <- f.StreamID - if nreq == maxConcurrent+1 { - close(writeResp) - } - case *DataFrame: - default: - return fmt.Errorf("Unexpected client frame %v", f) + // Start maxConcurrent+2 requests. + // The server does not respond to any of them yet. + var rts []*testRoundTrip + for k := 0; k < maxConcurrent+2; k++ { + req, _ := http.NewRequest("GET", fmt.Sprintf("https://dummy.tld/%d", k), nil) + if k == maxConcurrent { + req.Cancel = cancelClientRequest + } + rt := tc.roundTrip(req) + rts = append(rts, rt) + + if k < maxConcurrent { + // We are under the stream limit, so the client sends the request. + tc.wantHeaders(wantHeader{ + streamID: rt.streamID(), + endStream: true, + header: http.Header{ + ":authority": []string{"dummy.tld"}, + ":method": []string{"GET"}, + ":path": []string{fmt.Sprintf("/%d", k)}, + }, + }) + } else { + // We have reached the stream limit, + // so the client cannot send the request. + if fr := tc.readFrame(); fr != nil { + t.Fatalf("after making new request while at stream limit, got unexpected frame: %v", fr) } } + + if rt.done() { + t.Fatalf("rt %v done", k) + } + } + + // Cancel the maxConcurrent'th request. + // The request should fail. + close(cancelClientRequest) + tc.sync() + if err := rts[maxConcurrent].err(); err == nil { + t.Fatalf("RoundTrip(%d) should have failed due to cancel, did not", maxConcurrent) + } + + // No requests should be complete, except for the canceled one. + for i, rt := range rts { + if i != maxConcurrent && rt.done() { + t.Fatalf("RoundTrip(%d) is done, but should not be", i) + } } - ct.run() + // Server responds to a request, unblocking the last one. + tc.writeHeaders(HeadersFrameParam{ + StreamID: rts[0].streamID(), + EndHeaders: true, + EndStream: true, + BlockFragment: tc.makeHeaderBlockFragment( + ":status", "200", + ), + }) + tc.wantHeaders(wantHeader{ + streamID: rts[maxConcurrent+1].streamID(), + endStream: true, + header: http.Header{ + ":authority": []string{"dummy.tld"}, + ":method": []string{"GET"}, + ":path": []string{fmt.Sprintf("/%d", maxConcurrent+1)}, + }, + }) + rts[0].wantStatus(200) } func TestTransportMaxDecoderHeaderTableSize(t *testing.T) { - ct := newClientTester(t) var reqSize, resSize uint32 = 8192, 16384 - ct.tr.MaxDecoderHeaderTableSize = reqSize - ct.client = func() error { - req, _ := http.NewRequest("GET", "https://dummy.tld/", nil) - cc, err := ct.tr.NewClientConn(ct.cc) - if err != nil { - return err - } - _, err = cc.RoundTrip(req) - if err != nil { - return err - } - if got, want := cc.peerMaxHeaderTableSize, resSize; got != want { - return fmt.Errorf("peerHeaderTableSize = %d, want %d", got, want) - } - return nil + tc := newTestClientConn(t, func(tr *Transport) { + tr.MaxDecoderHeaderTableSize = reqSize + }) + + fr := testClientConnReadFrame[*SettingsFrame](tc) + if v, ok := fr.Value(SettingHeaderTableSize); !ok { + t.Fatalf("missing SETTINGS_HEADER_TABLE_SIZE setting") + } else if v != reqSize { + t.Fatalf("received SETTINGS_HEADER_TABLE_SIZE = %d, want %d", v, reqSize) } - ct.server = func() error { - buf := make([]byte, len(ClientPreface)) - _, err := io.ReadFull(ct.sc, buf) - if err != nil { - return fmt.Errorf("reading client preface: %v", err) - } - f, err := ct.fr.ReadFrame() - if err != nil { - return err - } - sf, ok := f.(*SettingsFrame) - if !ok { - ct.t.Fatalf("wanted client settings frame; got %v", f) - _ = sf // stash it away? - } - var found bool - err = sf.ForeachSetting(func(s Setting) error { - if s.ID == SettingHeaderTableSize { - found = true - if got, want := s.Val, reqSize; got != want { - return fmt.Errorf("received SETTINGS_HEADER_TABLE_SIZE = %d, want %d", got, want) - } - } - return nil - }) - if err != nil { - return err - } - if !found { - return fmt.Errorf("missing SETTINGS_HEADER_TABLE_SIZE setting") - } - if err := ct.fr.WriteSettings(Setting{SettingHeaderTableSize, resSize}); err != nil { - ct.t.Fatal(err) - } - if err := ct.fr.WriteSettingsAck(); err != nil { - ct.t.Fatal(err) - } - for { - f, err := ct.fr.ReadFrame() - if err != nil { - return err - } - switch f := f.(type) { - case *HeadersFrame: - var buf bytes.Buffer - enc := hpack.NewEncoder(&buf) - enc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"}) - ct.fr.WriteHeaders(HeadersFrameParam{ - StreamID: f.StreamID, - EndHeaders: true, - EndStream: true, - BlockFragment: buf.Bytes(), - }) - return nil - } - } + tc.writeSettings(Setting{SettingHeaderTableSize, resSize}) + if got, want := tc.cc.peerMaxHeaderTableSize, resSize; got != want { + t.Fatalf("peerHeaderTableSize = %d, want %d", got, want) } - ct.run() } func TestTransportMaxEncoderHeaderTableSize(t *testing.T) { - ct := newClientTester(t) var peerAdvertisedMaxHeaderTableSize uint32 = 16384 - ct.tr.MaxEncoderHeaderTableSize = 8192 - ct.client = func() error { - req, _ := http.NewRequest("GET", "https://dummy.tld/", nil) - cc, err := ct.tr.NewClientConn(ct.cc) - if err != nil { - return err - } - _, err = cc.RoundTrip(req) - if err != nil { - return err - } - if got, want := cc.henc.MaxDynamicTableSize(), ct.tr.MaxEncoderHeaderTableSize; got != want { - return fmt.Errorf("henc.MaxDynamicTableSize() = %d, want %d", got, want) - } - return nil - } - ct.server = func() error { - buf := make([]byte, len(ClientPreface)) - _, err := io.ReadFull(ct.sc, buf) - if err != nil { - return fmt.Errorf("reading client preface: %v", err) - } - f, err := ct.fr.ReadFrame() - if err != nil { - return err - } - sf, ok := f.(*SettingsFrame) - if !ok { - ct.t.Fatalf("wanted client settings frame; got %v", f) - _ = sf // stash it away? - } - if err := ct.fr.WriteSettings(Setting{SettingHeaderTableSize, peerAdvertisedMaxHeaderTableSize}); err != nil { - ct.t.Fatal(err) - } - if err := ct.fr.WriteSettingsAck(); err != nil { - ct.t.Fatal(err) - } + tc := newTestClientConn(t, func(tr *Transport) { + tr.MaxEncoderHeaderTableSize = 8192 + }) + tc.greet(Setting{SettingHeaderTableSize, peerAdvertisedMaxHeaderTableSize}) - for { - f, err := ct.fr.ReadFrame() - if err != nil { - return err - } - switch f := f.(type) { - case *HeadersFrame: - var buf bytes.Buffer - enc := hpack.NewEncoder(&buf) - enc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"}) - ct.fr.WriteHeaders(HeadersFrameParam{ - StreamID: f.StreamID, - EndHeaders: true, - EndStream: true, - BlockFragment: buf.Bytes(), - }) - return nil - } - } + if got, want := tc.cc.henc.MaxDynamicTableSize(), tc.tr.MaxEncoderHeaderTableSize; got != want { + t.Fatalf("henc.MaxDynamicTableSize() = %d, want %d", got, want) } - ct.run() } func TestAuthorityAddr(t *testing.T) { @@ -4316,40 +3760,24 @@ func TestTransportAllocationsAfterResponseBodyClose(t *testing.T) { // Issue 18891: make sure Request.Body == NoBody means no DATA frame // is ever sent, even if empty. func TestTransportNoBodyMeansNoDATA(t *testing.T) { - ct := newClientTester(t) - - unblockClient := make(chan bool) + tc := newTestClientConn(t) + tc.greet() - ct.client = func() error { - req, _ := http.NewRequest("GET", "https://dummy.tld/", http.NoBody) - ct.tr.RoundTrip(req) - <-unblockClient - return nil - } - ct.server = func() error { - defer close(unblockClient) - defer ct.cc.(*net.TCPConn).Close() - ct.greet() + req, _ := http.NewRequest("GET", "https://dummy.tld/", http.NoBody) + rt := tc.roundTrip(req) - for { - f, err := ct.fr.ReadFrame() - if err != nil { - return fmt.Errorf("ReadFrame while waiting for Headers: %v", err) - } - switch f := f.(type) { - default: - return fmt.Errorf("Got %T; want HeadersFrame", f) - case *WindowUpdateFrame, *SettingsFrame: - continue - case *HeadersFrame: - if !f.StreamEnded() { - return fmt.Errorf("got headers frame without END_STREAM") - } - return nil - } - } + tc.wantHeaders(wantHeader{ + streamID: rt.streamID(), + endStream: true, // END_STREAM should be set when body is http.NoBody + header: http.Header{ + ":authority": []string{"dummy.tld"}, + ":method": []string{"GET"}, + ":path": []string{"/"}, + }, + }) + if fr := tc.readFrame(); fr != nil { + t.Fatalf("unexpected frame after headers: %v", fr) } - ct.run() } func benchSimpleRoundTrip(b *testing.B, nReqHeaders, nResHeader int) { @@ -4428,41 +3856,22 @@ func TestTransportResponseAndResetWithoutConsumingBodyRace(t *testing.T) { // Verify transport doesn't crash when receiving bogus response lacking a :status header. // Issue 22880. func TestTransportHandlesInvalidStatuslessResponse(t *testing.T) { - ct := newClientTester(t) - ct.client = func() error { - req, _ := http.NewRequest("GET", "https://dummy.tld/", nil) - _, err := ct.tr.RoundTrip(req) - const substr = "malformed response from server: missing status pseudo header" - if !strings.Contains(fmt.Sprint(err), substr) { - return fmt.Errorf("RoundTrip error = %v; want substring %q", err, substr) - } - return nil - } - ct.server = func() error { - ct.greet() - var buf bytes.Buffer - enc := hpack.NewEncoder(&buf) + tc := newTestClientConn(t) + tc.greet() - for { - f, err := ct.fr.ReadFrame() - if err != nil { - return err - } - switch f := f.(type) { - case *HeadersFrame: - enc.WriteField(hpack.HeaderField{Name: "content-type", Value: "text/html"}) // no :status header - ct.fr.WriteHeaders(HeadersFrameParam{ - StreamID: f.StreamID, - EndHeaders: true, - EndStream: false, // we'll send some DATA to try to crash the transport - BlockFragment: buf.Bytes(), - }) - ct.fr.WriteData(f.StreamID, true, []byte("payload")) - return nil - } - } - } - ct.run() + req, _ := http.NewRequest("GET", "https://dummy.tld/", nil) + rt := tc.roundTrip(req) + + tc.wantFrameType(FrameHeaders) + tc.writeHeaders(HeadersFrameParam{ + StreamID: rt.streamID(), + EndHeaders: true, + EndStream: false, // we'll send some DATA to try to crash the transport + BlockFragment: tc.makeHeaderBlockFragment( + "content-type", "text/html", // no :status header + ), + }) + tc.writeData(rt.streamID(), true, []byte("payload")) } func BenchmarkClientRequestHeaders(b *testing.B) { @@ -4810,95 +4219,42 @@ func (r *errReader) Read(p []byte) (int, error) { } func testTransportBodyReadError(t *testing.T, body []byte) { - if runtime.GOOS == "windows" || runtime.GOOS == "plan9" { - // So far we've only seen this be flaky on Windows and Plan 9, - // perhaps due to TCP behavior on shutdowns while - // unread data is in flight. This test should be - // fixed, but a skip is better than annoying people - // for now. - t.Skipf("skipping flaky test on %s; https://golang.org/issue/31260", runtime.GOOS) - } - clientDone := make(chan struct{}) - ct := newClientTester(t) - ct.client = func() error { - defer ct.cc.(*net.TCPConn).CloseWrite() - if runtime.GOOS == "plan9" { - // CloseWrite not supported on Plan 9; Issue 17906 - defer ct.cc.(*net.TCPConn).Close() - } - defer close(clientDone) - - checkNoStreams := func() error { - cp, ok := ct.tr.connPool().(*clientConnPool) - if !ok { - return fmt.Errorf("conn pool is %T; want *clientConnPool", ct.tr.connPool()) - } - cp.mu.Lock() - defer cp.mu.Unlock() - conns, ok := cp.conns["dummy.tld:443"] - if !ok { - return fmt.Errorf("missing connection") - } - if len(conns) != 1 { - return fmt.Errorf("conn pool size: %v; expect 1", len(conns)) - } - if activeStreams(conns[0]) != 0 { - return fmt.Errorf("active streams count: %v; want 0", activeStreams(conns[0])) - } - return nil - } - bodyReadError := errors.New("body read error") - body := &errReader{body, bodyReadError} - req, err := http.NewRequest("PUT", "https://dummy.tld/", body) - if err != nil { - return err - } - _, err = ct.tr.RoundTrip(req) - if err != bodyReadError { - return fmt.Errorf("err = %v; want %v", err, bodyReadError) - } - if err = checkNoStreams(); err != nil { - return err + tc := newTestClientConn(t) + tc.greet() + + bodyReadError := errors.New("body read error") + b := tc.newRequestBody() + b.Write(body) + b.closeWithError(bodyReadError) + req, _ := http.NewRequest("PUT", "https://dummy.tld/", b) + rt := tc.roundTrip(req) + + tc.wantFrameType(FrameHeaders) + var receivedBody []byte +readFrames: + for { + switch f := tc.readFrame().(type) { + case *DataFrame: + receivedBody = append(receivedBody, f.Data()...) + case *RSTStreamFrame: + break readFrames + default: + t.Fatalf("unexpected frame: %v", f) + case nil: + t.Fatalf("transport is idle, want RST_STREAM") } - return nil } - ct.server = func() error { - ct.greet() - var receivedBody []byte - var resetCount int - for { - f, err := ct.fr.ReadFrame() - t.Logf("server: ReadFrame = %v, %v", f, err) - if err != nil { - select { - case <-clientDone: - // If the client's done, it - // will have reported any - // errors on its side. - if bytes.Compare(receivedBody, body) != 0 { - return fmt.Errorf("body: %q; expected %q", receivedBody, body) - } - if resetCount != 1 { - return fmt.Errorf("stream reset count: %v; expected: 1", resetCount) - } - return nil - default: - return err - } - } - switch f := f.(type) { - case *WindowUpdateFrame, *SettingsFrame: - case *HeadersFrame: - case *DataFrame: - receivedBody = append(receivedBody, f.Data()...) - case *RSTStreamFrame: - resetCount++ - default: - return fmt.Errorf("Unexpected client frame %v", f) - } - } + if !bytes.Equal(receivedBody, body) { + t.Fatalf("body: %q; expected %q", receivedBody, body) + } + + if err := rt.err(); err != bodyReadError { + t.Fatalf("err = %v; want %v", err, bodyReadError) + } + + if got := activeStreams(tc.cc); got != 0 { + t.Fatalf("active streams count: %v; want 0", got) } - ct.run() } func TestTransportBodyReadError_Immediately(t *testing.T) { testTransportBodyReadError(t, nil) } @@ -4911,59 +4267,18 @@ func TestTransportBodyEagerEndStream(t *testing.T) { const reqBody = "some request body" const resBody = "some response body" - ct := newClientTester(t) - ct.client = func() error { - defer ct.cc.(*net.TCPConn).CloseWrite() - if runtime.GOOS == "plan9" { - // CloseWrite not supported on Plan 9; Issue 17906 - defer ct.cc.(*net.TCPConn).Close() - } - body := strings.NewReader(reqBody) - req, err := http.NewRequest("PUT", "https://dummy.tld/", body) - if err != nil { - return err - } - _, err = ct.tr.RoundTrip(req) - if err != nil { - return err - } - return nil - } - ct.server = func() error { - ct.greet() + tc := newTestClientConn(t) + tc.greet() - for { - f, err := ct.fr.ReadFrame() - if err != nil { - return err - } + body := strings.NewReader(reqBody) + req, _ := http.NewRequest("PUT", "https://dummy.tld/", body) + tc.roundTrip(req) - switch f := f.(type) { - case *WindowUpdateFrame, *SettingsFrame: - case *HeadersFrame: - case *DataFrame: - if !f.StreamEnded() { - ct.fr.WriteRSTStream(f.StreamID, ErrCodeRefusedStream) - return fmt.Errorf("data frame without END_STREAM %v", f) - } - var buf bytes.Buffer - enc := hpack.NewEncoder(&buf) - enc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"}) - ct.fr.WriteHeaders(HeadersFrameParam{ - StreamID: f.Header().StreamID, - EndHeaders: true, - EndStream: false, - BlockFragment: buf.Bytes(), - }) - ct.fr.WriteData(f.StreamID, true, []byte(resBody)) - return nil - case *RSTStreamFrame: - default: - return fmt.Errorf("Unexpected client frame %v", f) - } - } + tc.wantFrameType(FrameHeaders) + f := testClientConnReadFrame[*DataFrame](tc) + if !f.StreamEnded() { + t.Fatalf("data frame without END_STREAM %v", f) } - ct.run() } type chunkReader struct { @@ -5737,39 +5052,27 @@ func TestClientConnReservations(t *testing.T) { } func TestTransportTimeoutServerHangs(t *testing.T) { - clientDone := make(chan struct{}) - ct := newClientTester(t) - ct.client = func() error { - defer ct.cc.(*net.TCPConn).CloseWrite() - defer close(clientDone) + tc := newTestClientConn(t) + tc.greet() - req, err := http.NewRequest("PUT", "https://dummy.tld/", nil) - if err != nil { - return err - } + ctx, cancel := context.WithCancel(context.Background()) + req, _ := http.NewRequestWithContext(ctx, "PUT", "https://dummy.tld/", nil) + rt := tc.roundTrip(req) - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) - defer cancel() - req = req.WithContext(ctx) - req.Header.Add("Big", strings.Repeat("a", 1<<20)) - _, err = ct.tr.RoundTrip(req) - if err == nil { - return errors.New("error should not be nil") - } - if ne, ok := err.(net.Error); !ok || !ne.Timeout() { - return fmt.Errorf("error should be a net error timeout: %v", err) - } - return nil + tc.wantFrameType(FrameHeaders) + tc.advance(5 * time.Second) + if f := tc.readFrame(); f != nil { + t.Fatalf("unexpected frame: %v", f) } - ct.server = func() error { - ct.greet() - select { - case <-time.After(5 * time.Second): - case <-clientDone: - } - return nil + if rt.done() { + t.Fatalf("after 5 seconds with no response, RoundTrip unexpectedly returned") + } + + cancel() + tc.sync() + if rt.err() != context.Canceled { + t.Fatalf("RoundTrip error: %v; want context.Canceled", rt.err()) } - ct.run() } func TestTransportContentLengthWithoutBody(t *testing.T) { @@ -5962,20 +5265,6 @@ func TestTransportClosesConnAfterGoAwayLastStream(t *testing.T) { testTransportClosesConnAfterGoAway(t, 1) } -type closeOnceConn struct { - net.Conn - closed uint32 -} - -var errClosed = errors.New("Close of closed connection") - -func (c *closeOnceConn) Close() error { - if atomic.CompareAndSwapUint32(&c.closed, 0, 1) { - return c.Conn.Close() - } - return errClosed -} - // testTransportClosesConnAfterGoAway verifies that the transport // closes a connection after reading a GOAWAY from it. // @@ -5983,53 +5272,35 @@ func (c *closeOnceConn) Close() error { // When 0, the transport (unsuccessfully) retries the request (stream 1); // when 1, the transport reads the response after receiving the GOAWAY. func testTransportClosesConnAfterGoAway(t *testing.T, lastStream uint32) { - ct := newClientTester(t) - ct.cc = &closeOnceConn{Conn: ct.cc} + tc := newTestClientConn(t) + tc.greet() - var wg sync.WaitGroup - wg.Add(1) - ct.client = func() error { - defer wg.Done() - req, _ := http.NewRequest("GET", "https://dummy.tld/", nil) - res, err := ct.tr.RoundTrip(req) - if err == nil { - res.Body.Close() - } - if gotErr, wantErr := err != nil, lastStream == 0; gotErr != wantErr { - t.Errorf("RoundTrip got error %v (want error: %v)", err, wantErr) - } - if err = ct.cc.Close(); err != errClosed { - return fmt.Errorf("ct.cc.Close() = %v, want errClosed", err) - } - return nil - } + req, _ := http.NewRequest("GET", "https://dummy.tld/", nil) + rt := tc.roundTrip(req) - ct.server = func() error { - defer wg.Wait() - ct.greet() - hf, err := ct.firstHeaders() - if err != nil { - return fmt.Errorf("server failed reading HEADERS: %v", err) - } - if err := ct.fr.WriteGoAway(lastStream, ErrCodeNo, nil); err != nil { - return fmt.Errorf("server failed writing GOAWAY: %v", err) - } - if lastStream > 0 { - // Send a valid response to first request. - var buf bytes.Buffer - enc := hpack.NewEncoder(&buf) - enc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"}) - ct.fr.WriteHeaders(HeadersFrameParam{ - StreamID: hf.StreamID, - EndHeaders: true, - EndStream: true, - BlockFragment: buf.Bytes(), - }) - } - return nil + tc.wantFrameType(FrameHeaders) + tc.writeGoAway(lastStream, ErrCodeNo, nil) + + if lastStream > 0 { + // Send a valid response to first request. + tc.writeHeaders(HeadersFrameParam{ + StreamID: rt.streamID(), + EndHeaders: true, + EndStream: true, + BlockFragment: tc.makeHeaderBlockFragment( + ":status", "200", + ), + }) } - ct.run() + tc.closeWrite(io.EOF) + err := rt.err() + if gotErr, wantErr := err != nil, lastStream == 0; gotErr != wantErr { + t.Errorf("RoundTrip got error %v (want error: %v)", err, wantErr) + } + if !tc.netConnClosed { + t.Errorf("ClientConn did not close its net.Conn, expected it to") + } } type slowCloser struct {