use dep as denpendency managment tool
This commit is contained in:
371
vendor/golang.org/x/net/http2/transport.go
generated
vendored
371
vendor/golang.org/x/net/http2/transport.go
generated
vendored
@@ -27,9 +27,9 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/http/httpguts"
|
||||
"golang.org/x/net/http2/hpack"
|
||||
"golang.org/x/net/idna"
|
||||
"golang.org/x/net/lex/httplex"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -87,7 +87,7 @@ type Transport struct {
|
||||
|
||||
// MaxHeaderListSize is the http2 SETTINGS_MAX_HEADER_LIST_SIZE to
|
||||
// send in the initial settings frame. It is how many bytes
|
||||
// of response headers are allow. Unlike the http2 spec, zero here
|
||||
// of response headers are allowed. Unlike the http2 spec, zero here
|
||||
// means to use a default limit (currently 10MB). If you actually
|
||||
// want to advertise an ulimited value to the peer, Transport
|
||||
// interprets the highest possible value here (0xffffffff or 1<<32-1)
|
||||
@@ -172,9 +172,10 @@ type ClientConn struct {
|
||||
fr *Framer
|
||||
lastActive time.Time
|
||||
// Settings from peer: (also guarded by mu)
|
||||
maxFrameSize uint32
|
||||
maxConcurrentStreams uint32
|
||||
initialWindowSize uint32
|
||||
maxFrameSize uint32
|
||||
maxConcurrentStreams uint32
|
||||
peerMaxHeaderListSize uint64
|
||||
initialWindowSize uint32
|
||||
|
||||
hbuf bytes.Buffer // HPACK encoder writes into this
|
||||
henc *hpack.Encoder
|
||||
@@ -273,6 +274,13 @@ func (cs *clientStream) checkResetOrDone() error {
|
||||
}
|
||||
}
|
||||
|
||||
func (cs *clientStream) getStartedWrite() bool {
|
||||
cc := cs.cc
|
||||
cc.mu.Lock()
|
||||
defer cc.mu.Unlock()
|
||||
return cs.startedWrite
|
||||
}
|
||||
|
||||
func (cs *clientStream) abortRequestBodyWrite(err error) {
|
||||
if err == nil {
|
||||
panic("nil error")
|
||||
@@ -298,7 +306,26 @@ func (sew stickyErrWriter) Write(p []byte) (n int, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
var ErrNoCachedConn = errors.New("http2: no cached connection was available")
|
||||
// noCachedConnError is the concrete type of ErrNoCachedConn, which
|
||||
// needs to be detected by net/http regardless of whether it's its
|
||||
// bundled version (in h2_bundle.go with a rewritten type name) or
|
||||
// from a user's x/net/http2. As such, as it has a unique method name
|
||||
// (IsHTTP2NoCachedConnError) that net/http sniffs for via func
|
||||
// isNoCachedConnError.
|
||||
type noCachedConnError struct{}
|
||||
|
||||
func (noCachedConnError) IsHTTP2NoCachedConnError() {}
|
||||
func (noCachedConnError) Error() string { return "http2: no cached connection was available" }
|
||||
|
||||
// isNoCachedConnError reports whether err is of type noCachedConnError
|
||||
// or its equivalent renamed type in net/http2's h2_bundle.go. Both types
|
||||
// may coexist in the same running program.
|
||||
func isNoCachedConnError(err error) bool {
|
||||
_, ok := err.(interface{ IsHTTP2NoCachedConnError() })
|
||||
return ok
|
||||
}
|
||||
|
||||
var ErrNoCachedConn error = noCachedConnError{}
|
||||
|
||||
// RoundTripOpt are options for the Transport.RoundTripOpt method.
|
||||
type RoundTripOpt struct {
|
||||
@@ -348,14 +375,9 @@ func (t *Transport) RoundTripOpt(req *http.Request, opt RoundTripOpt) (*http.Res
|
||||
return nil, err
|
||||
}
|
||||
traceGotConn(req, cc)
|
||||
res, err := cc.RoundTrip(req)
|
||||
res, gotErrAfterReqBodyWrite, err := cc.roundTrip(req)
|
||||
if err != nil && retry <= 6 {
|
||||
afterBodyWrite := false
|
||||
if e, ok := err.(afterReqBodyWriteError); ok {
|
||||
err = e
|
||||
afterBodyWrite = true
|
||||
}
|
||||
if req, err = shouldRetryRequest(req, err, afterBodyWrite); err == nil {
|
||||
if req, err = shouldRetryRequest(req, err, gotErrAfterReqBodyWrite); err == nil {
|
||||
// After the first retry, do exponential backoff with 10% jitter.
|
||||
if retry == 0 {
|
||||
continue
|
||||
@@ -393,16 +415,6 @@ var (
|
||||
errClientConnGotGoAway = errors.New("http2: Transport received Server's graceful shutdown GOAWAY")
|
||||
)
|
||||
|
||||
// afterReqBodyWriteError is a wrapper around errors returned by ClientConn.RoundTrip.
|
||||
// It is used to signal that err happened after part of Request.Body was sent to the server.
|
||||
type afterReqBodyWriteError struct {
|
||||
err error
|
||||
}
|
||||
|
||||
func (e afterReqBodyWriteError) Error() string {
|
||||
return e.err.Error() + "; some request body already written"
|
||||
}
|
||||
|
||||
// shouldRetryRequest is called by RoundTrip when a request fails to get
|
||||
// response headers. It is always called with a non-nil error.
|
||||
// It returns either a request to retry (either the same request, or a
|
||||
@@ -519,17 +531,18 @@ func (t *Transport) NewClientConn(c net.Conn) (*ClientConn, error) {
|
||||
|
||||
func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, error) {
|
||||
cc := &ClientConn{
|
||||
t: t,
|
||||
tconn: c,
|
||||
readerDone: make(chan struct{}),
|
||||
nextStreamID: 1,
|
||||
maxFrameSize: 16 << 10, // spec default
|
||||
initialWindowSize: 65535, // spec default
|
||||
maxConcurrentStreams: 1000, // "infinite", per spec. 1000 seems good enough.
|
||||
streams: make(map[uint32]*clientStream),
|
||||
singleUse: singleUse,
|
||||
wantSettingsAck: true,
|
||||
pings: make(map[[8]byte]chan struct{}),
|
||||
t: t,
|
||||
tconn: c,
|
||||
readerDone: make(chan struct{}),
|
||||
nextStreamID: 1,
|
||||
maxFrameSize: 16 << 10, // spec default
|
||||
initialWindowSize: 65535, // spec default
|
||||
maxConcurrentStreams: 1000, // "infinite", per spec. 1000 seems good enough.
|
||||
peerMaxHeaderListSize: 0xffffffffffffffff, // "infinite", per spec. Use 2^64-1 instead.
|
||||
streams: make(map[uint32]*clientStream),
|
||||
singleUse: singleUse,
|
||||
wantSettingsAck: true,
|
||||
pings: make(map[[8]byte]chan struct{}),
|
||||
}
|
||||
if d := t.idleConnTimeout(); d != 0 {
|
||||
cc.idleTimeout = d
|
||||
@@ -554,6 +567,10 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro
|
||||
// henc in response to SETTINGS frames?
|
||||
cc.henc = hpack.NewEncoder(&cc.hbuf)
|
||||
|
||||
if t.AllowHTTP {
|
||||
cc.nextStreamID = 3
|
||||
}
|
||||
|
||||
if cs, ok := c.(connectionStater); ok {
|
||||
state := cs.ConnectionState()
|
||||
cc.tlsState = &state
|
||||
@@ -750,8 +767,13 @@ func actualContentLength(req *http.Request) int64 {
|
||||
}
|
||||
|
||||
func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||
resp, _, err := cc.roundTrip(req)
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func (cc *ClientConn) roundTrip(req *http.Request) (res *http.Response, gotErrAfterReqBodyWrite bool, err error) {
|
||||
if err := checkConnHeaders(req); err != nil {
|
||||
return nil, err
|
||||
return nil, false, err
|
||||
}
|
||||
if cc.idleTimer != nil {
|
||||
cc.idleTimer.Stop()
|
||||
@@ -759,14 +781,14 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||
|
||||
trailers, err := commaSeparatedTrailers(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, false, err
|
||||
}
|
||||
hasTrailers := trailers != ""
|
||||
|
||||
cc.mu.Lock()
|
||||
if err := cc.awaitOpenSlotForRequest(req); err != nil {
|
||||
cc.mu.Unlock()
|
||||
return nil, err
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
body := req.Body
|
||||
@@ -800,7 +822,7 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||
hdrs, err := cc.encodeHeaders(req, requestedGzip, trailers, contentLen)
|
||||
if err != nil {
|
||||
cc.mu.Unlock()
|
||||
return nil, err
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
cs := cc.newStream()
|
||||
@@ -812,7 +834,7 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||
|
||||
cc.wmu.Lock()
|
||||
endStream := !hasBody && !hasTrailers
|
||||
werr := cc.writeHeaders(cs.ID, endStream, hdrs)
|
||||
werr := cc.writeHeaders(cs.ID, endStream, int(cc.maxFrameSize), hdrs)
|
||||
cc.wmu.Unlock()
|
||||
traceWroteHeaders(cs.trace)
|
||||
cc.mu.Unlock()
|
||||
@@ -826,7 +848,7 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||
// Don't bother sending a RST_STREAM (our write already failed;
|
||||
// no need to keep writing)
|
||||
traceWroteRequest(cs.trace, werr)
|
||||
return nil, werr
|
||||
return nil, false, werr
|
||||
}
|
||||
|
||||
var respHeaderTimer <-chan time.Time
|
||||
@@ -845,7 +867,7 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||
bodyWritten := false
|
||||
ctx := reqContext(req)
|
||||
|
||||
handleReadLoopResponse := func(re resAndError) (*http.Response, error) {
|
||||
handleReadLoopResponse := func(re resAndError) (*http.Response, bool, error) {
|
||||
res := re.res
|
||||
if re.err != nil || res.StatusCode > 299 {
|
||||
// On error or status code 3xx, 4xx, 5xx, etc abort any
|
||||
@@ -861,18 +883,12 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||
cs.abortRequestBodyWrite(errStopReqBodyWrite)
|
||||
}
|
||||
if re.err != nil {
|
||||
cc.mu.Lock()
|
||||
afterBodyWrite := cs.startedWrite
|
||||
cc.mu.Unlock()
|
||||
cc.forgetStreamID(cs.ID)
|
||||
if afterBodyWrite {
|
||||
return nil, afterReqBodyWriteError{re.err}
|
||||
}
|
||||
return nil, re.err
|
||||
return nil, cs.getStartedWrite(), re.err
|
||||
}
|
||||
res.Request = req
|
||||
res.TLS = cc.tlsState
|
||||
return res, nil
|
||||
return res, false, nil
|
||||
}
|
||||
|
||||
for {
|
||||
@@ -887,7 +903,7 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||
cs.abortRequestBodyWrite(errStopReqBodyWriteAndCancel)
|
||||
}
|
||||
cc.forgetStreamID(cs.ID)
|
||||
return nil, errTimeout
|
||||
return nil, cs.getStartedWrite(), errTimeout
|
||||
case <-ctx.Done():
|
||||
if !hasBody || bodyWritten {
|
||||
cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
|
||||
@@ -896,7 +912,7 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||
cs.abortRequestBodyWrite(errStopReqBodyWriteAndCancel)
|
||||
}
|
||||
cc.forgetStreamID(cs.ID)
|
||||
return nil, ctx.Err()
|
||||
return nil, cs.getStartedWrite(), ctx.Err()
|
||||
case <-req.Cancel:
|
||||
if !hasBody || bodyWritten {
|
||||
cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
|
||||
@@ -905,12 +921,12 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||
cs.abortRequestBodyWrite(errStopReqBodyWriteAndCancel)
|
||||
}
|
||||
cc.forgetStreamID(cs.ID)
|
||||
return nil, errRequestCanceled
|
||||
return nil, cs.getStartedWrite(), errRequestCanceled
|
||||
case <-cs.peerReset:
|
||||
// processResetStream already removed the
|
||||
// stream from the streams map; no need for
|
||||
// forgetStreamID.
|
||||
return nil, cs.resetErr
|
||||
return nil, cs.getStartedWrite(), cs.resetErr
|
||||
case err := <-bodyWriter.resc:
|
||||
// Prefer the read loop's response, if available. Issue 16102.
|
||||
select {
|
||||
@@ -919,7 +935,7 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||
default:
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, cs.getStartedWrite(), err
|
||||
}
|
||||
bodyWritten = true
|
||||
if d := cc.responseHeaderTimeout(); d != 0 {
|
||||
@@ -939,6 +955,9 @@ func (cc *ClientConn) awaitOpenSlotForRequest(req *http.Request) error {
|
||||
for {
|
||||
cc.lastActive = time.Now()
|
||||
if cc.closed || !cc.canTakeNewRequestLocked() {
|
||||
if waitingForConn != nil {
|
||||
close(waitingForConn)
|
||||
}
|
||||
return errClientConnUnusable
|
||||
}
|
||||
if int64(len(cc.streams))+1 <= int64(cc.maxConcurrentStreams) {
|
||||
@@ -971,13 +990,12 @@ func (cc *ClientConn) awaitOpenSlotForRequest(req *http.Request) error {
|
||||
}
|
||||
|
||||
// requires cc.wmu be held
|
||||
func (cc *ClientConn) writeHeaders(streamID uint32, endStream bool, hdrs []byte) error {
|
||||
func (cc *ClientConn) writeHeaders(streamID uint32, endStream bool, maxFrameSize int, hdrs []byte) error {
|
||||
first := true // first frame written (HEADERS is first, then CONTINUATION)
|
||||
frameSize := int(cc.maxFrameSize)
|
||||
for len(hdrs) > 0 && cc.werr == nil {
|
||||
chunk := hdrs
|
||||
if len(chunk) > frameSize {
|
||||
chunk = chunk[:frameSize]
|
||||
if len(chunk) > maxFrameSize {
|
||||
chunk = chunk[:maxFrameSize]
|
||||
}
|
||||
hdrs = hdrs[len(chunk):]
|
||||
endHeaders := len(hdrs) == 0
|
||||
@@ -1085,17 +1103,26 @@ func (cs *clientStream) writeRequestBody(body io.Reader, bodyCloser io.Closer) (
|
||||
var trls []byte
|
||||
if hasTrailers {
|
||||
cc.mu.Lock()
|
||||
defer cc.mu.Unlock()
|
||||
trls = cc.encodeTrailers(req)
|
||||
trls, err = cc.encodeTrailers(req)
|
||||
cc.mu.Unlock()
|
||||
if err != nil {
|
||||
cc.writeStreamReset(cs.ID, ErrCodeInternal, err)
|
||||
cc.forgetStreamID(cs.ID)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
cc.mu.Lock()
|
||||
maxFrameSize := int(cc.maxFrameSize)
|
||||
cc.mu.Unlock()
|
||||
|
||||
cc.wmu.Lock()
|
||||
defer cc.wmu.Unlock()
|
||||
|
||||
// Two ways to send END_STREAM: either with trailers, or
|
||||
// with an empty DATA frame.
|
||||
if len(trls) > 0 {
|
||||
err = cc.writeHeaders(cs.ID, true, trls)
|
||||
err = cc.writeHeaders(cs.ID, true, maxFrameSize, trls)
|
||||
} else {
|
||||
err = cc.fr.WriteData(cs.ID, true, nil)
|
||||
}
|
||||
@@ -1154,7 +1181,7 @@ func (cc *ClientConn) encodeHeaders(req *http.Request, addGzipHeader bool, trail
|
||||
if host == "" {
|
||||
host = req.URL.Host
|
||||
}
|
||||
host, err := httplex.PunycodeHostPort(host)
|
||||
host, err := httpguts.PunycodeHostPort(host)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -1179,72 +1206,96 @@ func (cc *ClientConn) encodeHeaders(req *http.Request, addGzipHeader bool, trail
|
||||
// potentially pollute our hpack state. (We want to be able to
|
||||
// continue to reuse the hpack encoder for future requests)
|
||||
for k, vv := range req.Header {
|
||||
if !httplex.ValidHeaderFieldName(k) {
|
||||
if !httpguts.ValidHeaderFieldName(k) {
|
||||
return nil, fmt.Errorf("invalid HTTP header name %q", k)
|
||||
}
|
||||
for _, v := range vv {
|
||||
if !httplex.ValidHeaderFieldValue(v) {
|
||||
if !httpguts.ValidHeaderFieldValue(v) {
|
||||
return nil, fmt.Errorf("invalid HTTP header value %q for header %q", v, k)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 8.1.2.3 Request Pseudo-Header Fields
|
||||
// The :path pseudo-header field includes the path and query parts of the
|
||||
// target URI (the path-absolute production and optionally a '?' character
|
||||
// followed by the query production (see Sections 3.3 and 3.4 of
|
||||
// [RFC3986]).
|
||||
cc.writeHeader(":authority", host)
|
||||
cc.writeHeader(":method", req.Method)
|
||||
if req.Method != "CONNECT" {
|
||||
cc.writeHeader(":path", path)
|
||||
cc.writeHeader(":scheme", req.URL.Scheme)
|
||||
}
|
||||
if trailers != "" {
|
||||
cc.writeHeader("trailer", trailers)
|
||||
enumerateHeaders := func(f func(name, value string)) {
|
||||
// 8.1.2.3 Request Pseudo-Header Fields
|
||||
// The :path pseudo-header field includes the path and query parts of the
|
||||
// target URI (the path-absolute production and optionally a '?' character
|
||||
// followed by the query production (see Sections 3.3 and 3.4 of
|
||||
// [RFC3986]).
|
||||
f(":authority", host)
|
||||
f(":method", req.Method)
|
||||
if req.Method != "CONNECT" {
|
||||
f(":path", path)
|
||||
f(":scheme", req.URL.Scheme)
|
||||
}
|
||||
if trailers != "" {
|
||||
f("trailer", trailers)
|
||||
}
|
||||
|
||||
var didUA bool
|
||||
for k, vv := range req.Header {
|
||||
if strings.EqualFold(k, "host") || strings.EqualFold(k, "content-length") {
|
||||
// Host is :authority, already sent.
|
||||
// Content-Length is automatic, set below.
|
||||
continue
|
||||
} else if strings.EqualFold(k, "connection") || strings.EqualFold(k, "proxy-connection") ||
|
||||
strings.EqualFold(k, "transfer-encoding") || strings.EqualFold(k, "upgrade") ||
|
||||
strings.EqualFold(k, "keep-alive") {
|
||||
// Per 8.1.2.2 Connection-Specific Header
|
||||
// Fields, don't send connection-specific
|
||||
// fields. We have already checked if any
|
||||
// are error-worthy so just ignore the rest.
|
||||
continue
|
||||
} else if strings.EqualFold(k, "user-agent") {
|
||||
// Match Go's http1 behavior: at most one
|
||||
// User-Agent. If set to nil or empty string,
|
||||
// then omit it. Otherwise if not mentioned,
|
||||
// include the default (below).
|
||||
didUA = true
|
||||
if len(vv) < 1 {
|
||||
continue
|
||||
}
|
||||
vv = vv[:1]
|
||||
if vv[0] == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
for _, v := range vv {
|
||||
f(k, v)
|
||||
}
|
||||
}
|
||||
if shouldSendReqContentLength(req.Method, contentLength) {
|
||||
f("content-length", strconv.FormatInt(contentLength, 10))
|
||||
}
|
||||
if addGzipHeader {
|
||||
f("accept-encoding", "gzip")
|
||||
}
|
||||
if !didUA {
|
||||
f("user-agent", defaultUserAgent)
|
||||
}
|
||||
}
|
||||
|
||||
var didUA bool
|
||||
for k, vv := range req.Header {
|
||||
lowKey := strings.ToLower(k)
|
||||
switch lowKey {
|
||||
case "host", "content-length":
|
||||
// Host is :authority, already sent.
|
||||
// Content-Length is automatic, set below.
|
||||
continue
|
||||
case "connection", "proxy-connection", "transfer-encoding", "upgrade", "keep-alive":
|
||||
// Per 8.1.2.2 Connection-Specific Header
|
||||
// Fields, don't send connection-specific
|
||||
// fields. We have already checked if any
|
||||
// are error-worthy so just ignore the rest.
|
||||
continue
|
||||
case "user-agent":
|
||||
// Match Go's http1 behavior: at most one
|
||||
// User-Agent. If set to nil or empty string,
|
||||
// then omit it. Otherwise if not mentioned,
|
||||
// include the default (below).
|
||||
didUA = true
|
||||
if len(vv) < 1 {
|
||||
continue
|
||||
}
|
||||
vv = vv[:1]
|
||||
if vv[0] == "" {
|
||||
continue
|
||||
}
|
||||
}
|
||||
for _, v := range vv {
|
||||
cc.writeHeader(lowKey, v)
|
||||
}
|
||||
}
|
||||
if shouldSendReqContentLength(req.Method, contentLength) {
|
||||
cc.writeHeader("content-length", strconv.FormatInt(contentLength, 10))
|
||||
}
|
||||
if addGzipHeader {
|
||||
cc.writeHeader("accept-encoding", "gzip")
|
||||
}
|
||||
if !didUA {
|
||||
cc.writeHeader("user-agent", defaultUserAgent)
|
||||
// Do a first pass over the headers counting bytes to ensure
|
||||
// we don't exceed cc.peerMaxHeaderListSize. This is done as a
|
||||
// separate pass before encoding the headers to prevent
|
||||
// modifying the hpack state.
|
||||
hlSize := uint64(0)
|
||||
enumerateHeaders(func(name, value string) {
|
||||
hf := hpack.HeaderField{Name: name, Value: value}
|
||||
hlSize += uint64(hf.Size())
|
||||
})
|
||||
|
||||
if hlSize > cc.peerMaxHeaderListSize {
|
||||
return nil, errRequestHeaderListSize
|
||||
}
|
||||
|
||||
// Header list size is ok. Write the headers.
|
||||
enumerateHeaders(func(name, value string) {
|
||||
cc.writeHeader(strings.ToLower(name), value)
|
||||
})
|
||||
|
||||
return cc.hbuf.Bytes(), nil
|
||||
}
|
||||
|
||||
@@ -1271,17 +1322,29 @@ func shouldSendReqContentLength(method string, contentLength int64) bool {
|
||||
}
|
||||
|
||||
// requires cc.mu be held.
|
||||
func (cc *ClientConn) encodeTrailers(req *http.Request) []byte {
|
||||
func (cc *ClientConn) encodeTrailers(req *http.Request) ([]byte, error) {
|
||||
cc.hbuf.Reset()
|
||||
|
||||
hlSize := uint64(0)
|
||||
for k, vv := range req.Trailer {
|
||||
// Transfer-Encoding, etc.. have already been filter at the
|
||||
for _, v := range vv {
|
||||
hf := hpack.HeaderField{Name: k, Value: v}
|
||||
hlSize += uint64(hf.Size())
|
||||
}
|
||||
}
|
||||
if hlSize > cc.peerMaxHeaderListSize {
|
||||
return nil, errRequestHeaderListSize
|
||||
}
|
||||
|
||||
for k, vv := range req.Trailer {
|
||||
// Transfer-Encoding, etc.. have already been filtered at the
|
||||
// start of RoundTrip
|
||||
lowKey := strings.ToLower(k)
|
||||
for _, v := range vv {
|
||||
cc.writeHeader(lowKey, v)
|
||||
}
|
||||
}
|
||||
return cc.hbuf.Bytes()
|
||||
return cc.hbuf.Bytes(), nil
|
||||
}
|
||||
|
||||
func (cc *ClientConn) writeHeader(name, value string) {
|
||||
@@ -1339,17 +1402,12 @@ func (cc *ClientConn) streamByID(id uint32, andRemove bool) *clientStream {
|
||||
// clientConnReadLoop is the state owned by the clientConn's frame-reading readLoop.
|
||||
type clientConnReadLoop struct {
|
||||
cc *ClientConn
|
||||
activeRes map[uint32]*clientStream // keyed by streamID
|
||||
closeWhenIdle bool
|
||||
}
|
||||
|
||||
// readLoop runs in its own goroutine and reads and dispatches frames.
|
||||
func (cc *ClientConn) readLoop() {
|
||||
rl := &clientConnReadLoop{
|
||||
cc: cc,
|
||||
activeRes: make(map[uint32]*clientStream),
|
||||
}
|
||||
|
||||
rl := &clientConnReadLoop{cc: cc}
|
||||
defer rl.cleanup()
|
||||
cc.readerErr = rl.run()
|
||||
if ce, ok := cc.readerErr.(ConnectionError); ok {
|
||||
@@ -1404,10 +1462,8 @@ func (rl *clientConnReadLoop) cleanup() {
|
||||
} else if err == io.EOF {
|
||||
err = io.ErrUnexpectedEOF
|
||||
}
|
||||
for _, cs := range rl.activeRes {
|
||||
cs.bufPipe.CloseWithError(err)
|
||||
}
|
||||
for _, cs := range cc.streams {
|
||||
cs.bufPipe.CloseWithError(err) // no-op if already closed
|
||||
select {
|
||||
case cs.resc <- resAndError{err: err}:
|
||||
default:
|
||||
@@ -1485,7 +1541,7 @@ func (rl *clientConnReadLoop) run() error {
|
||||
}
|
||||
return err
|
||||
}
|
||||
if rl.closeWhenIdle && gotReply && maybeIdle && len(rl.activeRes) == 0 {
|
||||
if rl.closeWhenIdle && gotReply && maybeIdle {
|
||||
cc.closeIfIdle()
|
||||
}
|
||||
}
|
||||
@@ -1493,13 +1549,31 @@ func (rl *clientConnReadLoop) run() error {
|
||||
|
||||
func (rl *clientConnReadLoop) processHeaders(f *MetaHeadersFrame) error {
|
||||
cc := rl.cc
|
||||
cs := cc.streamByID(f.StreamID, f.StreamEnded())
|
||||
cs := cc.streamByID(f.StreamID, false)
|
||||
if cs == nil {
|
||||
// We'd get here if we canceled a request while the
|
||||
// server had its response still in flight. So if this
|
||||
// was just something we canceled, ignore it.
|
||||
return nil
|
||||
}
|
||||
if f.StreamEnded() {
|
||||
// Issue 20521: If the stream has ended, streamByID() causes
|
||||
// clientStream.done to be closed, which causes the request's bodyWriter
|
||||
// to be closed with an errStreamClosed, which may be received by
|
||||
// clientConn.RoundTrip before the result of processing these headers.
|
||||
// Deferring stream closure allows the header processing to occur first.
|
||||
// clientConn.RoundTrip may still receive the bodyWriter error first, but
|
||||
// the fix for issue 16102 prioritises any response.
|
||||
//
|
||||
// Issue 22413: If there is no request body, we should close the
|
||||
// stream before writing to cs.resc so that the stream is closed
|
||||
// immediately once RoundTrip returns.
|
||||
if cs.req.Body != nil {
|
||||
defer cc.forgetStreamID(f.StreamID)
|
||||
} else {
|
||||
cc.forgetStreamID(f.StreamID)
|
||||
}
|
||||
}
|
||||
if !cs.firstByte {
|
||||
if cs.trace != nil {
|
||||
// TODO(bradfitz): move first response byte earlier,
|
||||
@@ -1523,6 +1597,7 @@ func (rl *clientConnReadLoop) processHeaders(f *MetaHeadersFrame) error {
|
||||
}
|
||||
// Any other error type is a stream error.
|
||||
cs.cc.writeStreamReset(f.StreamID, ErrCodeProtocol, err)
|
||||
cc.forgetStreamID(cs.ID)
|
||||
cs.resc <- resAndError{err: err}
|
||||
return nil // return nil from process* funcs to keep conn alive
|
||||
}
|
||||
@@ -1530,9 +1605,6 @@ func (rl *clientConnReadLoop) processHeaders(f *MetaHeadersFrame) error {
|
||||
// (nil, nil) special case. See handleResponse docs.
|
||||
return nil
|
||||
}
|
||||
if res.Body != noBody {
|
||||
rl.activeRes[cs.ID] = cs
|
||||
}
|
||||
cs.resTrailer = &res.Trailer
|
||||
cs.resc <- resAndError{res: res}
|
||||
return nil
|
||||
@@ -1552,11 +1624,11 @@ func (rl *clientConnReadLoop) handleResponse(cs *clientStream, f *MetaHeadersFra
|
||||
|
||||
status := f.PseudoValue("status")
|
||||
if status == "" {
|
||||
return nil, errors.New("missing status pseudo header")
|
||||
return nil, errors.New("malformed response from server: missing status pseudo header")
|
||||
}
|
||||
statusCode, err := strconv.Atoi(status)
|
||||
if err != nil {
|
||||
return nil, errors.New("malformed non-numeric status pseudo header")
|
||||
return nil, errors.New("malformed response from server: malformed non-numeric status pseudo header")
|
||||
}
|
||||
|
||||
if statusCode == 100 {
|
||||
@@ -1789,7 +1861,23 @@ func (rl *clientConnReadLoop) processData(f *DataFrame) error {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
if !cs.firstByte {
|
||||
cc.logf("protocol error: received DATA before a HEADERS frame")
|
||||
rl.endStreamError(cs, StreamError{
|
||||
StreamID: f.StreamID,
|
||||
Code: ErrCodeProtocol,
|
||||
})
|
||||
return nil
|
||||
}
|
||||
if f.Length > 0 {
|
||||
if cs.req.Method == "HEAD" && len(data) > 0 {
|
||||
cc.logf("protocol error: received DATA on a HEAD request")
|
||||
rl.endStreamError(cs, StreamError{
|
||||
StreamID: f.StreamID,
|
||||
Code: ErrCodeProtocol,
|
||||
})
|
||||
return nil
|
||||
}
|
||||
// Check connection-level flow control.
|
||||
cc.mu.Lock()
|
||||
if cs.inflow.available() >= int32(f.Length) {
|
||||
@@ -1851,11 +1939,10 @@ func (rl *clientConnReadLoop) endStreamError(cs *clientStream, err error) {
|
||||
err = io.EOF
|
||||
code = cs.copyTrailers
|
||||
}
|
||||
cs.bufPipe.closeWithErrorAndCode(err, code)
|
||||
delete(rl.activeRes, cs.ID)
|
||||
if isConnectionCloseRequest(cs.req) {
|
||||
rl.closeWhenIdle = true
|
||||
}
|
||||
cs.bufPipe.closeWithErrorAndCode(err, code)
|
||||
|
||||
select {
|
||||
case cs.resc <- resAndError{err: err}:
|
||||
@@ -1903,6 +1990,8 @@ func (rl *clientConnReadLoop) processSettings(f *SettingsFrame) error {
|
||||
cc.maxFrameSize = s.Val
|
||||
case SettingMaxConcurrentStreams:
|
||||
cc.maxConcurrentStreams = s.Val
|
||||
case SettingMaxHeaderListSize:
|
||||
cc.peerMaxHeaderListSize = uint64(s.Val)
|
||||
case SettingInitialWindowSize:
|
||||
// Values above the maximum flow-control
|
||||
// window size of 2^31-1 MUST be treated as a
|
||||
@@ -1980,7 +2069,6 @@ func (rl *clientConnReadLoop) processResetStream(f *RSTStreamFrame) error {
|
||||
cs.bufPipe.CloseWithError(err)
|
||||
cs.cc.cond.Broadcast() // wake up checkResetOrDone via clientStream.awaitFlowControl
|
||||
}
|
||||
delete(rl.activeRes, cs.ID)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -2069,6 +2157,7 @@ func (cc *ClientConn) writeStreamReset(streamID uint32, code ErrCode, err error)
|
||||
|
||||
var (
|
||||
errResponseHeaderListSize = errors.New("http2: response header list larger than advertised limit")
|
||||
errRequestHeaderListSize = errors.New("http2: request header list larger than peer's advertised limit")
|
||||
errPseudoTrailers = errors.New("http2: invalid pseudo header in trailers")
|
||||
)
|
||||
|
||||
@@ -2162,7 +2251,7 @@ func (t *Transport) getBodyWriterState(cs *clientStream, body io.Reader) (s body
|
||||
}
|
||||
s.delay = t.expectContinueTimeout()
|
||||
if s.delay == 0 ||
|
||||
!httplex.HeaderValuesContainsToken(
|
||||
!httpguts.HeaderValuesContainsToken(
|
||||
cs.req.Header["Expect"],
|
||||
"100-continue") {
|
||||
return
|
||||
@@ -2217,5 +2306,5 @@ func (s bodyWriterState) scheduleBodyWrite() {
|
||||
// isConnectionCloseRequest reports whether req should use its own
|
||||
// connection for a single request and then close the connection.
|
||||
func isConnectionCloseRequest(req *http.Request) bool {
|
||||
return req.Close || httplex.HeaderValuesContainsToken(req.Header["Connection"], "close")
|
||||
return req.Close || httpguts.HeaderValuesContainsToken(req.Header["Connection"], "close")
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user