chore(deps): bump google.golang.org/grpc from 1.53.0 to 1.56.3 (#5973)

Signed-off-by: hongzhouzi <hongzhouzi@kubesphere.io>
This commit is contained in:
hongzhouzi
2023-11-07 18:46:42 +08:00
committed by GitHub
parent 9725b664db
commit 05ccb4d9bc
54 changed files with 3652 additions and 1287 deletions

View File

@@ -28,8 +28,13 @@ import (
"google.golang.org/grpc/internal/grpcutil"
)
// Logger is the global binary logger. It can be used to get binary logger for
// each method.
var grpclogLogger = grpclog.Component("binarylog")
// Logger specifies MethodLoggers for method names with a Log call that
// takes a context.
//
// This is used in the 1.0 release of gcp/observability, and thus must not be
// deleted or changed.
type Logger interface {
GetMethodLogger(methodName string) MethodLogger
}
@@ -40,8 +45,6 @@ type Logger interface {
// It is used to get a MethodLogger for each individual method.
var binLogger Logger
var grpclogLogger = grpclog.Component("binarylog")
// SetLogger sets the binary logger.
//
// Only call this at init time.

View File

@@ -19,6 +19,7 @@
package binarylog
import (
"context"
"net"
"strings"
"sync/atomic"
@@ -48,8 +49,11 @@ func (g *callIDGenerator) reset() {
var idGen callIDGenerator
// MethodLogger is the sub-logger for each method.
//
// This is used in the 1.0 release of gcp/observability, and thus must not be
// deleted or changed.
type MethodLogger interface {
Log(LogEntryConfig)
Log(context.Context, LogEntryConfig)
}
// TruncatingMethodLogger is a method logger that truncates headers and messages
@@ -64,6 +68,9 @@ type TruncatingMethodLogger struct {
}
// NewTruncatingMethodLogger returns a new truncating method logger.
//
// This is used in the 1.0 release of gcp/observability, and thus must not be
// deleted or changed.
func NewTruncatingMethodLogger(h, m uint64) *TruncatingMethodLogger {
return &TruncatingMethodLogger{
headerMaxLen: h,
@@ -98,7 +105,7 @@ func (ml *TruncatingMethodLogger) Build(c LogEntryConfig) *binlogpb.GrpcLogEntry
}
// Log creates a proto binary log entry, and logs it to the sink.
func (ml *TruncatingMethodLogger) Log(c LogEntryConfig) {
func (ml *TruncatingMethodLogger) Log(ctx context.Context, c LogEntryConfig) {
ml.sink.Write(ml.Build(c))
}
@@ -144,6 +151,9 @@ func (ml *TruncatingMethodLogger) truncateMessage(msgPb *binlogpb.Message) (trun
}
// LogEntryConfig represents the configuration for binary log entry.
//
// This is used in the 1.0 release of gcp/observability, and thus must not be
// deleted or changed.
type LogEntryConfig interface {
toProto() *binlogpb.GrpcLogEntry
}

View File

@@ -35,6 +35,7 @@ import "sync"
// internal/transport/transport.go for an example of this.
type Unbounded struct {
c chan interface{}
closed bool
mu sync.Mutex
backlog []interface{}
}
@@ -47,16 +48,18 @@ func NewUnbounded() *Unbounded {
// Put adds t to the unbounded buffer.
func (b *Unbounded) Put(t interface{}) {
b.mu.Lock()
defer b.mu.Unlock()
if b.closed {
return
}
if len(b.backlog) == 0 {
select {
case b.c <- t:
b.mu.Unlock()
return
default:
}
}
b.backlog = append(b.backlog, t)
b.mu.Unlock()
}
// Load sends the earliest buffered data, if any, onto the read channel
@@ -64,6 +67,10 @@ func (b *Unbounded) Put(t interface{}) {
// value from the read channel.
func (b *Unbounded) Load() {
b.mu.Lock()
defer b.mu.Unlock()
if b.closed {
return
}
if len(b.backlog) > 0 {
select {
case b.c <- b.backlog[0]:
@@ -72,7 +79,6 @@ func (b *Unbounded) Load() {
default:
}
}
b.mu.Unlock()
}
// Get returns a read channel on which values added to the buffer, via Put(),
@@ -80,6 +86,20 @@ func (b *Unbounded) Load() {
//
// Upon reading a value from this channel, users are expected to call Load() to
// send the next buffered value onto the channel if there is any.
//
// If the unbounded buffer is closed, the read channel returned by this method
// is closed.
func (b *Unbounded) Get() <-chan interface{} {
return b.c
}
// Close closes the unbounded buffer.
func (b *Unbounded) Close() {
b.mu.Lock()
defer b.mu.Unlock()
if b.closed {
return
}
b.closed = true
close(b.c)
}

View File

@@ -36,6 +36,10 @@ var (
// "GRPC_RING_HASH_CAP". This does not override the default bounds
// checking which NACKs configs specifying ring sizes > 8*1024*1024 (~8M).
RingHashCap = uint64FromEnv("GRPC_RING_HASH_CAP", 4096, 1, 8*1024*1024)
// PickFirstLBConfig is set if we should support configuration of the
// pick_first LB policy, which can be enabled by setting the environment
// variable "GRPC_EXPERIMENTAL_PICKFIRST_LB_CONFIG" to "true".
PickFirstLBConfig = boolFromEnv("GRPC_EXPERIMENTAL_PICKFIRST_LB_CONFIG", false)
)
func boolFromEnv(envVar string, def bool) bool {

View File

@@ -28,9 +28,15 @@ const (
var (
// ObservabilityConfig is the json configuration for the gcp/observability
// package specified directly in the envObservabilityConfig env var.
//
// This is used in the 1.0 release of gcp/observability, and thus must not be
// deleted or changed.
ObservabilityConfig = os.Getenv(envObservabilityConfig)
// ObservabilityConfigFile is the json configuration for the
// gcp/observability specified in a file with the location specified in
// envObservabilityConfigFile env var.
//
// This is used in the 1.0 release of gcp/observability, and thus must not be
// deleted or changed.
ObservabilityConfigFile = os.Getenv(envObservabilityConfigFile)
)

View File

@@ -61,11 +61,10 @@ var (
// have a brand new API on the server-side and users explicitly need to use
// the new API to get security integration on the server.
XDSClientSideSecurity = boolFromEnv("GRPC_XDS_EXPERIMENTAL_SECURITY_SUPPORT", true)
// XDSAggregateAndDNS indicates whether processing of aggregated cluster
// and DNS cluster is enabled, which can be enabled by setting the
// environment variable
// "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER" to
// "true".
// XDSAggregateAndDNS indicates whether processing of aggregated cluster and
// DNS cluster is enabled, which can be disabled by setting the environment
// variable "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"
// to "false".
XDSAggregateAndDNS = boolFromEnv("GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER", true)
// XDSRBAC indicates whether xDS configured RBAC HTTP Filter is enabled,
@@ -79,14 +78,18 @@ var (
// XDSFederation indicates whether federation support is enabled, which can
// be enabled by setting the environment variable
// "GRPC_EXPERIMENTAL_XDS_FEDERATION" to "true".
XDSFederation = boolFromEnv("GRPC_EXPERIMENTAL_XDS_FEDERATION", false)
XDSFederation = boolFromEnv("GRPC_EXPERIMENTAL_XDS_FEDERATION", true)
// XDSRLS indicates whether processing of Cluster Specifier plugins and
// support for the RLS CLuster Specifier is enabled, which can be enabled by
// support for the RLS CLuster Specifier is enabled, which can be disabled by
// setting the environment variable "GRPC_EXPERIMENTAL_XDS_RLS_LB" to
// "true".
XDSRLS = boolFromEnv("GRPC_EXPERIMENTAL_XDS_RLS_LB", false)
// "false".
XDSRLS = boolFromEnv("GRPC_EXPERIMENTAL_XDS_RLS_LB", true)
// C2PResolverTestOnlyTrafficDirectorURI is the TD URI for testing.
C2PResolverTestOnlyTrafficDirectorURI = os.Getenv("GRPC_TEST_ONLY_GOOGLE_C2P_RESOLVER_TRAFFIC_DIRECTOR_URI")
// XDSCustomLBPolicy indicates whether Custom LB Policies are enabled, which
// can be disabled by setting the environment variable
// "GRPC_EXPERIMENTAL_XDS_CUSTOM_LB_CONFIG" to "false".
XDSCustomLBPolicy = boolFromEnv("GRPC_EXPERIMENTAL_XDS_CUSTOM_LB_CONFIG", true)
)

View File

@@ -63,6 +63,9 @@ func (pl *PrefixLogger) Errorf(format string, args ...interface{}) {
// Debugf does info logging at verbose level 2.
func (pl *PrefixLogger) Debugf(format string, args ...interface{}) {
// TODO(6044): Refactor interfaces LoggerV2 and DepthLogger, and maybe
// rewrite PrefixLogger a little to ensure that we don't use the global
// `Logger` here, and instead use the `logger` field.
if !Logger.V(2) {
return
}
@@ -73,6 +76,15 @@ func (pl *PrefixLogger) Debugf(format string, args ...interface{}) {
return
}
InfoDepth(1, fmt.Sprintf(format, args...))
}
// V reports whether verbosity level l is at least the requested verbose level.
func (pl *PrefixLogger) V(l int) bool {
// TODO(6044): Refactor interfaces LoggerV2 and DepthLogger, and maybe
// rewrite PrefixLogger a little to ensure that we don't use the global
// `Logger` here, and instead use the `logger` field.
return Logger.V(l)
}
// NewPrefixLogger creates a prefix logger with the given prefix.

View File

@@ -72,3 +72,17 @@ func Uint64() uint64 {
defer mu.Unlock()
return r.Uint64()
}
// Uint32 implements rand.Uint32 on the grpcrand global source.
func Uint32() uint32 {
mu.Lock()
defer mu.Unlock()
return r.Uint32()
}
// Shuffle implements rand.Shuffle on the grpcrand global source.
var Shuffle = func(n int, f func(int, int)) {
mu.Lock()
defer mu.Unlock()
r.Shuffle(n, f)
}

View File

@@ -0,0 +1,119 @@
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package grpcsync
import (
"context"
"sync"
"google.golang.org/grpc/internal/buffer"
)
// CallbackSerializer provides a mechanism to schedule callbacks in a
// synchronized manner. It provides a FIFO guarantee on the order of execution
// of scheduled callbacks. New callbacks can be scheduled by invoking the
// Schedule() method.
//
// This type is safe for concurrent access.
type CallbackSerializer struct {
// Done is closed once the serializer is shut down completely, i.e all
// scheduled callbacks are executed and the serializer has deallocated all
// its resources.
Done chan struct{}
callbacks *buffer.Unbounded
closedMu sync.Mutex
closed bool
}
// NewCallbackSerializer returns a new CallbackSerializer instance. The provided
// context will be passed to the scheduled callbacks. Users should cancel the
// provided context to shutdown the CallbackSerializer. It is guaranteed that no
// callbacks will be added once this context is canceled, and any pending un-run
// callbacks will be executed before the serializer is shut down.
func NewCallbackSerializer(ctx context.Context) *CallbackSerializer {
t := &CallbackSerializer{
Done: make(chan struct{}),
callbacks: buffer.NewUnbounded(),
}
go t.run(ctx)
return t
}
// Schedule adds a callback to be scheduled after existing callbacks are run.
//
// Callbacks are expected to honor the context when performing any blocking
// operations, and should return early when the context is canceled.
//
// Return value indicates if the callback was successfully added to the list of
// callbacks to be executed by the serializer. It is not possible to add
// callbacks once the context passed to NewCallbackSerializer is cancelled.
func (t *CallbackSerializer) Schedule(f func(ctx context.Context)) bool {
t.closedMu.Lock()
defer t.closedMu.Unlock()
if t.closed {
return false
}
t.callbacks.Put(f)
return true
}
func (t *CallbackSerializer) run(ctx context.Context) {
var backlog []func(context.Context)
defer close(t.Done)
for ctx.Err() == nil {
select {
case <-ctx.Done():
// Do nothing here. Next iteration of the for loop will not happen,
// since ctx.Err() would be non-nil.
case callback, ok := <-t.callbacks.Get():
if !ok {
return
}
t.callbacks.Load()
callback.(func(ctx context.Context))(ctx)
}
}
// Fetch pending callbacks if any, and execute them before returning from
// this method and closing t.Done.
t.closedMu.Lock()
t.closed = true
backlog = t.fetchPendingCallbacks()
t.callbacks.Close()
t.closedMu.Unlock()
for _, b := range backlog {
b(ctx)
}
}
func (t *CallbackSerializer) fetchPendingCallbacks() []func(context.Context) {
var backlog []func(context.Context)
for {
select {
case b := <-t.callbacks.Get():
backlog = append(backlog, b.(func(context.Context)))
t.callbacks.Load()
default:
return backlog
}
}
}

View File

@@ -58,6 +58,12 @@ var (
// gRPC server. An xDS-enabled server needs to know what type of credentials
// is configured on the underlying gRPC server. This is set by server.go.
GetServerCredentials interface{} // func (*grpc.Server) credentials.TransportCredentials
// CanonicalString returns the canonical string of the code defined here:
// https://github.com/grpc/grpc/blob/master/doc/statuscodes.md.
//
// This is used in the 1.0 release of gcp/observability, and thus must not be
// deleted or changed.
CanonicalString interface{} // func (codes.Code) string
// DrainServerTransports initiates a graceful close of existing connections
// on a gRPC server accepted on the provided listener address. An
// xDS-enabled server invokes this method on a grpc.Server when a particular
@@ -66,16 +72,35 @@ var (
// AddGlobalServerOptions adds an array of ServerOption that will be
// effective globally for newly created servers. The priority will be: 1.
// user-provided; 2. this method; 3. default values.
//
// This is used in the 1.0 release of gcp/observability, and thus must not be
// deleted or changed.
AddGlobalServerOptions interface{} // func(opt ...ServerOption)
// ClearGlobalServerOptions clears the array of extra ServerOption. This
// method is useful in testing and benchmarking.
//
// This is used in the 1.0 release of gcp/observability, and thus must not be
// deleted or changed.
ClearGlobalServerOptions func()
// AddGlobalDialOptions adds an array of DialOption that will be effective
// globally for newly created client channels. The priority will be: 1.
// user-provided; 2. this method; 3. default values.
//
// This is used in the 1.0 release of gcp/observability, and thus must not be
// deleted or changed.
AddGlobalDialOptions interface{} // func(opt ...DialOption)
// DisableGlobalDialOptions returns a DialOption that prevents the
// ClientConn from applying the global DialOptions (set via
// AddGlobalDialOptions).
//
// This is used in the 1.0 release of gcp/observability, and thus must not be
// deleted or changed.
DisableGlobalDialOptions interface{} // func() grpc.DialOption
// ClearGlobalDialOptions clears the array of extra DialOption. This
// method is useful in testing and benchmarking.
//
// This is used in the 1.0 release of gcp/observability, and thus must not be
// deleted or changed.
ClearGlobalDialOptions func()
// JoinDialOptions combines the dial options passed as arguments into a
// single dial option.
@@ -86,9 +111,15 @@ var (
// WithBinaryLogger returns a DialOption that specifies the binary logger
// for a ClientConn.
//
// This is used in the 1.0 release of gcp/observability, and thus must not be
// deleted or changed.
WithBinaryLogger interface{} // func(binarylog.Logger) grpc.DialOption
// BinaryLogger returns a ServerOption that can set the binary logger for a
// server.
//
// This is used in the 1.0 release of gcp/observability, and thus must not be
// deleted or changed.
BinaryLogger interface{} // func(binarylog.Logger) grpc.ServerOption
// NewXDSResolverWithConfigForTesting creates a new xds resolver builder using
@@ -130,6 +161,9 @@ var (
//
// TODO: Remove this function once the RBAC env var is removed.
UnregisterRBACHTTPFilterForTesting func()
// ORCAAllowAnyMinReportingInterval is for examples/orca use ONLY.
ORCAAllowAnyMinReportingInterval interface{} // func(so *orca.ServiceOptions)
)
// HealthChecker defines the signature of the client-side LB channel health checking function.

View File

@@ -76,33 +76,11 @@ func Set(addr resolver.Address, md metadata.MD) resolver.Address {
return addr
}
// Validate returns an error if the input md contains invalid keys or values.
//
// If the header is not a pseudo-header, the following items are checked:
// - header names must contain one or more characters from this set [0-9 a-z _ - .].
// - if the header-name ends with a "-bin" suffix, no validation of the header value is performed.
// - otherwise, the header value must contain one or more characters from the set [%x20-%x7E].
// Validate validates every pair in md with ValidatePair.
func Validate(md metadata.MD) error {
for k, vals := range md {
// pseudo-header will be ignored
if k[0] == ':' {
continue
}
// check key, for i that saving a conversion if not using for range
for i := 0; i < len(k); i++ {
r := k[i]
if !(r >= 'a' && r <= 'z') && !(r >= '0' && r <= '9') && r != '.' && r != '-' && r != '_' {
return fmt.Errorf("header key %q contains illegal characters not in [0-9a-z-_.]", k)
}
}
if strings.HasSuffix(k, "-bin") {
continue
}
// check value
for _, val := range vals {
if hasNotPrintable(val) {
return fmt.Errorf("header key %q contains value with non-printable ASCII characters", k)
}
if err := ValidatePair(k, vals...); err != nil {
return err
}
}
return nil
@@ -118,3 +96,37 @@ func hasNotPrintable(msg string) bool {
}
return false
}
// ValidatePair validate a key-value pair with the following rules (the pseudo-header will be skipped) :
//
// - key must contain one or more characters.
// - the characters in the key must be contained in [0-9 a-z _ - .].
// - if the key ends with a "-bin" suffix, no validation of the corresponding value is performed.
// - the characters in the every value must be printable (in [%x20-%x7E]).
func ValidatePair(key string, vals ...string) error {
// key should not be empty
if key == "" {
return fmt.Errorf("there is an empty key in the header")
}
// pseudo-header will be ignored
if key[0] == ':' {
return nil
}
// check key, for i that saving a conversion if not using for range
for i := 0; i < len(key); i++ {
r := key[i]
if !(r >= 'a' && r <= 'z') && !(r >= '0' && r <= '9') && r != '.' && r != '-' && r != '_' {
return fmt.Errorf("header key %q contains illegal characters not in [0-9a-z-_.]", key)
}
}
if strings.HasSuffix(key, "-bin") {
return nil
}
// check value
for _, val := range vals {
if hasNotPrintable(val) {
return fmt.Errorf("header key %q contains value with non-printable ASCII characters", key)
}
}
return nil
}

View File

@@ -0,0 +1,130 @@
/*
*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package serviceconfig
import (
"encoding/json"
"fmt"
"math"
"strconv"
"strings"
"time"
)
// Duration defines JSON marshal and unmarshal methods to conform to the
// protobuf JSON spec defined [here].
//
// [here]: https://protobuf.dev/reference/protobuf/google.protobuf/#duration
type Duration time.Duration
func (d Duration) String() string {
return fmt.Sprint(time.Duration(d))
}
// MarshalJSON converts from d to a JSON string output.
func (d Duration) MarshalJSON() ([]byte, error) {
ns := time.Duration(d).Nanoseconds()
sec := ns / int64(time.Second)
ns = ns % int64(time.Second)
var sign string
if sec < 0 || ns < 0 {
sign, sec, ns = "-", -1*sec, -1*ns
}
// Generated output always contains 0, 3, 6, or 9 fractional digits,
// depending on required precision.
str := fmt.Sprintf("%s%d.%09d", sign, sec, ns)
str = strings.TrimSuffix(str, "000")
str = strings.TrimSuffix(str, "000")
str = strings.TrimSuffix(str, ".000")
return []byte(fmt.Sprintf("\"%ss\"", str)), nil
}
// UnmarshalJSON unmarshals b as a duration JSON string into d.
func (d *Duration) UnmarshalJSON(b []byte) error {
var s string
if err := json.Unmarshal(b, &s); err != nil {
return err
}
if !strings.HasSuffix(s, "s") {
return fmt.Errorf("malformed duration %q: missing seconds unit", s)
}
neg := false
if s[0] == '-' {
neg = true
s = s[1:]
}
ss := strings.SplitN(s[:len(s)-1], ".", 3)
if len(ss) > 2 {
return fmt.Errorf("malformed duration %q: too many decimals", s)
}
// hasDigits is set if either the whole or fractional part of the number is
// present, since both are optional but one is required.
hasDigits := false
var sec, ns int64
if len(ss[0]) > 0 {
var err error
if sec, err = strconv.ParseInt(ss[0], 10, 64); err != nil {
return fmt.Errorf("malformed duration %q: %v", s, err)
}
// Maximum seconds value per the durationpb spec.
const maxProtoSeconds = 315_576_000_000
if sec > maxProtoSeconds {
return fmt.Errorf("out of range: %q", s)
}
hasDigits = true
}
if len(ss) == 2 && len(ss[1]) > 0 {
if len(ss[1]) > 9 {
return fmt.Errorf("malformed duration %q: too many digits after decimal", s)
}
var err error
if ns, err = strconv.ParseInt(ss[1], 10, 64); err != nil {
return fmt.Errorf("malformed duration %q: %v", s, err)
}
for i := 9; i > len(ss[1]); i-- {
ns *= 10
}
hasDigits = true
}
if !hasDigits {
return fmt.Errorf("malformed duration %q: contains no numbers", s)
}
if neg {
sec *= -1
ns *= -1
}
// Maximum/minimum seconds/nanoseconds representable by Go's time.Duration.
const maxSeconds = math.MaxInt64 / int64(time.Second)
const maxNanosAtMaxSeconds = math.MaxInt64 % int64(time.Second)
const minSeconds = math.MinInt64 / int64(time.Second)
const minNanosAtMinSeconds = math.MinInt64 % int64(time.Second)
if sec > maxSeconds || (sec == maxSeconds && ns >= maxNanosAtMaxSeconds) {
*d = Duration(math.MaxInt64)
} else if sec < minSeconds || (sec == minSeconds && ns <= minNanosAtMinSeconds) {
*d = Duration(math.MinInt64)
} else {
*d = Duration(sec*int64(time.Second) + ns)
}
return nil
}

View File

@@ -22,6 +22,7 @@ import (
"bytes"
"errors"
"fmt"
"net"
"runtime"
"strconv"
"sync"
@@ -29,6 +30,7 @@ import (
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcutil"
"google.golang.org/grpc/status"
)
@@ -486,12 +488,14 @@ type loopyWriter struct {
hEnc *hpack.Encoder // HPACK encoder.
bdpEst *bdpEstimator
draining bool
conn net.Conn
logger *grpclog.PrefixLogger
// Side-specific handlers
ssGoAwayHandler func(*goAway) (bool, error)
}
func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator) *loopyWriter {
func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator, conn net.Conn, logger *grpclog.PrefixLogger) *loopyWriter {
var buf bytes.Buffer
l := &loopyWriter{
side: s,
@@ -504,6 +508,8 @@ func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimato
hBuf: &buf,
hEnc: hpack.NewEncoder(&buf),
bdpEst: bdpEst,
conn: conn,
logger: logger,
}
return l
}
@@ -521,15 +527,27 @@ const minBatchSize = 1000
// 2. Stream level flow control quota available.
//
// In each iteration of run loop, other than processing the incoming control
// frame, loopy calls processData, which processes one node from the activeStreams linked-list.
// This results in writing of HTTP2 frames into an underlying write buffer.
// When there's no more control frames to read from controlBuf, loopy flushes the write buffer.
// As an optimization, to increase the batch size for each flush, loopy yields the processor, once
// if the batch size is too low to give stream goroutines a chance to fill it up.
// frame, loopy calls processData, which processes one node from the
// activeStreams linked-list. This results in writing of HTTP2 frames into an
// underlying write buffer. When there's no more control frames to read from
// controlBuf, loopy flushes the write buffer. As an optimization, to increase
// the batch size for each flush, loopy yields the processor, once if the batch
// size is too low to give stream goroutines a chance to fill it up.
//
// Upon exiting, if the error causing the exit is not an I/O error, run()
// flushes and closes the underlying connection. Otherwise, the connection is
// left open to allow the I/O error to be encountered by the reader instead.
func (l *loopyWriter) run() (err error) {
// Always flush the writer before exiting in case there are pending frames
// to be sent.
defer l.framer.writer.Flush()
defer func() {
if l.logger.V(logLevel) {
l.logger.Infof("loopyWriter exiting with error: %v", err)
}
if !isIOError(err) {
l.framer.writer.Flush()
l.conn.Close()
}
l.cbuf.finish()
}()
for {
it, err := l.cbuf.get(true)
if err != nil {
@@ -581,11 +599,11 @@ func (l *loopyWriter) outgoingWindowUpdateHandler(w *outgoingWindowUpdate) error
return l.framer.fr.WriteWindowUpdate(w.streamID, w.increment)
}
func (l *loopyWriter) incomingWindowUpdateHandler(w *incomingWindowUpdate) error {
func (l *loopyWriter) incomingWindowUpdateHandler(w *incomingWindowUpdate) {
// Otherwise update the quota.
if w.streamID == 0 {
l.sendQuota += w.increment
return nil
return
}
// Find the stream and update it.
if str, ok := l.estdStreams[w.streamID]; ok {
@@ -593,10 +611,9 @@ func (l *loopyWriter) incomingWindowUpdateHandler(w *incomingWindowUpdate) error
if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota > 0 && str.state == waitingOnStreamQuota {
str.state = active
l.activeStreams.enqueue(str)
return nil
return
}
}
return nil
}
func (l *loopyWriter) outgoingSettingsHandler(s *outgoingSettings) error {
@@ -604,13 +621,11 @@ func (l *loopyWriter) outgoingSettingsHandler(s *outgoingSettings) error {
}
func (l *loopyWriter) incomingSettingsHandler(s *incomingSettings) error {
if err := l.applySettings(s.ss); err != nil {
return err
}
l.applySettings(s.ss)
return l.framer.fr.WriteSettingsAck()
}
func (l *loopyWriter) registerStreamHandler(h *registerStream) error {
func (l *loopyWriter) registerStreamHandler(h *registerStream) {
str := &outStream{
id: h.streamID,
state: empty,
@@ -618,15 +633,14 @@ func (l *loopyWriter) registerStreamHandler(h *registerStream) error {
wq: h.wq,
}
l.estdStreams[h.streamID] = str
return nil
}
func (l *loopyWriter) headerHandler(h *headerFrame) error {
if l.side == serverSide {
str, ok := l.estdStreams[h.streamID]
if !ok {
if logger.V(logLevel) {
logger.Warningf("transport: loopy doesn't recognize the stream: %d", h.streamID)
if l.logger.V(logLevel) {
l.logger.Infof("Unrecognized streamID %d in loopyWriter", h.streamID)
}
return nil
}
@@ -681,8 +695,8 @@ func (l *loopyWriter) writeHeader(streamID uint32, endStream bool, hf []hpack.He
l.hBuf.Reset()
for _, f := range hf {
if err := l.hEnc.WriteField(f); err != nil {
if logger.V(logLevel) {
logger.Warningf("transport: loopyWriter.writeHeader encountered error while encoding headers: %v", err)
if l.logger.V(logLevel) {
l.logger.Warningf("Encountered error while encoding headers: %v", err)
}
}
}
@@ -720,10 +734,10 @@ func (l *loopyWriter) writeHeader(streamID uint32, endStream bool, hf []hpack.He
return nil
}
func (l *loopyWriter) preprocessData(df *dataFrame) error {
func (l *loopyWriter) preprocessData(df *dataFrame) {
str, ok := l.estdStreams[df.streamID]
if !ok {
return nil
return
}
// If we got data for a stream it means that
// stream was originated and the headers were sent out.
@@ -732,7 +746,6 @@ func (l *loopyWriter) preprocessData(df *dataFrame) error {
str.state = active
l.activeStreams.enqueue(str)
}
return nil
}
func (l *loopyWriter) pingHandler(p *ping) error {
@@ -743,9 +756,8 @@ func (l *loopyWriter) pingHandler(p *ping) error {
}
func (l *loopyWriter) outFlowControlSizeRequestHandler(o *outFlowControlSizeRequest) error {
func (l *loopyWriter) outFlowControlSizeRequestHandler(o *outFlowControlSizeRequest) {
o.resp <- l.sendQuota
return nil
}
func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
@@ -763,6 +775,7 @@ func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
}
}
if l.draining && len(l.estdStreams) == 0 {
// Flush and close the connection; we are done with it.
return errors.New("finished processing active streams while in draining mode")
}
return nil
@@ -798,6 +811,7 @@ func (l *loopyWriter) incomingGoAwayHandler(*incomingGoAway) error {
if l.side == clientSide {
l.draining = true
if len(l.estdStreams) == 0 {
// Flush and close the connection; we are done with it.
return errors.New("received GOAWAY with no active streams")
}
}
@@ -816,17 +830,10 @@ func (l *loopyWriter) goAwayHandler(g *goAway) error {
return nil
}
func (l *loopyWriter) closeConnectionHandler() error {
// Exit loopyWriter entirely by returning an error here. This will lead to
// the transport closing the connection, and, ultimately, transport
// closure.
return ErrConnClosing
}
func (l *loopyWriter) handle(i interface{}) error {
switch i := i.(type) {
case *incomingWindowUpdate:
return l.incomingWindowUpdateHandler(i)
l.incomingWindowUpdateHandler(i)
case *outgoingWindowUpdate:
return l.outgoingWindowUpdateHandler(i)
case *incomingSettings:
@@ -836,7 +843,7 @@ func (l *loopyWriter) handle(i interface{}) error {
case *headerFrame:
return l.headerHandler(i)
case *registerStream:
return l.registerStreamHandler(i)
l.registerStreamHandler(i)
case *cleanupStream:
return l.cleanupStreamHandler(i)
case *earlyAbortStream:
@@ -844,21 +851,24 @@ func (l *loopyWriter) handle(i interface{}) error {
case *incomingGoAway:
return l.incomingGoAwayHandler(i)
case *dataFrame:
return l.preprocessData(i)
l.preprocessData(i)
case *ping:
return l.pingHandler(i)
case *goAway:
return l.goAwayHandler(i)
case *outFlowControlSizeRequest:
return l.outFlowControlSizeRequestHandler(i)
l.outFlowControlSizeRequestHandler(i)
case closeConnection:
return l.closeConnectionHandler()
// Just return a non-I/O error and run() will flush and close the
// connection.
return ErrConnClosing
default:
return fmt.Errorf("transport: unknown control message type %T", i)
}
return nil
}
func (l *loopyWriter) applySettings(ss []http2.Setting) error {
func (l *loopyWriter) applySettings(ss []http2.Setting) {
for _, s := range ss {
switch s.ID {
case http2.SettingInitialWindowSize:
@@ -877,7 +887,6 @@ func (l *loopyWriter) applySettings(ss []http2.Setting) error {
updateHeaderTblSize(l.hEnc, s.Val)
}
}
return nil
}
// processData removes the first stream from active streams, writes out at most 16KB
@@ -911,7 +920,7 @@ func (l *loopyWriter) processData() (bool, error) {
return false, err
}
if err := l.cleanupStreamHandler(trailer.cleanup); err != nil {
return false, nil
return false, err
}
} else {
l.activeStreams.enqueue(str)

View File

@@ -39,6 +39,7 @@ import (
"golang.org/x/net/http2"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcutil"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
@@ -83,6 +84,7 @@ func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats []s
contentSubtype: contentSubtype,
stats: stats,
}
st.logger = prefixLoggerForServerHandlerTransport(st)
if v := r.Header.Get("grpc-timeout"); v != "" {
to, err := decodeTimeout(v)
@@ -150,13 +152,14 @@ type serverHandlerTransport struct {
// TODO make sure this is consistent across handler_server and http2_server
contentSubtype string
stats []stats.Handler
stats []stats.Handler
logger *grpclog.PrefixLogger
}
func (ht *serverHandlerTransport) Close(err error) {
ht.closeOnce.Do(func() {
if logger.V(logLevel) {
logger.Infof("Closing serverHandlerTransport: %v", err)
if ht.logger.V(logLevel) {
ht.logger.Infof("Closing: %v", err)
}
close(ht.closedCh)
})
@@ -450,7 +453,7 @@ func (ht *serverHandlerTransport) IncrMsgSent() {}
func (ht *serverHandlerTransport) IncrMsgRecv() {}
func (ht *serverHandlerTransport) Drain() {
func (ht *serverHandlerTransport) Drain(debugData string) {
panic("Drain() is not implemented")
}

View File

@@ -38,6 +38,7 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal/channelz"
icredentials "google.golang.org/grpc/internal/credentials"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/grpcutil"
imetadata "google.golang.org/grpc/internal/metadata"
@@ -145,6 +146,7 @@ type http2Client struct {
bufferPool *bufferPool
connectionID uint64
logger *grpclog.PrefixLogger
}
func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr resolver.Address, useProxy bool, grpcUA string) (net.Conn, error) {
@@ -244,7 +246,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
if err := connectCtx.Err(); err != nil {
// connectCtx expired before exiting the function. Hard close the connection.
if logger.V(logLevel) {
logger.Infof("newClientTransport: aborting due to connectCtx: %v", err)
logger.Infof("Aborting due to connect deadline expiring: %v", err)
}
conn.Close()
}
@@ -346,6 +348,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
bufferPool: newBufferPool(),
onClose: onClose,
}
t.logger = prefixLoggerForClientTransport(t)
// Add peer information to the http2client context.
t.ctx = peer.NewContext(t.ctx, t.getPeer())
@@ -444,15 +447,8 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
return nil, err
}
go func() {
t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst)
err := t.loopy.run()
if logger.V(logLevel) {
logger.Infof("transport: loopyWriter exited. Closing connection. Err: %v", err)
}
// Do not close the transport. Let reader goroutine handle it since
// there might be data in the buffers.
t.conn.Close()
t.controlBuf.finish()
t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger)
t.loopy.run()
close(t.writerDone)
}()
return t, nil
@@ -789,7 +785,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
s.id = h.streamID
s.fc = &inFlow{limit: uint32(t.initialWindowSize)}
t.mu.Lock()
if t.activeStreams == nil { // Can be niled from Close().
if t.state == draining || t.activeStreams == nil { // Can be niled from Close().
t.mu.Unlock()
return false // Don't create a stream if the transport is already closed.
}
@@ -866,8 +862,8 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
}
}
if transportDrainRequired {
if logger.V(logLevel) {
logger.Infof("transport: t.nextID > MaxStreamID. Draining")
if t.logger.V(logLevel) {
t.logger.Infof("Draining transport: t.nextID > MaxStreamID")
}
t.GracefulClose()
}
@@ -959,8 +955,8 @@ func (t *http2Client) Close(err error) {
t.mu.Unlock()
return
}
if logger.V(logLevel) {
logger.Infof("transport: closing: %v", err)
if t.logger.V(logLevel) {
t.logger.Infof("Closing: %v", err)
}
// Call t.onClose ASAP to prevent the client from attempting to create new
// streams.
@@ -1016,8 +1012,8 @@ func (t *http2Client) GracefulClose() {
t.mu.Unlock()
return
}
if logger.V(logLevel) {
logger.Infof("transport: GracefulClose called")
if t.logger.V(logLevel) {
t.logger.Infof("GracefulClose called")
}
t.onClose(GoAwayInvalid)
t.state = draining
@@ -1181,8 +1177,8 @@ func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
}
statusCode, ok := http2ErrConvTab[f.ErrCode]
if !ok {
if logger.V(logLevel) {
logger.Warningf("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error: %v", f.ErrCode)
if t.logger.V(logLevel) {
t.logger.Infof("Received a RST_STREAM frame with code %q, but found no mapped gRPC status", f.ErrCode)
}
statusCode = codes.Unknown
}
@@ -1264,10 +1260,12 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
t.mu.Unlock()
return
}
if f.ErrCode == http2.ErrCodeEnhanceYourCalm {
if logger.V(logLevel) {
logger.Infof("Client received GoAway with http2.ErrCodeEnhanceYourCalm.")
}
if f.ErrCode == http2.ErrCodeEnhanceYourCalm && string(f.DebugData()) == "too_many_pings" {
// When a client receives a GOAWAY with error code ENHANCE_YOUR_CALM and debug
// data equal to ASCII "too_many_pings", it should log the occurrence at a log level that is
// enabled by default and double the configure KEEPALIVE_TIME used for new connections
// on that channel.
logger.Errorf("Client received GoAway with error code ENHANCE_YOUR_CALM and debug data equal to ASCII \"too_many_pings\".")
}
id := f.LastStreamID
if id > 0 && id%2 == 0 {
@@ -1339,7 +1337,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
// setGoAwayReason sets the value of t.goAwayReason based
// on the GoAway frame received.
// It expects a lock on transport's mutext to be held by
// It expects a lock on transport's mutex to be held by
// the caller.
func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) {
t.goAwayReason = GoAwayNoReason

View File

@@ -35,7 +35,9 @@ import (
"github.com/golang/protobuf/proto"
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcutil"
"google.golang.org/grpc/internal/pretty"
"google.golang.org/grpc/internal/syscall"
"google.golang.org/grpc/codes"
@@ -129,6 +131,8 @@ type http2Server struct {
// This lock may not be taken if mu is already held.
maxStreamMu sync.Mutex
maxStreamID uint32 // max stream ID ever seen
logger *grpclog.PrefixLogger
}
// NewServerTransport creates a http2 transport with conn and configuration
@@ -167,15 +171,10 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
ID: http2.SettingMaxFrameSize,
Val: http2MaxFrameLen,
}}
// TODO(zhaoq): Have a better way to signal "no limit" because 0 is
// permitted in the HTTP2 spec.
maxStreams := config.MaxStreams
if maxStreams == 0 {
maxStreams = math.MaxUint32
} else {
if config.MaxStreams != math.MaxUint32 {
isettings = append(isettings, http2.Setting{
ID: http2.SettingMaxConcurrentStreams,
Val: maxStreams,
Val: config.MaxStreams,
})
}
dynamicWindow := true
@@ -254,7 +253,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
framer: framer,
readerDone: make(chan struct{}),
writerDone: make(chan struct{}),
maxStreams: maxStreams,
maxStreams: config.MaxStreams,
inTapHandle: config.InTapHandle,
fc: &trInFlow{limit: uint32(icwz)},
state: reachable,
@@ -267,6 +266,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
czData: new(channelzData),
bufferPool: newBufferPool(),
}
t.logger = prefixLoggerForServerTransport(t)
// Add peer information to the http2server context.
t.ctx = peer.NewContext(t.ctx, t.getPeer())
@@ -331,14 +331,9 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
t.handleSettings(sf)
go func() {
t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger)
t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
err := t.loopy.run()
if logger.V(logLevel) {
logger.Infof("transport: loopyWriter exited. Closing connection. Err: %v", err)
}
t.conn.Close()
t.controlBuf.finish()
t.loopy.run()
close(t.writerDone)
}()
go t.keepalive()
@@ -383,7 +378,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
// if false, content-type was missing or invalid
isGRPC = false
contentType = ""
mdata = make(map[string][]string)
mdata = make(metadata.MD, len(frame.Fields))
httpMethod string
// these are set if an error is encountered while parsing the headers
protocolError bool
@@ -404,6 +399,17 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
mdata[hf.Name] = append(mdata[hf.Name], hf.Value)
s.contentSubtype = contentSubtype
isGRPC = true
case "grpc-accept-encoding":
mdata[hf.Name] = append(mdata[hf.Name], hf.Value)
if hf.Value == "" {
continue
}
compressors := hf.Value
if s.clientAdvertisedCompressors != "" {
compressors = s.clientAdvertisedCompressors + "," + compressors
}
s.clientAdvertisedCompressors = compressors
case "grpc-encoding":
s.recvCompress = hf.Value
case ":method":
@@ -419,8 +425,8 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
// "Transports must consider requests containing the Connection header
// as malformed." - A41
case "connection":
if logger.V(logLevel) {
logger.Errorf("transport: http2Server.operateHeaders parsed a :connection header which makes a request malformed as per the HTTP/2 spec")
if t.logger.V(logLevel) {
t.logger.Infof("Received a HEADERS frame with a :connection header which makes the request malformed, as per the HTTP/2 spec")
}
protocolError = true
default:
@@ -430,7 +436,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
v, err := decodeMetadataHeader(hf.Name, hf.Value)
if err != nil {
headerError = status.Newf(codes.Internal, "malformed binary metadata %q in header %q: %v", hf.Value, hf.Name, err)
logger.Warningf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err)
t.logger.Warningf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err)
break
}
mdata[hf.Name] = append(mdata[hf.Name], v)
@@ -444,8 +450,8 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
// error, this takes precedence over a client not speaking gRPC.
if len(mdata[":authority"]) > 1 || len(mdata["host"]) > 1 {
errMsg := fmt.Sprintf("num values of :authority: %v, num values of host: %v, both must only have 1 value as per HTTP/2 spec", len(mdata[":authority"]), len(mdata["host"]))
if logger.V(logLevel) {
logger.Errorf("transport: %v", errMsg)
if t.logger.V(logLevel) {
t.logger.Infof("Aborting the stream early: %v", errMsg)
}
t.controlBuf.put(&earlyAbortStream{
httpStatus: http.StatusBadRequest,
@@ -539,9 +545,9 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
}
if httpMethod != http.MethodPost {
t.mu.Unlock()
errMsg := fmt.Sprintf("http2Server.operateHeaders parsed a :method field: %v which should be POST", httpMethod)
if logger.V(logLevel) {
logger.Infof("transport: %v", errMsg)
errMsg := fmt.Sprintf("Received a HEADERS frame with :method %q which should be POST", httpMethod)
if t.logger.V(logLevel) {
t.logger.Infof("Aborting the stream early: %v", errMsg)
}
t.controlBuf.put(&earlyAbortStream{
httpStatus: 405,
@@ -557,8 +563,8 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
var err error
if s.ctx, err = t.inTapHandle(s.ctx, &tap.Info{FullMethodName: s.method}); err != nil {
t.mu.Unlock()
if logger.V(logLevel) {
logger.Infof("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err)
if t.logger.V(logLevel) {
t.logger.Infof("Aborting the stream early due to InTapHandle failure: %v", err)
}
stat, ok := status.FromError(err)
if !ok {
@@ -595,7 +601,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
LocalAddr: t.localAddr,
Compression: s.recvCompress,
WireLength: int(frame.Header().Length),
Header: metadata.MD(mdata).Copy(),
Header: mdata.Copy(),
}
sh.HandleRPC(s.ctx, inHeader)
}
@@ -632,8 +638,8 @@ func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.
atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
if err != nil {
if se, ok := err.(http2.StreamError); ok {
if logger.V(logLevel) {
logger.Warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se)
if t.logger.V(logLevel) {
t.logger.Warningf("Encountered http2.StreamError: %v", se)
}
t.mu.Lock()
s := t.activeStreams[se.StreamID]
@@ -676,8 +682,8 @@ func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.
case *http2.GoAwayFrame:
// TODO: Handle GoAway from the client appropriately.
default:
if logger.V(logLevel) {
logger.Errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
if t.logger.V(logLevel) {
t.logger.Infof("Received unsupported frame type %T", frame)
}
}
}
@@ -936,8 +942,8 @@ func (t *http2Server) checkForHeaderListSize(it interface{}) bool {
var sz int64
for _, f := range hdrFrame.hf {
if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
if logger.V(logLevel) {
logger.Errorf("header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize)
if t.logger.V(logLevel) {
t.logger.Infof("Header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize)
}
return false
}
@@ -1050,7 +1056,7 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
stBytes, err := proto.Marshal(p)
if err != nil {
// TODO: return error instead, when callers are able to handle it.
logger.Errorf("transport: failed to marshal rpc status: %v, error: %v", p, err)
t.logger.Errorf("Failed to marshal rpc status: %s, error: %v", pretty.ToJSON(p), err)
} else {
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)})
}
@@ -1155,18 +1161,18 @@ func (t *http2Server) keepalive() {
if val <= 0 {
// The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
// Gracefully close the connection.
t.Drain()
t.Drain("max_idle")
return
}
idleTimer.Reset(val)
case <-ageTimer.C:
t.Drain()
t.Drain("max_age")
ageTimer.Reset(t.kp.MaxConnectionAgeGrace)
select {
case <-ageTimer.C:
// Close the connection after grace period.
if logger.V(logLevel) {
logger.Infof("transport: closing server transport due to maximum connection age.")
if t.logger.V(logLevel) {
t.logger.Infof("Closing server transport due to maximum connection age")
}
t.controlBuf.put(closeConnection{})
case <-t.done:
@@ -1217,8 +1223,8 @@ func (t *http2Server) Close(err error) {
t.mu.Unlock()
return
}
if logger.V(logLevel) {
logger.Infof("transport: closing: %v", err)
if t.logger.V(logLevel) {
t.logger.Infof("Closing: %v", err)
}
t.state = closing
streams := t.activeStreams
@@ -1226,8 +1232,8 @@ func (t *http2Server) Close(err error) {
t.mu.Unlock()
t.controlBuf.finish()
close(t.done)
if err := t.conn.Close(); err != nil && logger.V(logLevel) {
logger.Infof("transport: error closing conn during Close: %v", err)
if err := t.conn.Close(); err != nil && t.logger.V(logLevel) {
t.logger.Infof("Error closing underlying net.Conn during Close: %v", err)
}
channelz.RemoveEntry(t.channelzID)
// Cancel all active streams.
@@ -1307,14 +1313,14 @@ func (t *http2Server) RemoteAddr() net.Addr {
return t.remoteAddr
}
func (t *http2Server) Drain() {
func (t *http2Server) Drain(debugData string) {
t.mu.Lock()
defer t.mu.Unlock()
if t.drainEvent != nil {
return
}
t.drainEvent = grpcsync.NewEvent()
t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte{}, headsUp: true})
t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte(debugData), headsUp: true})
}
var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
@@ -1344,9 +1350,6 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
return false, err
}
if retErr != nil {
// Abruptly close the connection following the GoAway (via
// loopywriter). But flush out what's inside the buffer first.
t.framer.writer.Flush()
return false, retErr
}
return true, nil
@@ -1359,7 +1362,7 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
// originated before the GoAway reaches the client.
// After getting the ack or timer expiration send out another GoAway this
// time with an ID of the max stream server intends to process.
if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, []byte{}); err != nil {
if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, g.debugData); err != nil {
return false, err
}
if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil {

View File

@@ -21,6 +21,7 @@ package transport
import (
"bufio"
"encoding/base64"
"errors"
"fmt"
"io"
"math"
@@ -37,7 +38,6 @@ import (
"golang.org/x/net/http2/hpack"
spb "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/status"
)
@@ -85,7 +85,6 @@ var (
// 504 Gateway timeout - UNAVAILABLE.
http.StatusGatewayTimeout: codes.Unavailable,
}
logger = grpclog.Component("transport")
)
// isReservedHeader checks whether hdr belongs to HTTP2 headers
@@ -330,7 +329,8 @@ func (w *bufWriter) Write(b []byte) (n int, err error) {
return 0, w.err
}
if w.batchSize == 0 { // Buffer has been disabled.
return w.conn.Write(b)
n, err = w.conn.Write(b)
return n, toIOError(err)
}
for len(b) > 0 {
nn := copy(w.buf[w.offset:], b)
@@ -352,10 +352,30 @@ func (w *bufWriter) Flush() error {
return nil
}
_, w.err = w.conn.Write(w.buf[:w.offset])
w.err = toIOError(w.err)
w.offset = 0
return w.err
}
type ioError struct {
error
}
func (i ioError) Unwrap() error {
return i.error
}
func isIOError(err error) bool {
return errors.As(err, &ioError{})
}
func toIOError(err error) error {
if err == nil {
return nil
}
return ioError{error: err}
}
type framer struct {
writer *bufWriter
fr *http2.Framer

View File

@@ -0,0 +1,40 @@
/*
*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package transport
import (
"fmt"
"google.golang.org/grpc/grpclog"
internalgrpclog "google.golang.org/grpc/internal/grpclog"
)
var logger = grpclog.Component("transport")
func prefixLoggerForServerTransport(p *http2Server) *internalgrpclog.PrefixLogger {
return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[server-transport %p] ", p))
}
func prefixLoggerForServerHandlerTransport(p *serverHandlerTransport) *internalgrpclog.PrefixLogger {
return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[server-handler-transport %p] ", p))
}
func prefixLoggerForClientTransport(p *http2Client) *internalgrpclog.PrefixLogger {
return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[client-transport %p] ", p))
}

View File

@@ -257,6 +257,9 @@ type Stream struct {
fc *inFlow
wq *writeQuota
// Holds compressor names passed in grpc-accept-encoding metadata from the
// client. This is empty for the client side stream.
clientAdvertisedCompressors string
// Callback to state application's intentions to read data. This
// is used to adjust flow control, if needed.
requestRead func(int)
@@ -345,8 +348,24 @@ func (s *Stream) RecvCompress() string {
}
// SetSendCompress sets the compression algorithm to the stream.
func (s *Stream) SetSendCompress(str string) {
s.sendCompress = str
func (s *Stream) SetSendCompress(name string) error {
if s.isHeaderSent() || s.getState() == streamDone {
return errors.New("transport: set send compressor called after headers sent or stream done")
}
s.sendCompress = name
return nil
}
// SendCompress returns the send compressor name.
func (s *Stream) SendCompress() string {
return s.sendCompress
}
// ClientAdvertisedCompressors returns the compressor names advertised by the
// client via grpc-accept-encoding header.
func (s *Stream) ClientAdvertisedCompressors() string {
return s.clientAdvertisedCompressors
}
// Done returns a channel which is closed when it receives the final status
@@ -707,7 +726,7 @@ type ServerTransport interface {
RemoteAddr() net.Addr
// Drain notifies the client this ServerTransport stops accepting new RPCs.
Drain()
Drain(debugData string)
// IncrMsgSent increments the number of message sent through this transport.
IncrMsgSent()