Update dependencies (#5518)

This commit is contained in:
hongming
2023-02-12 23:09:20 +08:00
committed by GitHub
parent d3b35fb2da
commit a979342f56
1486 changed files with 126660 additions and 71128 deletions

View File

@@ -37,7 +37,7 @@ type Logger interface {
// binLogger is the global binary logger for the binary. One of this should be
// built at init time from the configuration (environment variable or flags).
//
// It is used to get a methodLogger for each individual method.
// It is used to get a MethodLogger for each individual method.
var binLogger Logger
var grpclogLogger = grpclog.Component("binarylog")
@@ -56,11 +56,11 @@ func GetLogger() Logger {
return binLogger
}
// GetMethodLogger returns the methodLogger for the given methodName.
// GetMethodLogger returns the MethodLogger for the given methodName.
//
// methodName should be in the format of "/service/method".
//
// Each methodLogger returned by this method is a new instance. This is to
// Each MethodLogger returned by this method is a new instance. This is to
// generate sequence id within the call.
func GetMethodLogger(methodName string) MethodLogger {
if binLogger == nil {
@@ -117,7 +117,7 @@ func (l *logger) setDefaultMethodLogger(ml *MethodLoggerConfig) error {
// Set method logger for "service/*".
//
// New methodLogger with same service overrides the old one.
// New MethodLogger with same service overrides the old one.
func (l *logger) setServiceMethodLogger(service string, ml *MethodLoggerConfig) error {
if _, ok := l.config.Services[service]; ok {
return fmt.Errorf("conflicting service rules for service %v found", service)
@@ -131,7 +131,7 @@ func (l *logger) setServiceMethodLogger(service string, ml *MethodLoggerConfig)
// Set method logger for "service/method".
//
// New methodLogger with same method overrides the old one.
// New MethodLogger with same method overrides the old one.
func (l *logger) setMethodMethodLogger(method string, ml *MethodLoggerConfig) error {
if _, ok := l.config.Blacklist[method]; ok {
return fmt.Errorf("conflicting blacklist rules for method %v found", method)
@@ -161,11 +161,11 @@ func (l *logger) setBlacklist(method string) error {
return nil
}
// getMethodLogger returns the methodLogger for the given methodName.
// getMethodLogger returns the MethodLogger for the given methodName.
//
// methodName should be in the format of "/service/method".
//
// Each methodLogger returned by this method is a new instance. This is to
// Each MethodLogger returned by this method is a new instance. This is to
// generate sequence id within the call.
func (l *logger) GetMethodLogger(methodName string) MethodLogger {
s, m, err := grpcutil.ParseMethod(methodName)
@@ -174,16 +174,16 @@ func (l *logger) GetMethodLogger(methodName string) MethodLogger {
return nil
}
if ml, ok := l.config.Methods[s+"/"+m]; ok {
return newMethodLogger(ml.Header, ml.Message)
return NewTruncatingMethodLogger(ml.Header, ml.Message)
}
if _, ok := l.config.Blacklist[s+"/"+m]; ok {
return nil
}
if ml, ok := l.config.Services[s]; ok {
return newMethodLogger(ml.Header, ml.Message)
return NewTruncatingMethodLogger(ml.Header, ml.Message)
}
if l.config.All == nil {
return nil
}
return newMethodLogger(l.config.All.Header, l.config.All.Message)
return NewTruncatingMethodLogger(l.config.All.Header, l.config.All.Message)
}

View File

@@ -30,15 +30,15 @@ import (
// to build a new logger and assign it to binarylog.Logger.
//
// Example filter config strings:
// - "" Nothing will be logged
// - "*" All headers and messages will be fully logged.
// - "*{h}" Only headers will be logged.
// - "*{m:256}" Only the first 256 bytes of each message will be logged.
// - "Foo/*" Logs every method in service Foo
// - "Foo/*,-Foo/Bar" Logs every method in service Foo except method /Foo/Bar
// - "Foo/*,Foo/Bar{m:256}" Logs the first 256 bytes of each message in method
// /Foo/Bar, logs all headers and messages in every other method in service
// Foo.
// - "" Nothing will be logged
// - "*" All headers and messages will be fully logged.
// - "*{h}" Only headers will be logged.
// - "*{m:256}" Only the first 256 bytes of each message will be logged.
// - "Foo/*" Logs every method in service Foo
// - "Foo/*,-Foo/Bar" Logs every method in service Foo except method /Foo/Bar
// - "Foo/*,Foo/Bar{m:256}" Logs the first 256 bytes of each message in method
// /Foo/Bar, logs all headers and messages in every other method in service
// Foo.
//
// If two configs exist for one certain method or service, the one specified
// later overrides the previous config.
@@ -57,7 +57,7 @@ func NewLoggerFromConfigString(s string) Logger {
return l
}
// fillMethodLoggerWithConfigString parses config, creates methodLogger and adds
// fillMethodLoggerWithConfigString parses config, creates TruncatingMethodLogger and adds
// it to the right map in the logger.
func (l *logger) fillMethodLoggerWithConfigString(config string) error {
// "" is invalid.

View File

@@ -52,7 +52,9 @@ type MethodLogger interface {
Log(LogEntryConfig)
}
type methodLogger struct {
// TruncatingMethodLogger is a method logger that truncates headers and messages
// based on configured fields.
type TruncatingMethodLogger struct {
headerMaxLen, messageMaxLen uint64
callID uint64
@@ -61,8 +63,9 @@ type methodLogger struct {
sink Sink // TODO(blog): make this plugable.
}
func newMethodLogger(h, m uint64) *methodLogger {
return &methodLogger{
// NewTruncatingMethodLogger returns a new truncating method logger.
func NewTruncatingMethodLogger(h, m uint64) *TruncatingMethodLogger {
return &TruncatingMethodLogger{
headerMaxLen: h,
messageMaxLen: m,
@@ -75,8 +78,8 @@ func newMethodLogger(h, m uint64) *methodLogger {
// Build is an internal only method for building the proto message out of the
// input event. It's made public to enable other library to reuse as much logic
// in methodLogger as possible.
func (ml *methodLogger) Build(c LogEntryConfig) *pb.GrpcLogEntry {
// in TruncatingMethodLogger as possible.
func (ml *TruncatingMethodLogger) Build(c LogEntryConfig) *pb.GrpcLogEntry {
m := c.toProto()
timestamp, _ := ptypes.TimestampProto(time.Now())
m.Timestamp = timestamp
@@ -95,11 +98,11 @@ func (ml *methodLogger) Build(c LogEntryConfig) *pb.GrpcLogEntry {
}
// Log creates a proto binary log entry, and logs it to the sink.
func (ml *methodLogger) Log(c LogEntryConfig) {
func (ml *TruncatingMethodLogger) Log(c LogEntryConfig) {
ml.sink.Write(ml.Build(c))
}
func (ml *methodLogger) truncateMetadata(mdPb *pb.Metadata) (truncated bool) {
func (ml *TruncatingMethodLogger) truncateMetadata(mdPb *pb.Metadata) (truncated bool) {
if ml.headerMaxLen == maxUInt {
return false
}
@@ -118,7 +121,7 @@ func (ml *methodLogger) truncateMetadata(mdPb *pb.Metadata) (truncated bool) {
// but not counted towards the size limit.
continue
}
currentEntryLen := uint64(len(entry.Value))
currentEntryLen := uint64(len(entry.GetKey())) + uint64(len(entry.GetValue()))
if currentEntryLen > bytesLimit {
break
}
@@ -129,7 +132,7 @@ func (ml *methodLogger) truncateMetadata(mdPb *pb.Metadata) (truncated bool) {
return truncated
}
func (ml *methodLogger) truncateMessage(msgPb *pb.Message) (truncated bool) {
func (ml *TruncatingMethodLogger) truncateMessage(msgPb *pb.Message) (truncated bool) {
if ml.messageMaxLen == maxUInt {
return false
}

View File

@@ -273,10 +273,10 @@ func (c *channel) deleteSelfFromMap() (delete bool) {
// deleteSelfIfReady tries to delete the channel itself from the channelz database.
// The delete process includes two steps:
// 1. delete the channel from the entry relation tree, i.e. delete the channel reference from its
// parent's child list.
// 2. delete the channel from the map, i.e. delete the channel entirely from channelz. Lookup by id
// will return entry not found error.
// 1. delete the channel from the entry relation tree, i.e. delete the channel reference from its
// parent's child list.
// 2. delete the channel from the map, i.e. delete the channel entirely from channelz. Lookup by id
// will return entry not found error.
func (c *channel) deleteSelfIfReady() {
if !c.deleteSelfFromTree() {
return
@@ -381,10 +381,10 @@ func (sc *subChannel) deleteSelfFromMap() (delete bool) {
// deleteSelfIfReady tries to delete the subchannel itself from the channelz database.
// The delete process includes two steps:
// 1. delete the subchannel from the entry relation tree, i.e. delete the subchannel reference from
// its parent's child list.
// 2. delete the subchannel from the map, i.e. delete the subchannel entirely from channelz. Lookup
// by id will return entry not found error.
// 1. delete the subchannel from the entry relation tree, i.e. delete the subchannel reference from
// its parent's child list.
// 2. delete the subchannel from the map, i.e. delete the subchannel entirely from channelz. Lookup
// by id will return entry not found error.
func (sc *subChannel) deleteSelfIfReady() {
if !sc.deleteSelfFromTree() {
return

View File

@@ -25,11 +25,15 @@ import (
)
const (
prefix = "GRPC_GO_"
txtErrIgnoreStr = prefix + "IGNORE_TXT_ERRORS"
prefix = "GRPC_GO_"
txtErrIgnoreStr = prefix + "IGNORE_TXT_ERRORS"
advertiseCompressorsStr = prefix + "ADVERTISE_COMPRESSORS"
)
var (
// TXTErrIgnore is set if TXT errors should be ignored ("GRPC_GO_IGNORE_TXT_ERRORS" is not "false").
TXTErrIgnore = !strings.EqualFold(os.Getenv(txtErrIgnoreStr), "false")
// AdvertiseCompressors is set if registered compressor should be advertised
// ("GRPC_GO_ADVERTISE_COMPRESSORS" is not "false").
AdvertiseCompressors = !strings.EqualFold(os.Getenv(advertiseCompressorsStr), "false")
)

View File

@@ -0,0 +1,36 @@
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package envconfig
import "os"
const (
envObservabilityConfig = "GRPC_GCP_OBSERVABILITY_CONFIG"
envObservabilityConfigFile = "GRPC_GCP_OBSERVABILITY_CONFIG_FILE"
)
var (
// ObservabilityConfig is the json configuration for the gcp/observability
// package specified directly in the envObservabilityConfig env var.
ObservabilityConfig = os.Getenv(envObservabilityConfig)
// ObservabilityConfigFile is the json configuration for the
// gcp/observability specified in a file with the location specified in
// envObservabilityConfigFile env var.
ObservabilityConfigFile = os.Getenv(envObservabilityConfigFile)
)

View File

@@ -41,6 +41,7 @@ const (
clientSideSecuritySupportEnv = "GRPC_XDS_EXPERIMENTAL_SECURITY_SUPPORT"
aggregateAndDNSSupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"
rbacSupportEnv = "GRPC_XDS_EXPERIMENTAL_RBAC"
outlierDetectionSupportEnv = "GRPC_EXPERIMENTAL_ENABLE_OUTLIER_DETECTION"
federationEnv = "GRPC_EXPERIMENTAL_XDS_FEDERATION"
rlsInXDSEnv = "GRPC_EXPERIMENTAL_XDS_RLS_LB"
@@ -83,9 +84,9 @@ var (
// "GRPC_XDS_EXPERIMENTAL_RBAC" to "false".
XDSRBAC = !strings.EqualFold(os.Getenv(rbacSupportEnv), "false")
// XDSOutlierDetection indicates whether outlier detection support is
// enabled, which can be enabled by setting the environment variable
// "GRPC_EXPERIMENTAL_ENABLE_OUTLIER_DETECTION" to "true".
XDSOutlierDetection = false
// enabled, which can be disabled by setting the environment variable
// "GRPC_EXPERIMENTAL_ENABLE_OUTLIER_DETECTION" to "false".
XDSOutlierDetection = !strings.EqualFold(os.Getenv(outlierDetectionSupportEnv), "false")
// XDSFederation indicates whether federation support is enabled.
XDSFederation = strings.EqualFold(os.Getenv(federationEnv), "true")

View File

@@ -110,7 +110,7 @@ type LoggerV2 interface {
// This is a copy of the DepthLoggerV2 defined in the external grpclog package.
// It is defined here to avoid a circular dependency.
//
// Experimental
// # Experimental
//
// Notice: This type is EXPERIMENTAL and may be changed or removed in a
// later release.

View File

@@ -52,6 +52,13 @@ func Intn(n int) int {
return r.Intn(n)
}
// Int31n implements rand.Int31n on the grpcrand global source.
func Int31n(n int32) int32 {
mu.Lock()
defer mu.Unlock()
return r.Int31n(n)
}
// Float64 implements rand.Float64 on the grpcrand global source.
func Float64() float64 {
mu.Lock()

View File

@@ -0,0 +1,32 @@
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package grpcsync
import (
"sync"
)
// OnceFunc returns a function wrapping f which ensures f is only executed
// once even if the returned function is executed multiple times.
func OnceFunc(f func()) func() {
var once sync.Once
return func() {
once.Do(f)
}
}

View File

@@ -0,0 +1,47 @@
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package grpcutil
import (
"strings"
"google.golang.org/grpc/internal/envconfig"
)
// RegisteredCompressorNames holds names of the registered compressors.
var RegisteredCompressorNames []string
// IsCompressorNameRegistered returns true when name is available in registry.
func IsCompressorNameRegistered(name string) bool {
for _, compressor := range RegisteredCompressorNames {
if compressor == name {
return true
}
}
return false
}
// RegisteredCompressors returns a string of registered compressor names
// separated by comma.
func RegisteredCompressors() string {
if !envconfig.AdvertiseCompressors {
return ""
}
return strings.Join(RegisteredCompressorNames, ",")
}

View File

@@ -25,7 +25,6 @@ import (
// ParseMethod splits service and method from the input. It expects format
// "/service/method".
//
func ParseMethod(methodName string) (service, method string, _ error) {
if !strings.HasPrefix(methodName, "/") {
return "", "", errors.New("invalid method name: should start with /")

View File

@@ -63,20 +63,30 @@ var (
// xDS-enabled server invokes this method on a grpc.Server when a particular
// listener moves to "not-serving" mode.
DrainServerTransports interface{} // func(*grpc.Server, string)
// AddExtraServerOptions adds an array of ServerOption that will be
// AddGlobalServerOptions adds an array of ServerOption that will be
// effective globally for newly created servers. The priority will be: 1.
// user-provided; 2. this method; 3. default values.
AddExtraServerOptions interface{} // func(opt ...ServerOption)
// ClearExtraServerOptions clears the array of extra ServerOption. This
AddGlobalServerOptions interface{} // func(opt ...ServerOption)
// ClearGlobalServerOptions clears the array of extra ServerOption. This
// method is useful in testing and benchmarking.
ClearExtraServerOptions func()
// AddExtraDialOptions adds an array of DialOption that will be effective
ClearGlobalServerOptions func()
// AddGlobalDialOptions adds an array of DialOption that will be effective
// globally for newly created client channels. The priority will be: 1.
// user-provided; 2. this method; 3. default values.
AddExtraDialOptions interface{} // func(opt ...DialOption)
// ClearExtraDialOptions clears the array of extra DialOption. This
AddGlobalDialOptions interface{} // func(opt ...DialOption)
// ClearGlobalDialOptions clears the array of extra DialOption. This
// method is useful in testing and benchmarking.
ClearExtraDialOptions func()
ClearGlobalDialOptions func()
// JoinServerOptions combines the server options passed as arguments into a
// single server option.
JoinServerOptions interface{} // func(...grpc.ServerOption) grpc.ServerOption
// WithBinaryLogger returns a DialOption that specifies the binary logger
// for a ClientConn.
WithBinaryLogger interface{} // func(binarylog.Logger) grpc.DialOption
// BinaryLogger returns a ServerOption that can set the binary logger for a
// server.
BinaryLogger interface{} // func(binarylog.Logger) grpc.ServerOption
// NewXDSResolverWithConfigForTesting creates a new xds resolver builder using
// the provided xds bootstrap config instead of the global configuration from
@@ -117,22 +127,6 @@ var (
//
// TODO: Remove this function once the RBAC env var is removed.
UnregisterRBACHTTPFilterForTesting func()
// RegisterOutlierDetectionBalancerForTesting registers the Outlier
// Detection Balancer for testing purposes, regardless of the Outlier
// Detection environment variable.
//
// TODO: Remove this function once the Outlier Detection env var is removed.
RegisterOutlierDetectionBalancerForTesting func()
// UnregisterOutlierDetectionBalancerForTesting unregisters the Outlier
// Detection Balancer for testing purposes. This is needed because there is
// no way to unregister the Outlier Detection Balancer after registering it
// solely for testing purposes using
// RegisterOutlierDetectionBalancerForTesting().
//
// TODO: Remove this function once the Outlier Detection env var is removed.
UnregisterOutlierDetectionBalancerForTesting func()
)
// HealthChecker defines the signature of the client-side LB channel health checking function.

View File

@@ -140,10 +140,10 @@ func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts
disableServiceConfig: opts.DisableServiceConfig,
}
if target.Authority == "" {
if target.URL.Host == "" {
d.resolver = defaultResolver
} else {
d.resolver, err = customAuthorityResolver(target.Authority)
d.resolver, err = customAuthorityResolver(target.URL.Host)
if err != nil {
return nil, err
}

View File

@@ -20,13 +20,20 @@
// name without scheme back to gRPC as resolved address.
package passthrough
import "google.golang.org/grpc/resolver"
import (
"errors"
"google.golang.org/grpc/resolver"
)
const scheme = "passthrough"
type passthroughBuilder struct{}
func (*passthroughBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
if target.Endpoint == "" && opts.Dialer == nil {
return nil, errors.New("passthrough: received empty target in Build()")
}
r := &passthroughResolver{
target: target,
cc: cc,

View File

@@ -34,8 +34,8 @@ type builder struct {
}
func (b *builder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (resolver.Resolver, error) {
if target.Authority != "" {
return nil, fmt.Errorf("invalid (non-empty) authority: %v", target.Authority)
if target.URL.Host != "" {
return nil, fmt.Errorf("invalid (non-empty) authority: %v", target.URL.Host)
}
// gRPC was parsing the dial target manually before PR #4817, and we
@@ -49,8 +49,9 @@ func (b *builder) Build(target resolver.Target, cc resolver.ClientConn, _ resolv
}
addr := resolver.Address{Addr: endpoint}
if b.scheme == unixAbstractScheme {
// prepend "\x00" to address for unix-abstract
addr.Addr = "\x00" + addr.Addr
// We can not prepend \0 as c++ gRPC does, as in Golang '@' is used to signify we do
// not want trailing \0 in address.
addr.Addr = "@" + addr.Addr
}
cc.UpdateState(resolver.State{Addresses: []resolver.Address{networktype.Set(addr, "unix")}})
return &nopResolver{}, nil

View File

@@ -67,10 +67,10 @@ func (bc *BalancerConfig) MarshalJSON() ([]byte, error) {
// ServiceConfig contains a list of loadBalancingConfigs, each with a name and
// config. This method iterates through that list in order, and stops at the
// first policy that is supported.
// - If the config for the first supported policy is invalid, the whole service
// config is invalid.
// - If the list doesn't contain any supported policy, the whole service config
// is invalid.
// - If the config for the first supported policy is invalid, the whole service
// config is invalid.
// - If the list doesn't contain any supported policy, the whole service config
// is invalid.
func (bc *BalancerConfig) UnmarshalJSON(b []byte) error {
var ir intermediateBalancerConfig
err := json.Unmarshal(b, &ir)

View File

@@ -164,3 +164,13 @@ func (e *Error) Is(target error) bool {
}
return proto.Equal(e.s.s, tse.s.s)
}
// IsRestrictedControlPlaneCode returns whether the status includes a code
// restricted for control plane usage as defined by gRFC A54.
func IsRestrictedControlPlaneCode(s *Status) bool {
switch s.Code() {
case codes.InvalidArgument, codes.NotFound, codes.AlreadyExists, codes.FailedPrecondition, codes.Aborted, codes.OutOfRange, codes.DataLoss:
return true
}
return false
}

View File

@@ -191,7 +191,7 @@ type goAway struct {
code http2.ErrCode
debugData []byte
headsUp bool
closeConn bool
closeConn error // if set, loopyWriter will exit, resulting in conn closure
}
func (*goAway) isTransportResponseFrame() bool { return false }
@@ -209,6 +209,14 @@ type outFlowControlSizeRequest struct {
func (*outFlowControlSizeRequest) isTransportResponseFrame() bool { return false }
// closeConnection is an instruction to tell the loopy writer to flush the
// framer and exit, which will cause the transport's connection to be closed
// (by the client or server). The transport itself will close after the reader
// encounters the EOF caused by the connection closure.
type closeConnection struct{}
func (closeConnection) isTransportResponseFrame() bool { return false }
type outStreamState int
const (
@@ -408,7 +416,7 @@ func (c *controlBuffer) get(block bool) (interface{}, error) {
select {
case <-c.ch:
case <-c.done:
return nil, ErrConnClosing
return nil, errors.New("transport closed by client")
}
}
}
@@ -519,18 +527,6 @@ const minBatchSize = 1000
// As an optimization, to increase the batch size for each flush, loopy yields the processor, once
// if the batch size is too low to give stream goroutines a chance to fill it up.
func (l *loopyWriter) run() (err error) {
defer func() {
if err == ErrConnClosing {
// Don't log ErrConnClosing as error since it happens
// 1. When the connection is closed by some other known issue.
// 2. User closed the connection.
// 3. A graceful close of connection.
if logger.V(logLevel) {
logger.Infof("transport: loopyWriter.run returning. %v", err)
}
err = nil
}
}()
for {
it, err := l.cbuf.get(true)
if err != nil {
@@ -574,7 +570,6 @@ func (l *loopyWriter) run() (err error) {
}
l.framer.writer.Flush()
break hasdata
}
}
}
@@ -662,11 +657,10 @@ func (l *loopyWriter) headerHandler(h *headerFrame) error {
func (l *loopyWriter) originateStream(str *outStream) error {
hdr := str.itl.dequeue().(*headerFrame)
if err := hdr.initStream(str.id); err != nil {
if err == ErrConnClosing {
return err
if err == errStreamDrain { // errStreamDrain need not close transport
return nil
}
// Other errors(errStreamDrain) need not close transport.
return nil
return err
}
if err := l.writeHeader(str.id, hdr.endStream, hdr.hf, hdr.onWrite); err != nil {
return err
@@ -764,7 +758,7 @@ func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
}
}
if l.side == clientSide && l.draining && len(l.estdStreams) == 0 {
return ErrConnClosing
return errors.New("finished processing active streams while in draining mode")
}
return nil
}
@@ -799,7 +793,7 @@ func (l *loopyWriter) incomingGoAwayHandler(*incomingGoAway) error {
if l.side == clientSide {
l.draining = true
if len(l.estdStreams) == 0 {
return ErrConnClosing
return errors.New("received GOAWAY with no active streams")
}
}
return nil
@@ -817,6 +811,14 @@ func (l *loopyWriter) goAwayHandler(g *goAway) error {
return nil
}
func (l *loopyWriter) closeConnectionHandler() error {
l.framer.writer.Flush()
// Exit loopyWriter entirely by returning an error here. This will lead to
// the transport closing the connection, and, ultimately, transport
// closure.
return ErrConnClosing
}
func (l *loopyWriter) handle(i interface{}) error {
switch i := i.(type) {
case *incomingWindowUpdate:
@@ -845,6 +847,8 @@ func (l *loopyWriter) handle(i interface{}) error {
return l.goAwayHandler(i)
case *outFlowControlSizeRequest:
return l.outFlowControlSizeRequestHandler(i)
case closeConnection:
return l.closeConnectionHandler()
default:
return fmt.Errorf("transport: unknown control message type %T", i)
}
@@ -886,9 +890,9 @@ func (l *loopyWriter) processData() (bool, error) {
dataItem := str.itl.peek().(*dataFrame) // Peek at the first data item this stream.
// A data item is represented by a dataFrame, since it later translates into
// multiple HTTP2 data frames.
// Every dataFrame has two buffers; h that keeps grpc-message header and d that is acutal data.
// Every dataFrame has two buffers; h that keeps grpc-message header and d that is actual data.
// As an optimization to keep wire traffic low, data from d is copied to h to make as big as the
// maximum possilbe HTTP2 frame size.
// maximum possible HTTP2 frame size.
if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // Empty data frame
// Client sends out empty data frame with endStream = true

View File

@@ -46,24 +46,32 @@ import (
"google.golang.org/grpc/status"
)
// NewServerHandlerTransport returns a ServerTransport handling gRPC
// from inside an http.Handler. It requires that the http Server
// supports HTTP/2.
// NewServerHandlerTransport returns a ServerTransport handling gRPC from
// inside an http.Handler, or writes an HTTP error to w and returns an error.
// It requires that the http Server supports HTTP/2.
func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats []stats.Handler) (ServerTransport, error) {
if r.ProtoMajor != 2 {
return nil, errors.New("gRPC requires HTTP/2")
msg := "gRPC requires HTTP/2"
http.Error(w, msg, http.StatusBadRequest)
return nil, errors.New(msg)
}
if r.Method != "POST" {
return nil, errors.New("invalid gRPC request method")
msg := fmt.Sprintf("invalid gRPC request method %q", r.Method)
http.Error(w, msg, http.StatusBadRequest)
return nil, errors.New(msg)
}
contentType := r.Header.Get("Content-Type")
// TODO: do we assume contentType is lowercase? we did before
contentSubtype, validContentType := grpcutil.ContentSubtype(contentType)
if !validContentType {
return nil, errors.New("invalid gRPC request content-type")
msg := fmt.Sprintf("invalid gRPC request content-type %q", contentType)
http.Error(w, msg, http.StatusBadRequest)
return nil, errors.New(msg)
}
if _, ok := w.(http.Flusher); !ok {
return nil, errors.New("gRPC requires a ResponseWriter supporting http.Flusher")
msg := "gRPC requires a ResponseWriter supporting http.Flusher"
http.Error(w, msg, http.StatusInternalServerError)
return nil, errors.New(msg)
}
st := &serverHandlerTransport{
@@ -79,7 +87,9 @@ func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats []s
if v := r.Header.Get("grpc-timeout"); v != "" {
to, err := decodeTimeout(v)
if err != nil {
return nil, status.Errorf(codes.Internal, "malformed time-out: %v", err)
msg := fmt.Sprintf("malformed time-out: %v", err)
http.Error(w, msg, http.StatusBadRequest)
return nil, status.Error(codes.Internal, msg)
}
st.timeoutSet = true
st.timeout = to
@@ -97,7 +107,9 @@ func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats []s
for _, v := range vv {
v, err := decodeMetadataHeader(k, v)
if err != nil {
return nil, status.Errorf(codes.Internal, "malformed binary metadata: %v", err)
msg := fmt.Sprintf("malformed binary metadata %q in header %q: %v", v, k, err)
http.Error(w, msg, http.StatusBadRequest)
return nil, status.Error(codes.Internal, msg)
}
metakv = append(metakv, k, v)
}
@@ -141,12 +153,15 @@ type serverHandlerTransport struct {
stats []stats.Handler
}
func (ht *serverHandlerTransport) Close() {
ht.closeOnce.Do(ht.closeCloseChanOnce)
func (ht *serverHandlerTransport) Close(err error) {
ht.closeOnce.Do(func() {
if logger.V(logLevel) {
logger.Infof("Closing serverHandlerTransport: %v", err)
}
close(ht.closedCh)
})
}
func (ht *serverHandlerTransport) closeCloseChanOnce() { close(ht.closedCh) }
func (ht *serverHandlerTransport) RemoteAddr() net.Addr { return strAddr(ht.req.RemoteAddr) }
// strAddr is a net.Addr backed by either a TCP "ip:port" string, or
@@ -236,7 +251,7 @@ func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) erro
})
}
}
ht.Close()
ht.Close(errors.New("finished writing status"))
return err
}
@@ -346,7 +361,7 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), trace
case <-ht.req.Context().Done():
}
cancel()
ht.Close()
ht.Close(errors.New("request is done processing"))
}()
req := ht.req
@@ -442,10 +457,10 @@ func (ht *serverHandlerTransport) Drain() {
// mapRecvMsgError returns the non-nil err into the appropriate
// error value as expected by callers of *grpc.parser.recvMsg.
// In particular, in can only be:
// * io.EOF
// * io.ErrUnexpectedEOF
// * of type transport.ConnectionError
// * an error from the status package
// - io.EOF
// - io.ErrUnexpectedEOF
// - of type transport.ConnectionError
// - an error from the status package
func mapRecvMsgError(err error) error {
if err == io.EOF || err == io.ErrUnexpectedEOF {
return err

View File

@@ -38,8 +38,10 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal/channelz"
icredentials "google.golang.org/grpc/internal/credentials"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/grpcutil"
imetadata "google.golang.org/grpc/internal/metadata"
istatus "google.golang.org/grpc/internal/status"
"google.golang.org/grpc/internal/syscall"
"google.golang.org/grpc/internal/transport/networktype"
"google.golang.org/grpc/keepalive"
@@ -57,11 +59,15 @@ var clientConnectionCounter uint64
// http2Client implements the ClientTransport interface with HTTP2.
type http2Client struct {
lastRead int64 // Keep this field 64-bit aligned. Accessed atomically.
ctx context.Context
cancel context.CancelFunc
ctxDone <-chan struct{} // Cache the ctx.Done() chan.
userAgent string
lastRead int64 // Keep this field 64-bit aligned. Accessed atomically.
ctx context.Context
cancel context.CancelFunc
ctxDone <-chan struct{} // Cache the ctx.Done() chan.
userAgent string
// address contains the resolver returned address for this transport.
// If the `ServerName` field is set, it takes precedence over `CallHdr.Host`
// passed to `NewStream`, when determining the :authority header.
address resolver.Address
md metadata.MD
conn net.Conn // underlying communication channel
loopy *loopyWriter
@@ -99,16 +105,13 @@ type http2Client struct {
maxSendHeaderListSize *uint32
bdpEst *bdpEstimator
// onPrefaceReceipt is a callback that client transport calls upon
// receiving server preface to signal that a succefull HTTP2
// connection was established.
onPrefaceReceipt func()
maxConcurrentStreams uint32
streamQuota int64
streamsQuotaAvailable chan struct{}
waitingStreams uint32
nextID uint32
registeredCompressors string
// Do not access controlBuf with mu held.
mu sync.Mutex // guard the following variables
@@ -194,7 +197,7 @@ func isTemporary(err error) bool {
// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
// and starts to receive messages on it. Non-nil error returns if construction
// fails.
func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (_ *http2Client, err error) {
func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onGoAway func(GoAwayReason), onClose func()) (_ *http2Client, err error) {
scheme := "http"
ctx, cancel := context.WithCancel(ctx)
defer func() {
@@ -216,12 +219,38 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
}
return nil, connectionErrorf(true, err, "transport: Error while dialing %v", err)
}
// Any further errors will close the underlying connection
defer func(conn net.Conn) {
if err != nil {
conn.Close()
}
}(conn)
// The following defer and goroutine monitor the connectCtx for cancelation
// and deadline. On context expiration, the connection is hard closed and
// this function will naturally fail as a result. Otherwise, the defer
// waits for the goroutine to exit to prevent the context from being
// monitored (and to prevent the connection from ever being closed) after
// returning from this function.
ctxMonitorDone := grpcsync.NewEvent()
newClientCtx, newClientDone := context.WithCancel(connectCtx)
defer func() {
newClientDone() // Awaken the goroutine below if connectCtx hasn't expired.
<-ctxMonitorDone.Done() // Wait for the goroutine below to exit.
}()
go func(conn net.Conn) {
defer ctxMonitorDone.Fire() // Signal this goroutine has exited.
<-newClientCtx.Done() // Block until connectCtx expires or the defer above executes.
if err := connectCtx.Err(); err != nil {
// connectCtx expired before exiting the function. Hard close the connection.
if logger.V(logLevel) {
logger.Infof("newClientTransport: aborting due to connectCtx: %v", err)
}
conn.Close()
}
}(conn)
kp := opts.KeepaliveParams
// Validate keepalive parameters.
if kp.Time == 0 {
@@ -253,15 +282,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
}
}
if transportCreds != nil {
rawConn := conn
// Pull the deadline from the connectCtx, which will be used for
// timeouts in the authentication protocol handshake. Can ignore the
// boolean as the deadline will return the zero value, which will make
// the conn not timeout on I/O operations.
deadline, _ := connectCtx.Deadline()
rawConn.SetDeadline(deadline)
conn, authInfo, err = transportCreds.ClientHandshake(connectCtx, addr.ServerName, rawConn)
rawConn.SetDeadline(time.Time{})
conn, authInfo, err = transportCreds.ClientHandshake(connectCtx, addr.ServerName, conn)
if err != nil {
return nil, connectionErrorf(isTemporary(err), err, "transport: authentication handshake failed: %v", err)
}
@@ -299,6 +320,8 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
ctxDone: ctx.Done(), // Cache Done chan.
cancel: cancel,
userAgent: opts.UserAgent,
registeredCompressors: grpcutil.RegisteredCompressors(),
address: addr,
conn: conn,
remoteAddr: conn.RemoteAddr(),
localAddr: conn.LocalAddr(),
@@ -315,17 +338,18 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
kp: kp,
statsHandlers: opts.StatsHandlers,
initialWindowSize: initialWindowSize,
onPrefaceReceipt: onPrefaceReceipt,
nextID: 1,
maxConcurrentStreams: defaultMaxStreamsClient,
streamQuota: defaultMaxStreamsClient,
streamsQuotaAvailable: make(chan struct{}, 1),
czData: new(channelzData),
onGoAway: onGoAway,
onClose: onClose,
keepaliveEnabled: keepaliveEnabled,
bufferPool: newBufferPool(),
onClose: onClose,
}
// Add peer information to the http2client context.
t.ctx = peer.NewContext(t.ctx, t.getPeer())
if md, ok := addr.Metadata.(*metadata.MD); ok {
t.md = *md
@@ -361,21 +385,32 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
t.kpDormancyCond = sync.NewCond(&t.mu)
go t.keepalive()
}
// Start the reader goroutine for incoming message. Each transport has
// a dedicated goroutine which reads HTTP2 frame from network. Then it
// dispatches the frame to the corresponding stream entity.
go t.reader()
// Start the reader goroutine for incoming messages. Each transport has a
// dedicated goroutine which reads HTTP2 frames from the network. Then it
// dispatches the frame to the corresponding stream entity. When the
// server preface is received, readerErrCh is closed. If an error occurs
// first, an error is pushed to the channel. This must be checked before
// returning from this function.
readerErrCh := make(chan error, 1)
go t.reader(readerErrCh)
defer func() {
if err == nil {
err = <-readerErrCh
}
if err != nil {
t.Close(err)
}
}()
// Send connection preface to server.
n, err := t.conn.Write(clientPreface)
if err != nil {
err = connectionErrorf(true, err, "transport: failed to write client preface: %v", err)
t.Close(err)
return nil, err
}
if n != len(clientPreface) {
err = connectionErrorf(true, nil, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
t.Close(err)
return nil, err
}
var ss []http2.Setting
@@ -395,14 +430,12 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
err = t.framer.fr.WriteSettings(ss...)
if err != nil {
err = connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
t.Close(err)
return nil, err
}
// Adjust the connection flow control window if needed.
if delta := uint32(icwz - defaultWindowSize); delta > 0 {
if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil {
err = connectionErrorf(true, err, "transport: failed to write window update: %v", err)
t.Close(err)
return nil, err
}
}
@@ -415,10 +448,8 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
go func() {
t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst)
err := t.loopy.run()
if err != nil {
if logger.V(logLevel) {
logger.Errorf("transport: loopyWriter.run returning. Err: %v", err)
}
if logger.V(logLevel) {
logger.Infof("transport: loopyWriter exited. Closing connection. Err: %v", err)
}
// Do not close the transport. Let reader goroutine handle it since
// there might be data in the buffers.
@@ -469,7 +500,7 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
func (t *http2Client) getPeer() *peer.Peer {
return &peer.Peer{
Addr: t.remoteAddr,
AuthInfo: t.authInfo,
AuthInfo: t.authInfo, // Can be nil
}
}
@@ -505,9 +536,22 @@ func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr)
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-previous-rpc-attempts", Value: strconv.Itoa(callHdr.PreviousAttempts)})
}
registeredCompressors := t.registeredCompressors
if callHdr.SendCompress != "" {
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress})
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-accept-encoding", Value: callHdr.SendCompress})
// Include the outgoing compressor name when compressor is not registered
// via encoding.RegisterCompressor. This is possible when client uses
// WithCompressor dial option.
if !grpcutil.IsCompressorNameRegistered(callHdr.SendCompress) {
if registeredCompressors != "" {
registeredCompressors += ","
}
registeredCompressors += callHdr.SendCompress
}
}
if registeredCompressors != "" {
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-accept-encoding", Value: registeredCompressors})
}
if dl, ok := ctx.Deadline(); ok {
// Send out timeout regardless its value. The server can detect timeout context by itself.
@@ -587,7 +631,11 @@ func (t *http2Client) getTrAuthData(ctx context.Context, audience string) (map[s
for _, c := range t.perRPCCreds {
data, err := c.GetRequestMetadata(ctx, audience)
if err != nil {
if _, ok := status.FromError(err); ok {
if st, ok := status.FromError(err); ok {
// Restrict the code to the list allowed by gRFC A54.
if istatus.IsRestrictedControlPlaneCode(st) {
err = status.Errorf(codes.Internal, "transport: received per-RPC creds error with illegal status: %v", err)
}
return nil, err
}
@@ -616,7 +664,14 @@ func (t *http2Client) getCallAuthData(ctx context.Context, audience string, call
}
data, err := callCreds.GetRequestMetadata(ctx, audience)
if err != nil {
return nil, status.Errorf(codes.Internal, "transport: %v", err)
if st, ok := status.FromError(err); ok {
// Restrict the code to the list allowed by gRFC A54.
if istatus.IsRestrictedControlPlaneCode(st) {
err = status.Errorf(codes.Internal, "transport: received per-RPC creds error with illegal status: %v", err)
}
return nil, err
}
return nil, status.Errorf(codes.Internal, "transport: per-RPC creds failed due to error: %v", err)
}
callAuthData = make(map[string]string, len(data))
for k, v := range data {
@@ -632,13 +687,13 @@ func (t *http2Client) getCallAuthData(ctx context.Context, audience string, call
// NewStream errors result in transparent retry, as they mean nothing went onto
// the wire. However, there are two notable exceptions:
//
// 1. If the stream headers violate the max header list size allowed by the
// server. It's possible this could succeed on another transport, even if
// it's unlikely, but do not transparently retry.
// 2. If the credentials errored when requesting their headers. In this case,
// it's possible a retry can fix the problem, but indefinitely transparently
// retrying is not appropriate as it is likely the credentials, if they can
// eventually succeed, would need I/O to do so.
// 1. If the stream headers violate the max header list size allowed by the
// server. It's possible this could succeed on another transport, even if
// it's unlikely, but do not transparently retry.
// 2. If the credentials errored when requesting their headers. In this case,
// it's possible a retry can fix the problem, but indefinitely transparently
// retrying is not appropriate as it is likely the credentials, if they can
// eventually succeed, would need I/O to do so.
type NewStreamError struct {
Err error
@@ -653,6 +708,18 @@ func (e NewStreamError) Error() string {
// streams. All non-nil errors returned will be *NewStreamError.
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error) {
ctx = peer.NewContext(ctx, t.getPeer())
// ServerName field of the resolver returned address takes precedence over
// Host field of CallHdr to determine the :authority header. This is because,
// the ServerName field takes precedence for server authentication during
// TLS handshake, and the :authority header should match the value used
// for server authentication.
if t.address.ServerName != "" {
newCallHdr := *callHdr
newCallHdr.Host = t.address.ServerName
callHdr = &newCallHdr
}
headerFields, err := t.createHeaderFields(ctx, callHdr)
if err != nil {
return nil, &NewStreamError{Err: err, AllowTransparentRetry: false}
@@ -878,19 +945,18 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
// Close kicks off the shutdown process of the transport. This should be called
// only once on a transport. Once it is called, the transport should not be
// accessed any more.
//
// This method blocks until the addrConn that initiated this transport is
// re-connected. This happens because t.onClose() begins reconnect logic at the
// addrConn level and blocks until the addrConn is successfully connected.
func (t *http2Client) Close(err error) {
t.mu.Lock()
// Make sure we only Close once.
// Make sure we only close once.
if t.state == closing {
t.mu.Unlock()
return
}
// Call t.onClose before setting the state to closing to prevent the client
// from attempting to create new streams ASAP.
if logger.V(logLevel) {
logger.Infof("transport: closing: %v", err)
}
// Call t.onClose ASAP to prevent the client from attempting to create new
// streams.
t.onClose()
t.state = closing
streams := t.activeStreams
@@ -941,11 +1007,14 @@ func (t *http2Client) GracefulClose() {
t.mu.Unlock()
return
}
if logger.V(logLevel) {
logger.Infof("transport: GracefulClose called")
}
t.state = draining
active := len(t.activeStreams)
t.mu.Unlock()
if active == 0 {
t.Close(ErrConnClosing)
t.Close(connectionErrorf(true, nil, "no active streams left to process while draining"))
return
}
t.controlBuf.put(&incomingGoAway{})
@@ -1230,18 +1299,29 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
if upperLimit == 0 { // This is the first GoAway Frame.
upperLimit = math.MaxUint32 // Kill all streams after the GoAway ID.
}
t.prevGoAwayID = id
if len(t.activeStreams) == 0 {
t.mu.Unlock()
t.Close(connectionErrorf(true, nil, "received goaway and there are no active streams"))
return
}
streamsToClose := make([]*Stream, 0)
for streamID, stream := range t.activeStreams {
if streamID > id && streamID <= upperLimit {
// The stream was unprocessed by the server.
atomic.StoreUint32(&stream.unprocessed, 1)
t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false)
if streamID > id && streamID <= upperLimit {
atomic.StoreUint32(&stream.unprocessed, 1)
streamsToClose = append(streamsToClose, stream)
}
}
}
t.prevGoAwayID = id
active := len(t.activeStreams)
t.mu.Unlock()
if active == 0 {
t.Close(connectionErrorf(true, nil, "received goaway and there are no active streams"))
// Called outside t.mu because closeStream can take controlBuf's mu, which
// could induce deadlock and is not allowed.
for _, stream := range streamsToClose {
t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false)
}
}
@@ -1469,33 +1549,35 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, statusGen, mdata, true)
}
// reader runs as a separate goroutine in charge of reading data from network
// connection.
//
// TODO(zhaoq): currently one reader per transport. Investigate whether this is
// optimal.
// TODO(zhaoq): Check the validity of the incoming frame sequence.
func (t *http2Client) reader() {
defer close(t.readerDone)
// Check the validity of server preface.
// readServerPreface reads and handles the initial settings frame from the
// server.
func (t *http2Client) readServerPreface() error {
frame, err := t.framer.fr.ReadFrame()
if err != nil {
err = connectionErrorf(true, err, "error reading server preface: %v", err)
t.Close(err) // this kicks off resetTransport, so must be last before return
return
}
t.conn.SetReadDeadline(time.Time{}) // reset deadline once we get the settings frame (we didn't time out, yay!)
if t.keepaliveEnabled {
atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
return connectionErrorf(true, err, "error reading server preface: %v", err)
}
sf, ok := frame.(*http2.SettingsFrame)
if !ok {
// this kicks off resetTransport, so must be last before return
t.Close(connectionErrorf(true, nil, "initial http2 frame from server is not a settings frame: %T", frame))
return connectionErrorf(true, nil, "initial http2 frame from server is not a settings frame: %T", frame)
}
t.handleSettings(sf, true)
return nil
}
// reader verifies the server preface and reads all subsequent data from
// network connection. If the server preface is not read successfully, an
// error is pushed to errCh; otherwise errCh is closed with no error.
func (t *http2Client) reader(errCh chan<- error) {
defer close(t.readerDone)
if err := t.readServerPreface(); err != nil {
errCh <- err
return
}
t.onPrefaceReceipt()
t.handleSettings(sf, true)
close(errCh)
if t.keepaliveEnabled {
atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
}
// loop to keep reading incoming messages on this transport.
for {

View File

@@ -21,6 +21,7 @@ package transport
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"math"
@@ -41,6 +42,7 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
@@ -101,13 +103,13 @@ type http2Server struct {
mu sync.Mutex // guard the following
// drainChan is initialized when Drain() is called the first time.
// After which the server writes out the first GoAway(with ID 2^31-1) frame.
// Then an independent goroutine will be launched to later send the second GoAway.
// During this time we don't want to write another first GoAway(with ID 2^31 -1) frame.
// Thus call to Drain() will be a no-op if drainChan is already initialized since draining is
// already underway.
drainChan chan struct{}
// drainEvent is initialized when Drain() is called the first time. After
// which the server writes out the first GoAway(with ID 2^31-1) frame. Then
// an independent goroutine will be launched to later send the second
// GoAway. During this time we don't want to write another first GoAway(with
// ID 2^31 -1) frame. Thus call to Drain() will be a no-op if drainEvent is
// already initialized since draining is already underway.
drainEvent *grpcsync.Event
state transportState
activeStreams map[uint32]*Stream
// idle is the time instant when the connection went idle.
@@ -265,6 +267,9 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
czData: new(channelzData),
bufferPool: newBufferPool(),
}
// Add peer information to the http2server context.
t.ctx = peer.NewContext(t.ctx, t.getPeer())
t.controlBuf = newControlBuffer(t.done)
if dynamicWindow {
t.bdpEst = &bdpEstimator{
@@ -290,7 +295,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
defer func() {
if err != nil {
t.Close()
t.Close(err)
}
}()
@@ -328,10 +333,9 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
go func() {
t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
if err := t.loopy.run(); err != nil {
if logger.V(logLevel) {
logger.Errorf("transport: loopyWriter.run returning. Err: %v", err)
}
err := t.loopy.run()
if logger.V(logLevel) {
logger.Infof("transport: loopyWriter exited. Closing connection. Err: %v", err)
}
t.conn.Close()
t.controlBuf.finish()
@@ -341,8 +345,9 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
return t, nil
}
// operateHeader takes action on the decoded headers.
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {
// operateHeaders takes action on the decoded headers. Returns an error if fatal
// error encountered and transport needs to close, otherwise returns nil.
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) error {
// Acquire max stream ID lock for entire duration
t.maxStreamMu.Lock()
defer t.maxStreamMu.Unlock()
@@ -358,15 +363,12 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
rstCode: http2.ErrCodeFrameSize,
onWrite: func() {},
})
return false
return nil
}
if streamID%2 != 1 || streamID <= t.maxStreamID {
// illegal gRPC stream id.
if logger.V(logLevel) {
logger.Errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
}
return true
return fmt.Errorf("received an illegal stream id: %v. headers frame: %+v", streamID, frame)
}
t.maxStreamID = streamID
@@ -450,7 +452,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
status: status.New(codes.Internal, errMsg),
rst: !frame.StreamEnded(),
})
return false
return nil
}
if !isGRPC || headerError {
@@ -460,7 +462,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
rstCode: http2.ErrCodeProtocol,
onWrite: func() {},
})
return false
return nil
}
// "If :authority is missing, Host must be renamed to :authority." - A41
@@ -485,14 +487,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
} else {
s.ctx, s.cancel = context.WithCancel(t.ctx)
}
pr := &peer.Peer{
Addr: t.remoteAddr,
}
// Attach Auth info if there is any.
if t.authInfo != nil {
pr.AuthInfo = t.authInfo
}
s.ctx = peer.NewContext(s.ctx, pr)
// Attach the received metadata to the context.
if len(mdata) > 0 {
s.ctx = metadata.NewIncomingContext(s.ctx, mdata)
@@ -507,7 +502,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
if t.state != reachable {
t.mu.Unlock()
s.cancel()
return false
return nil
}
if uint32(len(t.activeStreams)) >= t.maxStreams {
t.mu.Unlock()
@@ -518,7 +513,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
onWrite: func() {},
})
s.cancel()
return false
return nil
}
if httpMethod != http.MethodPost {
t.mu.Unlock()
@@ -534,7 +529,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
rst: !frame.StreamEnded(),
})
s.cancel()
return false
return nil
}
if t.inTapHandle != nil {
var err error
@@ -554,7 +549,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
status: stat,
rst: !frame.StreamEnded(),
})
return false
return nil
}
}
t.activeStreams[streamID] = s
@@ -601,7 +596,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
wq: s.wq,
})
handle(s)
return false
return nil
}
// HandleStreams receives incoming streams using the given handler. This is
@@ -634,19 +629,16 @@ func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.
continue
}
if err == io.EOF || err == io.ErrUnexpectedEOF {
t.Close()
t.Close(err)
return
}
if logger.V(logLevel) {
logger.Warningf("transport: http2Server.HandleStreams failed to read frame: %v", err)
}
t.Close()
t.Close(err)
return
}
switch frame := frame.(type) {
case *http2.MetaHeadersFrame:
if t.operateHeaders(frame, handle, traceCtx) {
t.Close()
if err := t.operateHeaders(frame, handle, traceCtx); err != nil {
t.Close(err)
break
}
case *http2.DataFrame:
@@ -847,8 +839,8 @@ const (
func (t *http2Server) handlePing(f *http2.PingFrame) {
if f.IsAck() {
if f.Data == goAwayPing.data && t.drainChan != nil {
close(t.drainChan)
if f.Data == goAwayPing.data && t.drainEvent != nil {
t.drainEvent.Fire()
return
}
// Maybe it's a BDP ping.
@@ -890,10 +882,7 @@ func (t *http2Server) handlePing(f *http2.PingFrame) {
if t.pingStrikes > maxPingStrikes {
// Send goaway and close the connection.
if logger.V(logLevel) {
logger.Errorf("transport: Got too many pings from the client, closing the connection.")
}
t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: errors.New("got too many pings from the client")})
}
}
@@ -1157,7 +1146,7 @@ func (t *http2Server) keepalive() {
if logger.V(logLevel) {
logger.Infof("transport: closing server transport due to maximum connection age.")
}
t.Close()
t.controlBuf.put(closeConnection{})
case <-t.done:
}
return
@@ -1173,10 +1162,7 @@ func (t *http2Server) keepalive() {
continue
}
if outstandingPing && kpTimeoutLeft <= 0 {
if logger.V(logLevel) {
logger.Infof("transport: closing server transport due to idleness.")
}
t.Close()
t.Close(fmt.Errorf("keepalive ping not acked within timeout %s", t.kp.Time))
return
}
if !outstandingPing {
@@ -1203,12 +1189,15 @@ func (t *http2Server) keepalive() {
// Close starts shutting down the http2Server transport.
// TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
// could cause some resource issue. Revisit this later.
func (t *http2Server) Close() {
func (t *http2Server) Close(err error) {
t.mu.Lock()
if t.state == closing {
t.mu.Unlock()
return
}
if logger.V(logLevel) {
logger.Infof("transport: closing: %v", err)
}
t.state = closing
streams := t.activeStreams
t.activeStreams = nil
@@ -1299,10 +1288,10 @@ func (t *http2Server) RemoteAddr() net.Addr {
func (t *http2Server) Drain() {
t.mu.Lock()
defer t.mu.Unlock()
if t.drainChan != nil {
if t.drainEvent != nil {
return
}
t.drainChan = make(chan struct{})
t.drainEvent = grpcsync.NewEvent()
t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte{}, headsUp: true})
}
@@ -1323,19 +1312,20 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
// Stop accepting more streams now.
t.state = draining
sid := t.maxStreamID
retErr := g.closeConn
if len(t.activeStreams) == 0 {
g.closeConn = true
retErr = errors.New("second GOAWAY written and no active streams left to process")
}
t.mu.Unlock()
t.maxStreamMu.Unlock()
if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
return false, err
}
if g.closeConn {
if retErr != nil {
// Abruptly close the connection following the GoAway (via
// loopywriter). But flush out what's inside the buffer first.
t.framer.writer.Flush()
return false, fmt.Errorf("transport: Connection closing")
return false, retErr
}
return true, nil
}
@@ -1357,7 +1347,7 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
timer := time.NewTimer(time.Minute)
defer timer.Stop()
select {
case <-t.drainChan:
case <-t.drainEvent.Done():
case <-timer.C:
case <-t.done:
return
@@ -1416,6 +1406,13 @@ func (t *http2Server) getOutFlowWindow() int64 {
}
}
func (t *http2Server) getPeer() *peer.Peer {
return &peer.Peer{
Addr: t.remoteAddr,
AuthInfo: t.authInfo, // Can be nil
}
}
func getJitter(v time.Duration) time.Duration {
if v == infinity {
return 0

View File

@@ -20,7 +20,6 @@ package transport
import (
"bufio"
"bytes"
"encoding/base64"
"fmt"
"io"
@@ -45,7 +44,7 @@ import (
const (
// http2MaxFrameLen specifies the max length of a HTTP2 frame.
http2MaxFrameLen = 16384 // 16KB frame
// http://http2.github.io/http2-spec/#SettingValues
// https://httpwg.org/specs/rfc7540.html#SettingValues
http2InitHeaderTableSize = 4096
)
@@ -251,13 +250,13 @@ func encodeGrpcMessage(msg string) string {
}
func encodeGrpcMessageUnchecked(msg string) string {
var buf bytes.Buffer
var sb strings.Builder
for len(msg) > 0 {
r, size := utf8.DecodeRuneInString(msg)
for _, b := range []byte(string(r)) {
if size > 1 {
// If size > 1, r is not ascii. Always do percent encoding.
buf.WriteString(fmt.Sprintf("%%%02X", b))
fmt.Fprintf(&sb, "%%%02X", b)
continue
}
@@ -266,14 +265,14 @@ func encodeGrpcMessageUnchecked(msg string) string {
//
// fmt.Sprintf("%%%02X", utf8.RuneError) gives "%FFFD".
if b >= spaceByte && b <= tildeByte && b != percentByte {
buf.WriteByte(b)
sb.WriteByte(b)
} else {
buf.WriteString(fmt.Sprintf("%%%02X", b))
fmt.Fprintf(&sb, "%%%02X", b)
}
}
msg = msg[size:]
}
return buf.String()
return sb.String()
}
// decodeGrpcMessage decodes the msg encoded by encodeGrpcMessage.
@@ -291,23 +290,23 @@ func decodeGrpcMessage(msg string) string {
}
func decodeGrpcMessageUnchecked(msg string) string {
var buf bytes.Buffer
var sb strings.Builder
lenMsg := len(msg)
for i := 0; i < lenMsg; i++ {
c := msg[i]
if c == percentByte && i+2 < lenMsg {
parsed, err := strconv.ParseUint(msg[i+1:i+3], 16, 8)
if err != nil {
buf.WriteByte(c)
sb.WriteByte(c)
} else {
buf.WriteByte(byte(parsed))
sb.WriteByte(byte(parsed))
i += 2
}
} else {
buf.WriteByte(c)
sb.WriteByte(c)
}
}
return buf.String()
return sb.String()
}
type bufWriter struct {

View File

@@ -43,6 +43,10 @@ import (
"google.golang.org/grpc/tap"
)
// ErrNoHeaders is used as a signal that a trailers only response was received,
// and is not a real error.
var ErrNoHeaders = errors.New("stream has no headers")
const logLevel = 2
type bufferPool struct {
@@ -366,9 +370,15 @@ func (s *Stream) Header() (metadata.MD, error) {
return s.header.Copy(), nil
}
s.waitOnHeader()
if !s.headerValid {
return nil, s.status.Err()
}
if s.noHeaders {
return nil, ErrNoHeaders
}
return s.header.Copy(), nil
}
@@ -573,8 +583,8 @@ type ConnectOptions struct {
// NewClientTransport establishes the transport with the required ConnectOptions
// and returns it to the caller.
func NewClientTransport(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (ClientTransport, error) {
return newHTTP2Client(connectCtx, ctx, addr, opts, onPrefaceReceipt, onGoAway, onClose)
func NewClientTransport(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onGoAway func(GoAwayReason), onClose func()) (ClientTransport, error) {
return newHTTP2Client(connectCtx, ctx, addr, opts, onGoAway, onClose)
}
// Options provides additional hints and information for message
@@ -691,7 +701,7 @@ type ServerTransport interface {
// Close tears down the transport. Once it is called, the transport
// should not be accessed any more. All the pending streams and their
// handlers will be terminated asynchronously.
Close()
Close(err error)
// RemoteAddr returns the remote network address.
RemoteAddr() net.Addr