Compare commits

..

9 Commits

Author SHA1 Message Date
junot
aa0da0c80d add rule_type label to indicate rule type (#5834)
add rule_type label to indicate rule type (#472)

Signed-off-by: junot <junotxiang@kubesphere.io>
2023-07-18 17:21:06 +08:00
Elon Cheng
560b0da7da Modify permission path (#5848)
Signed-off-by: dehaocheng <dehaocheng@kubesphere.io>
2023-07-18 10:10:05 +08:00
LQBing
6e0a48c555 Use autoscaling/v2 instead of autoscaling/v2beta2 (#5833) 2023-07-13 11:02:58 +08:00
littlejian
fde9d2e7cd feat: support gitlab identity provider (#5836) 2023-07-13 10:44:59 +08:00
hongming
a64e573c04 fix: enableMultiLogin configuration does not take effect (#5819) 2023-07-06 18:04:51 +08:00
hongzhouzi
0fe9fb48b4 chore(deps): bump google.golang.org/grpc from 1.52.3 to 1.53.0 (#5820)
Signed-off-by: hongzhouzi <hongzhouzi@kubesphere.io>
2023-07-06 15:45:52 +08:00
Xinzhao Xu
3b8c12ffdc apis/resources: support for searching alias in annotations (#5807) 2023-07-04 11:24:49 +08:00
Xinzhao Xu
580b4196ff Fix webhook validation issue for new clusters (#5802) 2023-07-03 15:34:48 +08:00
Wenhao Zhou
b1466e572b fix: clear all oauth Providers when reloading configuration (#5797)
* fix: clear all auth Providers when reloading configuration

Signed-off-by: wenhaozhou <wenhaozhou@yunify.com>

* fix:identity-provider test

Signed-off-by: wenhaozhou <wenhaozhou@yunify.com>

---------

Signed-off-by: wenhaozhou <wenhaozhou@yunify.com>
2023-06-29 19:09:44 +08:00
54 changed files with 717 additions and 285 deletions

6
go.mod
View File

@@ -76,7 +76,7 @@ require (
github.com/stretchr/testify v1.8.1
golang.org/x/crypto v0.5.0
golang.org/x/oauth2 v0.4.0
google.golang.org/grpc v1.52.3
google.golang.org/grpc v1.53.0
gopkg.in/cas.v2 v2.2.0
gopkg.in/square/go-jose.v2 v2.5.1
gopkg.in/src-d/go-git.v4 v4.13.1
@@ -181,7 +181,7 @@ require (
github.com/grafana/regexp v0.0.0-20221122212121-6b5c0a4cb7fd // indirect
github.com/gregjones/httpcache v0.0.0-20181110185634-c63ab54fda8f // indirect
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.1 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/huandu/xstrings v1.3.3 // indirect
github.com/imdario/mergo v0.3.12 // indirect
@@ -502,7 +502,7 @@ replace (
gomodules.xyz/jsonpatch/v2 => gomodules.xyz/jsonpatch/v2 v2.2.0
google.golang.org/appengine => google.golang.org/appengine v1.6.7
google.golang.org/genproto => google.golang.org/genproto v0.0.0-20230124163310-31e0e69b6fc2
google.golang.org/grpc => google.golang.org/grpc v1.52.3
google.golang.org/grpc => google.golang.org/grpc v1.53.0
google.golang.org/protobuf => google.golang.org/protobuf v1.28.1
gopkg.in/asn1-ber.v1 => gopkg.in/asn1-ber.v1 v1.0.0-20181015200546-f715ec2f112d
gopkg.in/cas.v2 => gopkg.in/cas.v2 v2.2.0

18
go.sum
View File

@@ -34,8 +34,9 @@ cloud.google.com/go v0.100.2/go.mod h1:4Xra9TjzAeYHrl5+oeLlzbM2k3mjVhZh4UqTZ//w9
cloud.google.com/go v0.102.0/go.mod h1:oWcCzKlqJ5zgHQt9YsaeTY9KzIvjyy0ArmiBUgpQ+nc=
cloud.google.com/go v0.102.1/go.mod h1:XZ77E9qnTEnrgEOvr4xzfdX5TRo7fB4T2F4O6+34hIU=
cloud.google.com/go v0.104.0/go.mod h1:OO6xxXdJyvuJPcEPBLN9BJPD+jep5G1+2U5B5gkRYtA=
cloud.google.com/go v0.105.0 h1:DNtEKRBAAzeS4KyIory52wWHuClNaXJ5x1F7xa4q+5Y=
cloud.google.com/go v0.105.0/go.mod h1:PrLgOJNe5nfE9UMxKxgXj4mD3voiP+YQ6gdt6KMFOKM=
cloud.google.com/go v0.107.0 h1:qkj22L7bgkl6vIeZDlOY2po43Mx/TIa2Wsa7VR+PEww=
cloud.google.com/go v0.107.0/go.mod h1:wpc2eNrD7hXUTy8EKS10jkxpZBjASrORK7goS+3YX2I=
cloud.google.com/go/accessapproval v1.5.0/go.mod h1:HFy3tuiGvMdcd/u+Cu5b9NkO1pEICJ46IR82PoUdplw=
cloud.google.com/go/accesscontextmanager v1.3.0/go.mod h1:TgCBehyr5gNMz7ZaH9xubp+CE8dkrszb4oK9CWyvD4o=
cloud.google.com/go/accesscontextmanager v1.4.0/go.mod h1:/Kjh7BBu/Gh83sv+K60vN9QE5NJcd80sU33vIe2IFPE=
@@ -76,8 +77,9 @@ cloud.google.com/go/compute v1.10.0/go.mod h1:ER5CLbMxl90o2jtNbGSbtfOpQKR0t15FOt
cloud.google.com/go/compute v1.12.0/go.mod h1:e8yNOBcBONZU1vJKCvCoDw/4JQsA0dpM4x/6PIIOocU=
cloud.google.com/go/compute v1.12.1/go.mod h1:e8yNOBcBONZU1vJKCvCoDw/4JQsA0dpM4x/6PIIOocU=
cloud.google.com/go/compute v1.13.0/go.mod h1:5aPTS0cUNMIc1CE546K+Th6weJUNQErARyZtRXDJ8GE=
cloud.google.com/go/compute v1.14.0 h1:hfm2+FfxVmnRlh6LpB7cg1ZNU+5edAHmW679JePztk0=
cloud.google.com/go/compute v1.14.0/go.mod h1:YfLtxrj9sU4Yxv+sXzZkyPjEyPBZfXHUvjxega5vAdo=
cloud.google.com/go/compute v1.15.1 h1:7UGq3QknM33pw5xATlpzeoomNxsacIVvTqTTvbfajmE=
cloud.google.com/go/compute v1.15.1/go.mod h1:bjjoF/NtFUrkD/urWfdHaKuOPDR5nWIs63rR+SXhcpA=
cloud.google.com/go/compute/metadata v0.1.0/go.mod h1:Z1VN+bulIf6bt4P/C37K4DyZYZEXYonfTBHHFPO/4UU=
cloud.google.com/go/compute/metadata v0.2.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1hT1fIilQDwofLpJ20k=
cloud.google.com/go/compute/metadata v0.2.1/go.mod h1:jgHgmJd2RKBGzXqF5LR2EZMGxBkeanZ9wwa75XHJgOM=
@@ -337,6 +339,7 @@ github.com/cenkalti/backoff/v4 v4.2.0 h1:HN5dHm3WBOgndBH6E8V0q2jIYIR3s9yglV8k/+M
github.com/cenkalti/backoff/v4 v4.2.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/census-instrumentation/opencensus-proto v0.3.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/census-instrumentation/opencensus-proto v0.4.1/go.mod h1:4T9NM4+4Vw91VeyqjLS6ao50K5bOcLKN6Q42XnYaRYw=
github.com/certifi/gocertifi v0.0.0-20191021191039-0944d244cd40/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA=
github.com/certifi/gocertifi v0.0.0-20200922220541-2c3bb06c6054/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
@@ -366,11 +369,13 @@ github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6D
github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I=
github.com/clbanning/mxj v1.8.4/go.mod h1:BVjHeAH+rl9rs6f+QIpeRl0tfu10SXn1pUSa5PVGJng=
github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4/go.mod h1:6pvJx4me5XPnfI9Z40ddWsdw2W/uZgQLFXToKeRcDiI=
github.com/cncf/udpa/go v0.0.0-20220112060539-c52dc94e7fbe/go.mod h1:6pvJx4me5XPnfI9Z40ddWsdw2W/uZgQLFXToKeRcDiI=
github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20220314180256-7f1daf1720fc h1:PYXxkRUBGUMa5xgMVMDl62vEklZvKpVaxQeN9ie7Hfk=
github.com/cncf/xds/go v0.0.0-20220314180256-7f1daf1720fc/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20230105202645-06c439db220b h1:ACGZRIr7HsgBKHsueQ1yM4WaVaXh21ynwqsF8M8tXhA=
github.com/cncf/xds/go v0.0.0-20230105202645-06c439db220b/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cockroachdb/datadriven v0.0.0-20200714090401-bf6692d28da5/go.mod h1:h6jFvWxBdQXxjopDMZyH2UVceIRfR84bdzbkoKrsWNo=
github.com/cockroachdb/errors v1.2.4/go.mod h1:rQD95gz6FARkaKkQXUksEje/d9a6wBJoCr5oaCLELYA=
github.com/cockroachdb/logtags v0.0.0-20190617123548-eb05cc24525f/go.mod h1:i/u985jwjWRlyHXQbwatDASoW0RMlZ/3i9yJHE2xLkI=
@@ -750,8 +755,9 @@ github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t
github.com/grpc-ecosystem/grpc-gateway v1.16.0 h1:gmcG1KaJ57LophUzW0Hy8NmPhnMZb4M0+kPpLofRdBo=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0/go.mod h1:hgWBS7lorOAVIJEQMi4ZsPv9hVvWI6+ch50m39Pf2Ks=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.1 h1:/sDbPb60SusIXjiJGYLUoS/rAQurQmvGWmwn2bBPM9c=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.1/go.mod h1:G+WkljZi4mflcqVxYSgvt8MNctRQHjEH8ubKtt1Ka3w=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3 h1:lLT7ZLSzGLI08vc9cpd+tYmNWjdKDqyr/2L+f6U12Fk=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3/go.mod h1:o//XUCC/F+yRGJoPO/VU0GSB0f8Nhgmxx0VIRUvaC0w=
github.com/hashicorp/consul/api v1.10.1/go.mod h1:XjsvQN+RJGWI2TWy1/kqaE16HrR2J/FWgkYjdZQsX9M=
github.com/hashicorp/consul/api v1.18.0 h1:R7PPNzTCeN6VuQNDwwhZWJvzCtGSrNpJqfb22h3yH9g=
github.com/hashicorp/consul/api v1.18.0/go.mod h1:owRRGJ9M5xReDC5nfT8FTJrNAPbT4NM6p/k+d03q2v4=
@@ -1479,8 +1485,8 @@ google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6
google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc=
google.golang.org/genproto v0.0.0-20230124163310-31e0e69b6fc2 h1:O97sLx/Xmb/KIZHB/2/BzofxBs5QmmR0LcihPtllmbc=
google.golang.org/genproto v0.0.0-20230124163310-31e0e69b6fc2/go.mod h1:RGgjbofJ8xD9Sq1VVhDM1Vok1vRONV+rg+CjzG4SZKM=
google.golang.org/grpc v1.52.3 h1:pf7sOysg4LdgBqduXveGKrcEwbStiK2rtfghdzlUYDQ=
google.golang.org/grpc v1.52.3/go.mod h1:pu6fVzoFb+NBYNAvQL08ic+lvB2IojljRYuun5vorUY=
google.golang.org/grpc v1.53.0 h1:LAv2ds7cmFV/XTS3XG1NneeENYrXGmorPxsBbptIjNc=
google.golang.org/grpc v1.53.0/go.mod h1:OnIrk0ipVdj4N5d9IUoFUx72/VlD7+jUsHwZgwSMQpw=
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw=
google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w=
google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=

View File

@@ -108,7 +108,7 @@ var SupportedGroupVersionResources = map[ClientType][]schema.GroupVersionResourc
{Group: "networking.k8s.io", Version: "v1", Resource: "ingresses"},
{Group: "autoscaling", Version: "v2beta2", Resource: "horizontalpodautoscalers"},
{Group: "autoscaling", Version: "v2", Resource: "horizontalpodautoscalers"},
},
// all supported kubesphere api objects

View File

@@ -466,7 +466,7 @@ func (s *APIServer) waitForResourceSync(ctx context.Context) error {
"ingresses",
"networkpolicies",
},
{Group: "autoscaling", Version: "v2beta2"}: {
{Group: "autoscaling", Version: "v2"}: {
"horizontalpodautoscalers",
},
}

View File

@@ -0,0 +1,177 @@
/*
Copyright 2020 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package gitlab
import (
"context"
"crypto/tls"
"encoding/json"
"io"
"net/http"
"strconv"
"github.com/mitchellh/mapstructure"
"golang.org/x/oauth2"
"kubesphere.io/kubesphere/pkg/apiserver/authentication/identityprovider"
"kubesphere.io/kubesphere/pkg/server/options"
)
const (
userInfoURL = "https://gitlab.com/api/v4/user"
authURL = "https://gitlab.com/oauth/authorize"
tokenURL = "https://gitlab.com/oauth/token"
)
func init() {
identityprovider.RegisterOAuthProvider(&gitlabProviderFactory{})
}
type gitlab struct {
// ClientID is the application's ID.
ClientID string `json:"clientID" yaml:"clientID"`
// ClientSecret is the application's secret.
ClientSecret string `json:"-" yaml:"clientSecret"`
// Endpoint contains the resource server's token endpoint
// URLs. These are constants specific to each server and are
// often available via site-specific packages, such as
// google.Endpoint or sso.endpoint.
Endpoint endpoint `json:"endpoint" yaml:"endpoint"`
// RedirectURL is the URL to redirect users going through
// the OAuth flow, after the resource owner's URLs.
RedirectURL string `json:"redirectURL" yaml:"redirectURL"`
// Used to turn off TLS certificate checks
InsecureSkipVerify bool `json:"insecureSkipVerify" yaml:"insecureSkipVerify"`
// Scope specifies optional requested permissions.
Scopes []string `json:"scopes" yaml:"scopes"`
Config *oauth2.Config `json:"-" yaml:"-"`
}
// endpoint represents an OAuth 2.0 provider's authorization and token
// endpoint URLs.
type endpoint struct {
AuthURL string `json:"authURL" yaml:"authURL"`
TokenURL string `json:"tokenURL" yaml:"tokenURL"`
UserInfoURL string `json:"userInfoURL" yaml:"userInfoURL"`
}
type gitlabIdentity struct {
ID int64 `json:"id"`
Username string `json:"username"`
Email string `json:"email"`
Name string `json:"name"`
State string `json:"state"`
AvatarURL string `json:"avatar_url"`
WebURL string `json:"web_url"`
}
type gitlabProviderFactory struct {
}
func (g *gitlabProviderFactory) Type() string {
return "GitlabIdentityProvider"
}
func (g *gitlabProviderFactory) Create(opts options.DynamicOptions) (identityprovider.OAuthProvider, error) {
var gitlab gitlab
if err := mapstructure.Decode(opts, &gitlab); err != nil {
return nil, err
}
if gitlab.Endpoint.AuthURL == "" {
gitlab.Endpoint.AuthURL = authURL
}
if gitlab.Endpoint.TokenURL == "" {
gitlab.Endpoint.TokenURL = tokenURL
}
if gitlab.Endpoint.UserInfoURL == "" {
gitlab.Endpoint.UserInfoURL = userInfoURL
}
// fixed options
opts["endpoint"] = options.DynamicOptions{
"authURL": gitlab.Endpoint.AuthURL,
"tokenURL": gitlab.Endpoint.TokenURL,
"userInfoURL": gitlab.Endpoint.UserInfoURL,
}
gitlab.Config = &oauth2.Config{
ClientID: gitlab.ClientID,
ClientSecret: gitlab.ClientSecret,
Endpoint: oauth2.Endpoint{
AuthURL: gitlab.Endpoint.AuthURL,
TokenURL: gitlab.Endpoint.TokenURL,
},
RedirectURL: gitlab.RedirectURL,
Scopes: gitlab.Scopes,
}
return &gitlab, nil
}
func (g gitlabIdentity) GetUserID() string {
return strconv.FormatInt(g.ID, 10)
}
func (g gitlabIdentity) GetUsername() string {
return g.Username
}
func (g gitlabIdentity) GetEmail() string {
return g.Email
}
func (g *gitlab) IdentityExchangeCallback(req *http.Request) (identityprovider.Identity, error) {
// OAuth2 callback, see also https://tools.ietf.org/html/rfc6749#section-4.1.2
code := req.URL.Query().Get("code")
ctx := req.Context()
if g.InsecureSkipVerify {
client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
},
}
ctx = context.WithValue(ctx, oauth2.HTTPClient, client)
}
token, err := g.Config.Exchange(ctx, code)
if err != nil {
return nil, err
}
resp, err := oauth2.NewClient(ctx, oauth2.StaticTokenSource(token)).Get(g.Endpoint.UserInfoURL)
if err != nil {
return nil, err
}
data, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
defer resp.Body.Close()
var gitlabIdentity gitlabIdentity
err = json.Unmarshal(data, &gitlabIdentity)
if err != nil {
return nil, err
}
return gitlabIdentity, nil
}

View File

@@ -0,0 +1,82 @@
package gitlab
import (
"reflect"
"testing"
"golang.org/x/oauth2"
"gopkg.in/yaml.v3"
"kubesphere.io/kubesphere/pkg/apiserver/authentication/identityprovider"
"kubesphere.io/kubesphere/pkg/server/options"
)
func Test_gitlabProviderFactory_Create(t *testing.T) {
type args struct {
opts options.DynamicOptions
}
mustUnmarshalYAML := func(data string) options.DynamicOptions {
var dynamicOptions options.DynamicOptions
_ = yaml.Unmarshal([]byte(data), &dynamicOptions)
return dynamicOptions
}
tests := []struct {
name string
args args
want identityprovider.OAuthProvider
wantErr bool
}{
{
name: "should create successfully",
args: args{opts: mustUnmarshalYAML(`
clientID: 035c18fc229c686e4652d7034
clientSecret: 75c82b42e54aaf25186140f5
endpoint:
userInfoUrl: "https://gitlab.com/api/v4/user"
authURL: "https://gitlab.com/oauth/authorize"
tokenURL: "https://gitlab.com/oauth/token"
redirectURL: "https://ks-console.kubesphere-system.svc/oauth/redirect/gitlab"
scopes:
- read
`)},
want: &gitlab{
ClientID: "035c18fc229c686e4652d7034",
ClientSecret: "75c82b42e54aaf25186140f5",
Endpoint: endpoint{
AuthURL: "https://gitlab.com/oauth/authorize",
TokenURL: "https://gitlab.com/oauth/token",
UserInfoURL: "https://gitlab.com/api/v4/user",
},
RedirectURL: "https://ks-console.kubesphere-system.svc/oauth/redirect/gitlab",
Scopes: []string{"read"},
Config: &oauth2.Config{
ClientID: "035c18fc229c686e4652d7034",
ClientSecret: "75c82b42e54aaf25186140f5",
Endpoint: oauth2.Endpoint{
AuthURL: "https://gitlab.com/oauth/authorize",
TokenURL: "https://gitlab.com/oauth/token",
AuthStyle: oauth2.AuthStyleAutoDetect,
},
RedirectURL: "https://ks-console.kubesphere-system.svc/oauth/redirect/gitlab",
Scopes: []string{"read"},
},
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := &gitlabProviderFactory{}
got, err := g.Create(tt.args.opts)
if (err != nil) != tt.wantErr {
t.Errorf("Create() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("Create() got = %v, want %v", got, tt.want)
}
})
}
}

View File

@@ -47,6 +47,10 @@ type Identity interface {
// SetupWithOptions will verify the configuration and initialize the identityProviders
func SetupWithOptions(options []oauth.IdentityProviderOptions) error {
// Clear all providers when reloading configuration
oauthProviders = make(map[string]OAuthProvider)
genericProviders = make(map[string]GenericProvider)
for _, o := range options {
if oauthProviders[o.Name] != nil || genericProviders[o.Name] != nil {
err := fmt.Errorf("duplicate identity provider found: %s, name must be unique", o.Name)

View File

@@ -113,6 +113,12 @@ func TestSetupWith(t *testing.T) {
Type: "LDAPIdentityProvider",
Provider: options.DynamicOptions{},
},
{
Name: "ldap",
MappingMethod: "auto",
Type: "LDAPIdentityProvider",
Provider: options.DynamicOptions{},
},
}},
wantErr: true,
},

View File

@@ -28,6 +28,7 @@ import (
_ "kubesphere.io/kubesphere/pkg/apiserver/authentication/identityprovider/aliyunidaas"
_ "kubesphere.io/kubesphere/pkg/apiserver/authentication/identityprovider/cas"
_ "kubesphere.io/kubesphere/pkg/apiserver/authentication/identityprovider/github"
_ "kubesphere.io/kubesphere/pkg/apiserver/authentication/identityprovider/gitlab"
_ "kubesphere.io/kubesphere/pkg/apiserver/authentication/identityprovider/ldap"
_ "kubesphere.io/kubesphere/pkg/apiserver/authentication/identityprovider/oidc"
"kubesphere.io/kubesphere/pkg/apiserver/authentication/oauth"

View File

@@ -21,6 +21,7 @@ type Value string
const (
FieldName = "name"
FieldAlias = "alias"
FieldNames = "names"
FieldUID = "uid"
FieldCreationTimeStamp = "creationTimestamp"

View File

@@ -42,9 +42,13 @@ const (
RuleLevelCluster RuleLevel = "cluster"
RuleLevelGlobal RuleLevel = "global"
RuleTypeTemplate RuleType = "template" // for template rule configured by exprBuilder to build expression
RuleTypeCustom RuleType = "custom" // for custom rule configured by direct expression
// for rule.labels
RuleLabelKeyRuleLevel = "rule_level"
RuleLabelKeyRuleGroup = "rule_group"
RuleLabelKeyRuleType = "rule_type"
RuleLabelKeyCluster = "cluster"
RuleLabelKeyNamespace = "namespace"
RuleLabelKeySeverity = "severity"
@@ -75,6 +79,8 @@ const (
type RuleLevel string
type RuleType string
var maxConfigMapDataSize = int(float64(corev1.MaxSecretSize) * 0.5)
type enforceRuleFunc func(rule *promresourcesv1.Rule) error
@@ -193,6 +199,11 @@ func makePrometheusRuleGroups(log logr.Logger, groupList client.ObjectList,
continue
}
if prule != nil {
if rule.ExprBuilder != nil && rule.ExprBuilder.Workload != nil {
prule.Labels[RuleLabelKeyRuleType] = string(RuleTypeTemplate)
} else {
prule.Labels[RuleLabelKeyRuleType] = string(RuleTypeCustom)
}
prules = append(prules, *prule)
}
}
@@ -216,6 +227,11 @@ func makePrometheusRuleGroups(log logr.Logger, groupList client.ObjectList,
continue
}
if prule != nil {
if rule.ExprBuilder != nil && rule.ExprBuilder.Node != nil {
prule.Labels[RuleLabelKeyRuleType] = string(RuleTypeTemplate)
} else {
prule.Labels[RuleLabelKeyRuleType] = string(RuleTypeCustom)
}
prules = append(prules, *prule)
}
}
@@ -241,6 +257,11 @@ func makePrometheusRuleGroups(log logr.Logger, groupList client.ObjectList,
continue
}
if prule != nil {
if rule.ExprBuilder != nil && (rule.ExprBuilder.Node != nil || rule.ExprBuilder.Workload != nil) {
prule.Labels[RuleLabelKeyRuleType] = string(RuleTypeTemplate)
} else {
prule.Labels[RuleLabelKeyRuleType] = string(RuleTypeCustom)
}
prules = append(prules, *prule)
}
}

View File

@@ -46,28 +46,29 @@ func (h *ValidatingHandler) InjectDecoder(d *admission.Decoder) error {
// Handle handles admission requests.
func (h *ValidatingHandler) Handle(ctx context.Context, req admission.Request) admission.Response {
if req.Operation != v1.Update {
return admission.Allowed("")
}
newCluster := &clusterv1alpha1.Cluster{}
err := h.decoder.DecodeRaw(req.Object, newCluster)
if err != nil {
if err := h.decoder.DecodeRaw(req.Object, newCluster); err != nil {
return admission.Errored(http.StatusBadRequest, err)
}
oldCluster := &clusterv1alpha1.Cluster{}
err = h.decoder.DecodeRaw(req.OldObject, oldCluster)
if err != nil {
if err := h.decoder.DecodeRaw(req.OldObject, oldCluster); err != nil {
return admission.Errored(http.StatusBadRequest, err)
}
// The cluster created for the first time has no status information
if oldCluster.Status.UID == "" {
return admission.Allowed("")
}
clusterConfig, err := clientcmd.RESTConfigFromKubeConfig(newCluster.Spec.Connection.KubeConfig)
if err != nil {
return admission.Denied(fmt.Sprintf("failed to load cluster config for %s: %s", newCluster.Name, err))
}
clusterClient, err := kubernetes.NewForConfig(clusterConfig)
if err != nil {
return admission.Denied(err.Error())
@@ -80,6 +81,5 @@ func (h *ValidatingHandler) Handle(ctx context.Context, req admission.Request) a
if oldCluster.Status.UID != kubeSystem.UID {
return admission.Denied("this kubeconfig corresponds to a different cluster than the previous one, you need to make sure that kubeconfig is not from another cluster")
}
return admission.Allowed("")
}

View File

@@ -53,12 +53,12 @@ func AddToContainer(
ws := runtime.NewWebService(GroupVersion)
h := newNotificationHandler(informers, k8sClient, ksClient, options)
ws.Route(ws.POST("/configs/notification/verification").
ws.Route(ws.POST("/verification").
Reads("").
To(h.Verify).
Returns(http.StatusOK, api.StatusOK, http.Response{}.Body)).
Doc("Provide validation for notification-manager information")
ws.Route(ws.POST("/configs/notification/users/{user}/verification").
ws.Route(ws.POST("/users/{user}/verification").
To(h.Verify).
Param(ws.PathParameter("user", "user name")).
Returns(http.StatusOK, api.StatusOK, http.Response{}.Body)).

View File

@@ -455,6 +455,11 @@ func (h *handler) passwordGrant(provider, username string, password string, req
}
func (h *handler) issueTokenTo(user user.Info) (*oauth.Token, error) {
if !h.options.MultipleLogin {
if err := h.tokenOperator.RevokeAllUserTokens(user.GetName()); err != nil {
return nil, err
}
}
accessToken, err := h.tokenOperator.IssueTo(&token.IssueRequest{
User: user,
Claims: token.Claims{TokenType: token.AccessToken},
@@ -471,7 +476,6 @@ func (h *handler) issueTokenTo(user user.Info) (*oauth.Token, error) {
if err != nil {
return nil, err
}
result := oauth.Token{
AccessToken: accessToken,
// The OAuth 2.0 token_type response parameter value MUST be Bearer,

View File

@@ -129,9 +129,7 @@ func (t *tokenOperator) tokenCacheValidate(username, token string) error {
if exist, err := t.cache.Exists(key); err != nil {
return err
} else if !exist {
err = errors.New("token not found in cache")
klog.V(4).Info(fmt.Errorf("%s: %s", err, token))
return err
return errors.New("token not found in cache")
}
return nil
}

View File

@@ -18,7 +18,7 @@ package hpa
import (
"sort"
autoscalingv2beta2 "k8s.io/api/autoscaling/v2beta2"
autoscalingv2 "k8s.io/api/autoscaling/v2"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
@@ -35,15 +35,15 @@ func NewHpaSearcher(informers informers.SharedInformerFactory) v1alpha2.Interfac
}
func (s *hpaSearcher) Get(namespace, name string) (interface{}, error) {
return s.informers.Autoscaling().V2beta2().HorizontalPodAutoscalers().Lister().HorizontalPodAutoscalers(namespace).Get(name)
return s.informers.Autoscaling().V2().HorizontalPodAutoscalers().Lister().HorizontalPodAutoscalers(namespace).Get(name)
}
func hpaTargetMatch(item *autoscalingv2beta2.HorizontalPodAutoscaler, kind, name string) bool {
func hpaTargetMatch(item *autoscalingv2.HorizontalPodAutoscaler, kind, name string) bool {
return item.Spec.ScaleTargetRef.Kind == kind && item.Spec.ScaleTargetRef.Name == name
}
// exactly Match
func (*hpaSearcher) match(match map[string]string, item *autoscalingv2beta2.HorizontalPodAutoscaler) bool {
func (*hpaSearcher) match(match map[string]string, item *autoscalingv2.HorizontalPodAutoscaler) bool {
for k, v := range match {
switch k {
case v1alpha2.TargetKind:
@@ -63,7 +63,7 @@ func (*hpaSearcher) match(match map[string]string, item *autoscalingv2beta2.Hori
return true
}
func (*hpaSearcher) fuzzy(fuzzy map[string]string, item *autoscalingv2beta2.HorizontalPodAutoscaler) bool {
func (*hpaSearcher) fuzzy(fuzzy map[string]string, item *autoscalingv2.HorizontalPodAutoscaler) bool {
for k, v := range fuzzy {
if !v1alpha2.ObjectMetaFuzzyMath(k, v, item.ObjectMeta) {
return false
@@ -74,13 +74,13 @@ func (*hpaSearcher) fuzzy(fuzzy map[string]string, item *autoscalingv2beta2.Hori
func (s *hpaSearcher) Search(namespace string, conditions *params.Conditions, orderBy string, reverse bool) ([]interface{}, error) {
horizontalPodAutoscalers, err := s.informers.Autoscaling().V2beta2().HorizontalPodAutoscalers().Lister().HorizontalPodAutoscalers(namespace).List(labels.Everything())
horizontalPodAutoscalers, err := s.informers.Autoscaling().V2().HorizontalPodAutoscalers().Lister().HorizontalPodAutoscalers(namespace).List(labels.Everything())
if err != nil {
return nil, err
}
result := make([]*autoscalingv2beta2.HorizontalPodAutoscaler, 0)
result := make([]*autoscalingv2.HorizontalPodAutoscaler, 0)
if len(conditions.Match) == 0 && len(conditions.Fuzzy) == 0 {
result = horizontalPodAutoscalers

View File

@@ -20,14 +20,14 @@ import (
"sort"
"strings"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/klog/v2"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog/v2"
"kubesphere.io/kubesphere/pkg/api"
"kubesphere.io/kubesphere/pkg/apiserver/query"
"kubesphere.io/kubesphere/pkg/constants"
)
type Interface interface {
@@ -120,13 +120,19 @@ func DefaultObjectMetaFilter(item metav1.ObjectMeta, filter query.Filter) bool {
// /namespaces?page=1&limit=10&name=default
case query.FieldName:
return strings.Contains(item.Name, string(filter.Value))
// /namespaces?page=1&limit=10&uid=a8a8d6cf-f6a5-4fea-9c1b-e57610115706
// /clusters?page=1&limit=10&alias=xxx
case query.FieldAlias:
if item.Annotations == nil {
return false
}
return strings.Contains(item.Annotations[constants.DisplayNameAnnotationKey], string(filter.Value))
// /namespaces?page=1&limit=10&uid=a8a8d6cf-f6a5-4fea-9c1b-e57610115706
case query.FieldUID:
return strings.Compare(string(item.UID), string(filter.Value)) == 0
// /deployments?page=1&limit=10&namespace=kubesphere-system
// /deployments?page=1&limit=10&namespace=kubesphere-system
case query.FieldNamespace:
return strings.Compare(item.Namespace, string(filter.Value)) == 0
// /namespaces?page=1&limit=10&ownerReference=a8a8d6cf-f6a5-4fea-9c1b-e57610115706
// /namespaces?page=1&limit=10&ownerReference=a8a8d6cf-f6a5-4fea-9c1b-e57610115706
case query.FieldOwnerReference:
for _, ownerReference := range item.OwnerReferences {
if strings.Compare(string(ownerReference.UID), string(filter.Value)) == 0 {
@@ -134,7 +140,7 @@ func DefaultObjectMetaFilter(item metav1.ObjectMeta, filter query.Filter) bool {
}
}
return false
// /namespaces?page=1&limit=10&ownerKind=Workspace
// /namespaces?page=1&limit=10&ownerKind=Workspace
case query.FieldOwnerKind:
for _, ownerReference := range item.OwnerReferences {
if strings.Compare(ownerReference.Kind, string(filter.Value)) == 0 {
@@ -142,10 +148,10 @@ func DefaultObjectMetaFilter(item metav1.ObjectMeta, filter query.Filter) bool {
}
}
return false
// /namespaces?page=1&limit=10&annotation=openpitrix_runtime
// /namespaces?page=1&limit=10&annotation=openpitrix_runtime
case query.FieldAnnotation:
return labelMatch(item.Annotations, string(filter.Value))
// /namespaces?page=1&limit=10&label=kubesphere.io/workspace:system-workspace
// /namespaces?page=1&limit=10&label=kubesphere.io/workspace:system-workspace
case query.FieldLabel:
return labelMatch(item.Labels, string(filter.Value))
default:

View File

@@ -92,7 +92,7 @@ func (s *inMemoryCache) cleanInvalidToken() {
func (s *inMemoryCache) Keys(pattern string) ([]string, error) {
// There is a little difference between go regexp and redis key pattern
// In redis, * means any character, while in go . means match everything.
pattern = strings.Replace(pattern, "*", ".", -1)
pattern = strings.Replace(pattern, "*", ".*", -1)
re, err := regexp.Compile(pattern)
if err != nil {

View File

@@ -30,6 +30,7 @@ require (
github.com/aws/aws-sdk-go v1.44.187 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cncf/xds/go v0.0.0-20230105202645-06c439db220b // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dennwc/varint v1.0.0 // indirect
github.com/docker/distribution v2.8.2+incompatible // indirect
@@ -96,7 +97,7 @@ require (
golang.org/x/tools v0.6.0 // indirect
gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/grpc v1.52.3 // indirect
google.golang.org/grpc v1.53.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect

View File

@@ -109,7 +109,8 @@ github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGX
github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20220314180256-7f1daf1720fc h1:PYXxkRUBGUMa5xgMVMDl62vEklZvKpVaxQeN9ie7Hfk=
github.com/cncf/xds/go v0.0.0-20230105202645-06c439db220b h1:ACGZRIr7HsgBKHsueQ1yM4WaVaXh21ynwqsF8M8tXhA=
github.com/cncf/xds/go v0.0.0-20230105202645-06c439db220b/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8=
github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
@@ -1056,8 +1057,8 @@ google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAG
google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
google.golang.org/grpc v1.36.1/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34=
google.golang.org/grpc v1.52.3 h1:pf7sOysg4LdgBqduXveGKrcEwbStiK2rtfghdzlUYDQ=
google.golang.org/grpc v1.52.3/go.mod h1:pu6fVzoFb+NBYNAvQL08ic+lvB2IojljRYuun5vorUY=
google.golang.org/grpc v1.53.0 h1:LAv2ds7cmFV/XTS3XG1NneeENYrXGmorPxsBbptIjNc=
google.golang.org/grpc v1.53.0/go.mod h1:OnIrk0ipVdj4N5d9IUoFUx72/VlD7+jUsHwZgwSMQpw=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=

View File

@@ -130,7 +130,7 @@ require (
golang.org/x/time v0.3.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20230124163310-31e0e69b6fc2 // indirect
google.golang.org/grpc v1.52.3 // indirect
google.golang.org/grpc v1.53.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect

View File

@@ -592,8 +592,8 @@ google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTp
google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc=
google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34=
google.golang.org/grpc v1.52.3 h1:pf7sOysg4LdgBqduXveGKrcEwbStiK2rtfghdzlUYDQ=
google.golang.org/grpc v1.52.3/go.mod h1:pu6fVzoFb+NBYNAvQL08ic+lvB2IojljRYuun5vorUY=
google.golang.org/grpc v1.53.0 h1:LAv2ds7cmFV/XTS3XG1NneeENYrXGmorPxsBbptIjNc=
google.golang.org/grpc v1.53.0/go.mod h1:OnIrk0ipVdj4N5d9IUoFUx72/VlD7+jUsHwZgwSMQpw=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=

View File

@@ -275,11 +275,12 @@ func (p *parser) accept(term termType) (string, error) {
// expectPChars determines if "t" consists of only pchars defined in RFC3986.
//
// https://www.ietf.org/rfc/rfc3986.txt, P.49
// pchar = unreserved / pct-encoded / sub-delims / ":" / "@"
// unreserved = ALPHA / DIGIT / "-" / "." / "_" / "~"
// sub-delims = "!" / "$" / "&" / "'" / "(" / ")"
// / "*" / "+" / "," / ";" / "="
// pct-encoded = "%" HEXDIG HEXDIG
//
// pchar = unreserved / pct-encoded / sub-delims / ":" / "@"
// unreserved = ALPHA / DIGIT / "-" / "." / "_" / "~"
// sub-delims = "!" / "$" / "&" / "'" / "(" / ")"
// / "*" / "+" / "," / ";" / "="
// pct-encoded = "%" HEXDIG HEXDIG
func expectPChars(t string) error {
const (
init = iota

View File

@@ -162,10 +162,11 @@ func DefaultStreamErrorHandler(_ context.Context, err error) *status.Status {
// DefaultRoutingErrorHandler is our default handler for routing errors.
// By default http error codes mapped on the following error codes:
// NotFound -> grpc.NotFound
// StatusBadRequest -> grpc.InvalidArgument
// MethodNotAllowed -> grpc.Unimplemented
// Other -> grpc.Internal, method is not expecting to be called for anything else
//
// NotFound -> grpc.NotFound
// StatusBadRequest -> grpc.InvalidArgument
// MethodNotAllowed -> grpc.Unimplemented
// Other -> grpc.Internal, method is not expecting to be called for anything else
func DefaultRoutingErrorHandler(ctx context.Context, mux *ServeMux, marshaler Marshaler, w http.ResponseWriter, r *http.Request, httpStatus int) {
sterr := status.Error(codes.Internal, "Unexpected routing error")
switch httpStatus {

View File

@@ -279,6 +279,14 @@ type PickResult struct {
// type, Done may not be called. May be nil if the balancer does not wish
// to be notified when the RPC completes.
Done func(DoneInfo)
// Metadata provides a way for LB policies to inject arbitrary per-call
// metadata. Any metadata returned here will be merged with existing
// metadata added by the client application.
//
// LB policies with child policies are responsible for propagating metadata
// injected by their children to the ClientConn, as part of Pick().
Metatada metadata.MD
}
// TransientFailureError returns e. It exists for backward compatibility and

View File

@@ -256,7 +256,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
if err != nil {
return nil, err
}
cc.authority, err = determineAuthority(cc.parsedTarget.Endpoint, cc.target, cc.dopts)
cc.authority, err = determineAuthority(cc.parsedTarget.Endpoint(), cc.target, cc.dopts)
if err != nil {
return nil, err
}
@@ -934,7 +934,7 @@ func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
return cc.sc.healthCheckConfig
}
func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, balancer.PickResult, error) {
return cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{
Ctx: ctx,
FullMethodName: method,
@@ -1237,9 +1237,11 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
addr.ServerName = ac.cc.getServerName(addr)
hctx, hcancel := context.WithCancel(ac.ctx)
onClose := grpcsync.OnceFunc(func() {
onClose := func(r transport.GoAwayReason) {
ac.mu.Lock()
defer ac.mu.Unlock()
// adjust params based on GoAwayReason
ac.adjustParams(r)
if ac.state == connectivity.Shutdown {
// Already shut down. tearDown() already cleared the transport and
// canceled hctx via ac.ctx, and we expected this connection to be
@@ -1260,19 +1262,13 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
// Always go idle and wait for the LB policy to initiate a new
// connection attempt.
ac.updateConnectivityState(connectivity.Idle, nil)
})
onGoAway := func(r transport.GoAwayReason) {
ac.mu.Lock()
ac.adjustParams(r)
ac.mu.Unlock()
onClose()
}
connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
defer cancel()
copts.ChannelzParentID = ac.channelzID
newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, copts, onGoAway, onClose)
newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, copts, onClose)
if err != nil {
if logger.V(2) {
logger.Infof("Creating new client transport to %q: %v", addr, err)
@@ -1380,7 +1376,7 @@ func (ac *addrConn) startHealthCheck(ctx context.Context) {
if status.Code(err) == codes.Unimplemented {
channelz.Error(logger, ac.channelzID, "Subchannel health check is unimplemented at server side, thus health check is disabled")
} else {
channelz.Errorf(logger, ac.channelzID, "HealthCheckFunc exits with unexpected error %v", err)
channelz.Errorf(logger, ac.channelzID, "Health checking failed: %v", err)
}
}
}()
@@ -1591,30 +1587,17 @@ func (cc *ClientConn) parseTargetAndFindResolver() (resolver.Builder, error) {
}
// parseTarget uses RFC 3986 semantics to parse the given target into a
// resolver.Target struct containing scheme, authority and endpoint. Query
// resolver.Target struct containing scheme, authority and url. Query
// params are stripped from the endpoint.
func parseTarget(target string) (resolver.Target, error) {
u, err := url.Parse(target)
if err != nil {
return resolver.Target{}, err
}
// For targets of the form "[scheme]://[authority]/endpoint, the endpoint
// value returned from url.Parse() contains a leading "/". Although this is
// in accordance with RFC 3986, we do not want to break existing resolver
// implementations which expect the endpoint without the leading "/". So, we
// end up stripping the leading "/" here. But this will result in an
// incorrect parsing for something like "unix:///path/to/socket". Since we
// own the "unix" resolver, we can workaround in the unix resolver by using
// the `URL` field instead of the `Endpoint` field.
endpoint := u.Path
if endpoint == "" {
endpoint = u.Opaque
}
endpoint = strings.TrimPrefix(endpoint, "/")
return resolver.Target{
Scheme: u.Scheme,
Authority: u.Host,
Endpoint: endpoint,
URL: *u,
}, nil
}

View File

@@ -23,9 +23,9 @@ import (
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"net"
"net/url"
"os"
credinternal "google.golang.org/grpc/internal/credentials"
)
@@ -166,7 +166,7 @@ func NewClientTLSFromCert(cp *x509.CertPool, serverNameOverride string) Transpor
// it will override the virtual host name of authority (e.g. :authority header
// field) in requests.
func NewClientTLSFromFile(certFile, serverNameOverride string) (TransportCredentials, error) {
b, err := ioutil.ReadFile(certFile)
b, err := os.ReadFile(certFile)
if err != nil {
return nil, err
}

View File

@@ -44,6 +44,7 @@ func init() {
extraDialOptions = nil
}
internal.WithBinaryLogger = withBinaryLogger
internal.JoinDialOptions = newJoinDialOption
}
// dialOptions configure a Dial call. dialOptions are set by the DialOption
@@ -111,6 +112,20 @@ func newFuncDialOption(f func(*dialOptions)) *funcDialOption {
}
}
type joinDialOption struct {
opts []DialOption
}
func (jdo *joinDialOption) apply(do *dialOptions) {
for _, opt := range jdo.opts {
opt.apply(do)
}
}
func newJoinDialOption(opts ...DialOption) DialOption {
return &joinDialOption{opts: opts}
}
// WithWriteBufferSize determines how much data can be batched before doing a
// write on the wire. The corresponding memory allocation for this buffer will
// be twice the size to keep syscalls low. The default value for this buffer is

View File

@@ -75,7 +75,9 @@ var registeredCompressor = make(map[string]Compressor)
// registered with the same name, the one registered last will take effect.
func RegisterCompressor(c Compressor) {
registeredCompressor[c.Name()] = c
grpcutil.RegisteredCompressorNames = append(grpcutil.RegisteredCompressorNames, c.Name())
if !grpcutil.IsCompressorNameRegistered(c.Name()) {
grpcutil.RegisteredCompressorNames = append(grpcutil.RegisteredCompressorNames, c.Name())
}
}
// GetCompressor returns Compressor for the given compressor name.

View File

@@ -30,7 +30,6 @@ import (
"encoding/binary"
"fmt"
"io"
"io/ioutil"
"sync"
"google.golang.org/grpc/encoding"
@@ -42,7 +41,7 @@ const Name = "gzip"
func init() {
c := &compressor{}
c.poolCompressor.New = func() interface{} {
return &writer{Writer: gzip.NewWriter(ioutil.Discard), pool: &c.poolCompressor}
return &writer{Writer: gzip.NewWriter(io.Discard), pool: &c.poolCompressor}
}
encoding.RegisterCompressor(c)
}
@@ -63,7 +62,7 @@ func SetLevel(level int) error {
}
c := encoding.GetCompressor(Name).(*compressor)
c.poolCompressor.New = func() interface{} {
w, err := gzip.NewWriterLevel(ioutil.Discard, level)
w, err := gzip.NewWriterLevel(io.Discard, level)
if err != nil {
panic(err)
}

View File

@@ -22,7 +22,6 @@ import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
"os"
"strconv"
@@ -140,9 +139,9 @@ func newLoggerV2WithConfig(infoW, warningW, errorW io.Writer, c loggerV2Config)
// newLoggerV2 creates a loggerV2 to be used as default logger.
// All logs are written to stderr.
func newLoggerV2() LoggerV2 {
errorW := ioutil.Discard
warningW := ioutil.Discard
infoW := ioutil.Discard
errorW := io.Discard
warningW := io.Discard
infoW := io.Discard
logLevel := os.Getenv("GRPC_GO_LOG_SEVERITY_LEVEL")
switch logLevel {

View File

@@ -26,7 +26,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
pb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"
binlogpb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)
@@ -79,7 +79,7 @@ func NewTruncatingMethodLogger(h, m uint64) *TruncatingMethodLogger {
// Build is an internal only method for building the proto message out of the
// input event. It's made public to enable other library to reuse as much logic
// in TruncatingMethodLogger as possible.
func (ml *TruncatingMethodLogger) Build(c LogEntryConfig) *pb.GrpcLogEntry {
func (ml *TruncatingMethodLogger) Build(c LogEntryConfig) *binlogpb.GrpcLogEntry {
m := c.toProto()
timestamp, _ := ptypes.TimestampProto(time.Now())
m.Timestamp = timestamp
@@ -87,11 +87,11 @@ func (ml *TruncatingMethodLogger) Build(c LogEntryConfig) *pb.GrpcLogEntry {
m.SequenceIdWithinCall = ml.idWithinCallGen.next()
switch pay := m.Payload.(type) {
case *pb.GrpcLogEntry_ClientHeader:
case *binlogpb.GrpcLogEntry_ClientHeader:
m.PayloadTruncated = ml.truncateMetadata(pay.ClientHeader.GetMetadata())
case *pb.GrpcLogEntry_ServerHeader:
case *binlogpb.GrpcLogEntry_ServerHeader:
m.PayloadTruncated = ml.truncateMetadata(pay.ServerHeader.GetMetadata())
case *pb.GrpcLogEntry_Message:
case *binlogpb.GrpcLogEntry_Message:
m.PayloadTruncated = ml.truncateMessage(pay.Message)
}
return m
@@ -102,7 +102,7 @@ func (ml *TruncatingMethodLogger) Log(c LogEntryConfig) {
ml.sink.Write(ml.Build(c))
}
func (ml *TruncatingMethodLogger) truncateMetadata(mdPb *pb.Metadata) (truncated bool) {
func (ml *TruncatingMethodLogger) truncateMetadata(mdPb *binlogpb.Metadata) (truncated bool) {
if ml.headerMaxLen == maxUInt {
return false
}
@@ -132,7 +132,7 @@ func (ml *TruncatingMethodLogger) truncateMetadata(mdPb *pb.Metadata) (truncated
return truncated
}
func (ml *TruncatingMethodLogger) truncateMessage(msgPb *pb.Message) (truncated bool) {
func (ml *TruncatingMethodLogger) truncateMessage(msgPb *binlogpb.Message) (truncated bool) {
if ml.messageMaxLen == maxUInt {
return false
}
@@ -145,7 +145,7 @@ func (ml *TruncatingMethodLogger) truncateMessage(msgPb *pb.Message) (truncated
// LogEntryConfig represents the configuration for binary log entry.
type LogEntryConfig interface {
toProto() *pb.GrpcLogEntry
toProto() *binlogpb.GrpcLogEntry
}
// ClientHeader configs the binary log entry to be a ClientHeader entry.
@@ -159,10 +159,10 @@ type ClientHeader struct {
PeerAddr net.Addr
}
func (c *ClientHeader) toProto() *pb.GrpcLogEntry {
func (c *ClientHeader) toProto() *binlogpb.GrpcLogEntry {
// This function doesn't need to set all the fields (e.g. seq ID). The Log
// function will set the fields when necessary.
clientHeader := &pb.ClientHeader{
clientHeader := &binlogpb.ClientHeader{
Metadata: mdToMetadataProto(c.Header),
MethodName: c.MethodName,
Authority: c.Authority,
@@ -170,16 +170,16 @@ func (c *ClientHeader) toProto() *pb.GrpcLogEntry {
if c.Timeout > 0 {
clientHeader.Timeout = ptypes.DurationProto(c.Timeout)
}
ret := &pb.GrpcLogEntry{
Type: pb.GrpcLogEntry_EVENT_TYPE_CLIENT_HEADER,
Payload: &pb.GrpcLogEntry_ClientHeader{
ret := &binlogpb.GrpcLogEntry{
Type: binlogpb.GrpcLogEntry_EVENT_TYPE_CLIENT_HEADER,
Payload: &binlogpb.GrpcLogEntry_ClientHeader{
ClientHeader: clientHeader,
},
}
if c.OnClientSide {
ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
} else {
ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
}
if c.PeerAddr != nil {
ret.Peer = addrToProto(c.PeerAddr)
@@ -195,19 +195,19 @@ type ServerHeader struct {
PeerAddr net.Addr
}
func (c *ServerHeader) toProto() *pb.GrpcLogEntry {
ret := &pb.GrpcLogEntry{
Type: pb.GrpcLogEntry_EVENT_TYPE_SERVER_HEADER,
Payload: &pb.GrpcLogEntry_ServerHeader{
ServerHeader: &pb.ServerHeader{
func (c *ServerHeader) toProto() *binlogpb.GrpcLogEntry {
ret := &binlogpb.GrpcLogEntry{
Type: binlogpb.GrpcLogEntry_EVENT_TYPE_SERVER_HEADER,
Payload: &binlogpb.GrpcLogEntry_ServerHeader{
ServerHeader: &binlogpb.ServerHeader{
Metadata: mdToMetadataProto(c.Header),
},
},
}
if c.OnClientSide {
ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
} else {
ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
}
if c.PeerAddr != nil {
ret.Peer = addrToProto(c.PeerAddr)
@@ -223,7 +223,7 @@ type ClientMessage struct {
Message interface{}
}
func (c *ClientMessage) toProto() *pb.GrpcLogEntry {
func (c *ClientMessage) toProto() *binlogpb.GrpcLogEntry {
var (
data []byte
err error
@@ -238,19 +238,19 @@ func (c *ClientMessage) toProto() *pb.GrpcLogEntry {
} else {
grpclogLogger.Infof("binarylogging: message to log is neither proto.message nor []byte")
}
ret := &pb.GrpcLogEntry{
Type: pb.GrpcLogEntry_EVENT_TYPE_CLIENT_MESSAGE,
Payload: &pb.GrpcLogEntry_Message{
Message: &pb.Message{
ret := &binlogpb.GrpcLogEntry{
Type: binlogpb.GrpcLogEntry_EVENT_TYPE_CLIENT_MESSAGE,
Payload: &binlogpb.GrpcLogEntry_Message{
Message: &binlogpb.Message{
Length: uint32(len(data)),
Data: data,
},
},
}
if c.OnClientSide {
ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
} else {
ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
}
return ret
}
@@ -263,7 +263,7 @@ type ServerMessage struct {
Message interface{}
}
func (c *ServerMessage) toProto() *pb.GrpcLogEntry {
func (c *ServerMessage) toProto() *binlogpb.GrpcLogEntry {
var (
data []byte
err error
@@ -278,19 +278,19 @@ func (c *ServerMessage) toProto() *pb.GrpcLogEntry {
} else {
grpclogLogger.Infof("binarylogging: message to log is neither proto.message nor []byte")
}
ret := &pb.GrpcLogEntry{
Type: pb.GrpcLogEntry_EVENT_TYPE_SERVER_MESSAGE,
Payload: &pb.GrpcLogEntry_Message{
Message: &pb.Message{
ret := &binlogpb.GrpcLogEntry{
Type: binlogpb.GrpcLogEntry_EVENT_TYPE_SERVER_MESSAGE,
Payload: &binlogpb.GrpcLogEntry_Message{
Message: &binlogpb.Message{
Length: uint32(len(data)),
Data: data,
},
},
}
if c.OnClientSide {
ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
} else {
ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
}
return ret
}
@@ -300,15 +300,15 @@ type ClientHalfClose struct {
OnClientSide bool
}
func (c *ClientHalfClose) toProto() *pb.GrpcLogEntry {
ret := &pb.GrpcLogEntry{
Type: pb.GrpcLogEntry_EVENT_TYPE_CLIENT_HALF_CLOSE,
func (c *ClientHalfClose) toProto() *binlogpb.GrpcLogEntry {
ret := &binlogpb.GrpcLogEntry{
Type: binlogpb.GrpcLogEntry_EVENT_TYPE_CLIENT_HALF_CLOSE,
Payload: nil, // No payload here.
}
if c.OnClientSide {
ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
} else {
ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
}
return ret
}
@@ -324,7 +324,7 @@ type ServerTrailer struct {
PeerAddr net.Addr
}
func (c *ServerTrailer) toProto() *pb.GrpcLogEntry {
func (c *ServerTrailer) toProto() *binlogpb.GrpcLogEntry {
st, ok := status.FromError(c.Err)
if !ok {
grpclogLogger.Info("binarylogging: error in trailer is not a status error")
@@ -340,10 +340,10 @@ func (c *ServerTrailer) toProto() *pb.GrpcLogEntry {
grpclogLogger.Infof("binarylogging: failed to marshal status proto: %v", err)
}
}
ret := &pb.GrpcLogEntry{
Type: pb.GrpcLogEntry_EVENT_TYPE_SERVER_TRAILER,
Payload: &pb.GrpcLogEntry_Trailer{
Trailer: &pb.Trailer{
ret := &binlogpb.GrpcLogEntry{
Type: binlogpb.GrpcLogEntry_EVENT_TYPE_SERVER_TRAILER,
Payload: &binlogpb.GrpcLogEntry_Trailer{
Trailer: &binlogpb.Trailer{
Metadata: mdToMetadataProto(c.Trailer),
StatusCode: uint32(st.Code()),
StatusMessage: st.Message(),
@@ -352,9 +352,9 @@ func (c *ServerTrailer) toProto() *pb.GrpcLogEntry {
},
}
if c.OnClientSide {
ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
} else {
ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
}
if c.PeerAddr != nil {
ret.Peer = addrToProto(c.PeerAddr)
@@ -367,15 +367,15 @@ type Cancel struct {
OnClientSide bool
}
func (c *Cancel) toProto() *pb.GrpcLogEntry {
ret := &pb.GrpcLogEntry{
Type: pb.GrpcLogEntry_EVENT_TYPE_CANCEL,
func (c *Cancel) toProto() *binlogpb.GrpcLogEntry {
ret := &binlogpb.GrpcLogEntry{
Type: binlogpb.GrpcLogEntry_EVENT_TYPE_CANCEL,
Payload: nil,
}
if c.OnClientSide {
ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
} else {
ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
}
return ret
}
@@ -392,15 +392,15 @@ func metadataKeyOmit(key string) bool {
return strings.HasPrefix(key, "grpc-")
}
func mdToMetadataProto(md metadata.MD) *pb.Metadata {
ret := &pb.Metadata{}
func mdToMetadataProto(md metadata.MD) *binlogpb.Metadata {
ret := &binlogpb.Metadata{}
for k, vv := range md {
if metadataKeyOmit(k) {
continue
}
for _, v := range vv {
ret.Entry = append(ret.Entry,
&pb.MetadataEntry{
&binlogpb.MetadataEntry{
Key: k,
Value: []byte(v),
},
@@ -410,26 +410,26 @@ func mdToMetadataProto(md metadata.MD) *pb.Metadata {
return ret
}
func addrToProto(addr net.Addr) *pb.Address {
ret := &pb.Address{}
func addrToProto(addr net.Addr) *binlogpb.Address {
ret := &binlogpb.Address{}
switch a := addr.(type) {
case *net.TCPAddr:
if a.IP.To4() != nil {
ret.Type = pb.Address_TYPE_IPV4
ret.Type = binlogpb.Address_TYPE_IPV4
} else if a.IP.To16() != nil {
ret.Type = pb.Address_TYPE_IPV6
ret.Type = binlogpb.Address_TYPE_IPV6
} else {
ret.Type = pb.Address_TYPE_UNKNOWN
ret.Type = binlogpb.Address_TYPE_UNKNOWN
// Do not set address and port fields.
break
}
ret.Address = a.IP.String()
ret.IpPort = uint32(a.Port)
case *net.UnixAddr:
ret.Type = pb.Address_TYPE_UNIX
ret.Type = binlogpb.Address_TYPE_UNIX
ret.Address = a.String()
default:
ret.Type = pb.Address_TYPE_UNKNOWN
ret.Type = binlogpb.Address_TYPE_UNKNOWN
}
return ret
}

View File

@@ -26,7 +26,7 @@ import (
"time"
"github.com/golang/protobuf/proto"
pb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"
binlogpb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"
)
var (
@@ -42,15 +42,15 @@ type Sink interface {
// Write will be called to write the log entry into the sink.
//
// It should be thread-safe so it can be called in parallel.
Write(*pb.GrpcLogEntry) error
Write(*binlogpb.GrpcLogEntry) error
// Close will be called when the Sink is replaced by a new Sink.
Close() error
}
type noopSink struct{}
func (ns *noopSink) Write(*pb.GrpcLogEntry) error { return nil }
func (ns *noopSink) Close() error { return nil }
func (ns *noopSink) Write(*binlogpb.GrpcLogEntry) error { return nil }
func (ns *noopSink) Close() error { return nil }
// newWriterSink creates a binary log sink with the given writer.
//
@@ -66,7 +66,7 @@ type writerSink struct {
out io.Writer
}
func (ws *writerSink) Write(e *pb.GrpcLogEntry) error {
func (ws *writerSink) Write(e *binlogpb.GrpcLogEntry) error {
b, err := proto.Marshal(e)
if err != nil {
grpclogLogger.Errorf("binary logging: failed to marshal proto message: %v", err)
@@ -96,7 +96,7 @@ type bufferedSink struct {
done chan struct{}
}
func (fs *bufferedSink) Write(e *pb.GrpcLogEntry) error {
func (fs *bufferedSink) Write(e *binlogpb.GrpcLogEntry) error {
fs.mu.Lock()
defer fs.mu.Unlock()
if !fs.flusherStarted {

View File

@@ -21,19 +21,42 @@ package envconfig
import (
"os"
"strconv"
"strings"
)
const (
prefix = "GRPC_GO_"
txtErrIgnoreStr = prefix + "IGNORE_TXT_ERRORS"
advertiseCompressorsStr = prefix + "ADVERTISE_COMPRESSORS"
)
var (
// TXTErrIgnore is set if TXT errors should be ignored ("GRPC_GO_IGNORE_TXT_ERRORS" is not "false").
TXTErrIgnore = !strings.EqualFold(os.Getenv(txtErrIgnoreStr), "false")
TXTErrIgnore = boolFromEnv("GRPC_GO_IGNORE_TXT_ERRORS", true)
// AdvertiseCompressors is set if registered compressor should be advertised
// ("GRPC_GO_ADVERTISE_COMPRESSORS" is not "false").
AdvertiseCompressors = !strings.EqualFold(os.Getenv(advertiseCompressorsStr), "false")
AdvertiseCompressors = boolFromEnv("GRPC_GO_ADVERTISE_COMPRESSORS", true)
// RingHashCap indicates the maximum ring size which defaults to 4096
// entries but may be overridden by setting the environment variable
// "GRPC_RING_HASH_CAP". This does not override the default bounds
// checking which NACKs configs specifying ring sizes > 8*1024*1024 (~8M).
RingHashCap = uint64FromEnv("GRPC_RING_HASH_CAP", 4096, 1, 8*1024*1024)
)
func boolFromEnv(envVar string, def bool) bool {
if def {
// The default is true; return true unless the variable is "false".
return !strings.EqualFold(os.Getenv(envVar), "false")
}
// The default is false; return false unless the variable is "true".
return strings.EqualFold(os.Getenv(envVar), "true")
}
func uint64FromEnv(envVar string, def, min, max uint64) uint64 {
v, err := strconv.ParseUint(os.Getenv(envVar), 10, 64)
if err != nil {
return def
}
if v < min {
return min
}
if v > max {
return max
}
return v
}

View File

@@ -20,7 +20,6 @@ package envconfig
import (
"os"
"strings"
)
const (
@@ -36,16 +35,6 @@ const (
//
// When both bootstrap FileName and FileContent are set, FileName is used.
XDSBootstrapFileContentEnv = "GRPC_XDS_BOOTSTRAP_CONFIG"
ringHashSupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH"
clientSideSecuritySupportEnv = "GRPC_XDS_EXPERIMENTAL_SECURITY_SUPPORT"
aggregateAndDNSSupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"
rbacSupportEnv = "GRPC_XDS_EXPERIMENTAL_RBAC"
outlierDetectionSupportEnv = "GRPC_EXPERIMENTAL_ENABLE_OUTLIER_DETECTION"
federationEnv = "GRPC_EXPERIMENTAL_XDS_FEDERATION"
rlsInXDSEnv = "GRPC_EXPERIMENTAL_XDS_RLS_LB"
c2pResolverTestOnlyTrafficDirectorURIEnv = "GRPC_TEST_ONLY_GOOGLE_C2P_RESOLVER_TRAFFIC_DIRECTOR_URI"
)
var (
@@ -64,38 +53,40 @@ var (
// XDSRingHash indicates whether ring hash support is enabled, which can be
// disabled by setting the environment variable
// "GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH" to "false".
XDSRingHash = !strings.EqualFold(os.Getenv(ringHashSupportEnv), "false")
XDSRingHash = boolFromEnv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", true)
// XDSClientSideSecurity is used to control processing of security
// configuration on the client-side.
//
// Note that there is no env var protection for the server-side because we
// have a brand new API on the server-side and users explicitly need to use
// the new API to get security integration on the server.
XDSClientSideSecurity = !strings.EqualFold(os.Getenv(clientSideSecuritySupportEnv), "false")
XDSClientSideSecurity = boolFromEnv("GRPC_XDS_EXPERIMENTAL_SECURITY_SUPPORT", true)
// XDSAggregateAndDNS indicates whether processing of aggregated cluster
// and DNS cluster is enabled, which can be enabled by setting the
// environment variable
// "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER" to
// "true".
XDSAggregateAndDNS = !strings.EqualFold(os.Getenv(aggregateAndDNSSupportEnv), "false")
XDSAggregateAndDNS = boolFromEnv("GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER", true)
// XDSRBAC indicates whether xDS configured RBAC HTTP Filter is enabled,
// which can be disabled by setting the environment variable
// "GRPC_XDS_EXPERIMENTAL_RBAC" to "false".
XDSRBAC = !strings.EqualFold(os.Getenv(rbacSupportEnv), "false")
XDSRBAC = boolFromEnv("GRPC_XDS_EXPERIMENTAL_RBAC", true)
// XDSOutlierDetection indicates whether outlier detection support is
// enabled, which can be disabled by setting the environment variable
// "GRPC_EXPERIMENTAL_ENABLE_OUTLIER_DETECTION" to "false".
XDSOutlierDetection = !strings.EqualFold(os.Getenv(outlierDetectionSupportEnv), "false")
// XDSFederation indicates whether federation support is enabled.
XDSFederation = strings.EqualFold(os.Getenv(federationEnv), "true")
XDSOutlierDetection = boolFromEnv("GRPC_EXPERIMENTAL_ENABLE_OUTLIER_DETECTION", true)
// XDSFederation indicates whether federation support is enabled, which can
// be enabled by setting the environment variable
// "GRPC_EXPERIMENTAL_XDS_FEDERATION" to "true".
XDSFederation = boolFromEnv("GRPC_EXPERIMENTAL_XDS_FEDERATION", false)
// XDSRLS indicates whether processing of Cluster Specifier plugins and
// support for the RLS CLuster Specifier is enabled, which can be enabled by
// setting the environment variable "GRPC_EXPERIMENTAL_XDS_RLS_LB" to
// "true".
XDSRLS = strings.EqualFold(os.Getenv(rlsInXDSEnv), "true")
XDSRLS = boolFromEnv("GRPC_EXPERIMENTAL_XDS_RLS_LB", false)
// C2PResolverTestOnlyTrafficDirectorURI is the TD URI for testing.
C2PResolverTestOnlyTrafficDirectorURI = os.Getenv(c2pResolverTestOnlyTrafficDirectorURIEnv)
C2PResolverTestOnlyTrafficDirectorURI = os.Getenv("GRPC_TEST_ONLY_GOOGLE_C2P_RESOLVER_TRAFFIC_DIRECTOR_URI")
)

View File

@@ -77,6 +77,9 @@ var (
// ClearGlobalDialOptions clears the array of extra DialOption. This
// method is useful in testing and benchmarking.
ClearGlobalDialOptions func()
// JoinDialOptions combines the dial options passed as arguments into a
// single dial option.
JoinDialOptions interface{} // func(...grpc.DialOption) grpc.DialOption
// JoinServerOptions combines the server options passed as arguments into a
// single server option.
JoinServerOptions interface{} // func(...grpc.ServerOption) grpc.ServerOption

View File

@@ -116,7 +116,7 @@ type dnsBuilder struct{}
// Build creates and starts a DNS resolver that watches the name resolution of the target.
func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
host, port, err := parseTarget(target.Endpoint, defaultPort)
host, port, err := parseTarget(target.Endpoint(), defaultPort)
if err != nil {
return nil, err
}

View File

@@ -31,7 +31,7 @@ const scheme = "passthrough"
type passthroughBuilder struct{}
func (*passthroughBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
if target.Endpoint == "" && opts.Dialer == nil {
if target.Endpoint() == "" && opts.Dialer == nil {
return nil, errors.New("passthrough: received empty target in Build()")
}
r := &passthroughResolver{
@@ -52,7 +52,7 @@ type passthroughResolver struct {
}
func (r *passthroughResolver) start() {
r.cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: r.target.Endpoint}}})
r.cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: r.target.Endpoint()}}})
}
func (*passthroughResolver) ResolveNow(o resolver.ResolveNowOptions) {}

View File

@@ -527,6 +527,9 @@ const minBatchSize = 1000
// As an optimization, to increase the batch size for each flush, loopy yields the processor, once
// if the batch size is too low to give stream goroutines a chance to fill it up.
func (l *loopyWriter) run() (err error) {
// Always flush the writer before exiting in case there are pending frames
// to be sent.
defer l.framer.writer.Flush()
for {
it, err := l.cbuf.get(true)
if err != nil {
@@ -650,16 +653,18 @@ func (l *loopyWriter) headerHandler(h *headerFrame) error {
itl: &itemList{},
wq: h.wq,
}
str.itl.enqueue(h)
return l.originateStream(str)
return l.originateStream(str, h)
}
func (l *loopyWriter) originateStream(str *outStream) error {
hdr := str.itl.dequeue().(*headerFrame)
func (l *loopyWriter) originateStream(str *outStream, hdr *headerFrame) error {
// l.draining is set when handling GoAway. In which case, we want to avoid
// creating new streams.
if l.draining {
// TODO: provide a better error with the reason we are in draining.
hdr.onOrphaned(errStreamDrain)
return nil
}
if err := hdr.initStream(str.id); err != nil {
if err == errStreamDrain { // errStreamDrain need not close transport
return nil
}
return err
}
if err := l.writeHeader(str.id, hdr.endStream, hdr.hf, hdr.onWrite); err != nil {
@@ -757,7 +762,7 @@ func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
return err
}
}
if l.side == clientSide && l.draining && len(l.estdStreams) == 0 {
if l.draining && len(l.estdStreams) == 0 {
return errors.New("finished processing active streams while in draining mode")
}
return nil
@@ -812,7 +817,6 @@ func (l *loopyWriter) goAwayHandler(g *goAway) error {
}
func (l *loopyWriter) closeConnectionHandler() error {
l.framer.writer.Flush()
// Exit loopyWriter entirely by returning an error here. This will lead to
// the transport closing the connection, and, ultimately, transport
// closure.

View File

@@ -47,3 +47,9 @@ const (
defaultClientMaxHeaderListSize = uint32(16 << 20)
defaultServerMaxHeaderListSize = uint32(16 << 20)
)
// MaxStreamID is the upper bound for the stream ID before the current
// transport gracefully closes and new transport is created for subsequent RPCs.
// This is set to 75% of 2^31-1. Streams are identified with an unsigned 31-bit
// integer. It's exported so that tests can override it.
var MaxStreamID = uint32(math.MaxInt32 * 3 / 4)

View File

@@ -65,7 +65,7 @@ func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats []s
contentSubtype, validContentType := grpcutil.ContentSubtype(contentType)
if !validContentType {
msg := fmt.Sprintf("invalid gRPC request content-type %q", contentType)
http.Error(w, msg, http.StatusBadRequest)
http.Error(w, msg, http.StatusUnsupportedMediaType)
return nil, errors.New(msg)
}
if _, ok := w.(http.Flusher); !ok {
@@ -87,7 +87,7 @@ func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats []s
if v := r.Header.Get("grpc-timeout"); v != "" {
to, err := decodeTimeout(v)
if err != nil {
msg := fmt.Sprintf("malformed time-out: %v", err)
msg := fmt.Sprintf("malformed grpc-timeout: %v", err)
http.Error(w, msg, http.StatusBadRequest)
return nil, status.Error(codes.Internal, msg)
}

View File

@@ -140,8 +140,7 @@ type http2Client struct {
channelzID *channelz.Identifier
czData *channelzData
onGoAway func(GoAwayReason)
onClose func()
onClose func(GoAwayReason)
bufferPool *bufferPool
@@ -197,7 +196,7 @@ func isTemporary(err error) bool {
// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
// and starts to receive messages on it. Non-nil error returns if construction
// fails.
func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onGoAway func(GoAwayReason), onClose func()) (_ *http2Client, err error) {
func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onClose func(GoAwayReason)) (_ *http2Client, err error) {
scheme := "http"
ctx, cancel := context.WithCancel(ctx)
defer func() {
@@ -217,7 +216,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
if opts.FailOnNonTempDialError {
return nil, connectionErrorf(isTemporary(err), err, "transport: error while dialing: %v", err)
}
return nil, connectionErrorf(true, err, "transport: Error while dialing %v", err)
return nil, connectionErrorf(true, err, "transport: Error while dialing: %v", err)
}
// Any further errors will close the underlying connection
@@ -343,7 +342,6 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
streamQuota: defaultMaxStreamsClient,
streamsQuotaAvailable: make(chan struct{}, 1),
czData: new(channelzData),
onGoAway: onGoAway,
keepaliveEnabled: keepaliveEnabled,
bufferPool: newBufferPool(),
onClose: onClose,
@@ -744,15 +742,12 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
endStream: false,
initStream: func(id uint32) error {
t.mu.Lock()
if state := t.state; state != reachable {
// TODO: handle transport closure in loopy instead and remove this
// initStream is never called when transport is draining.
if t.state == closing {
t.mu.Unlock()
// Do a quick cleanup.
err := error(errStreamDrain)
if state == closing {
err = ErrConnClosing
}
cleanup(err)
return err
cleanup(ErrConnClosing)
return ErrConnClosing
}
if channelz.IsOn() {
atomic.AddInt64(&t.czData.streamsStarted, 1)
@@ -770,6 +765,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
}
firstTry := true
var ch chan struct{}
transportDrainRequired := false
checkForStreamQuota := func(it interface{}) bool {
if t.streamQuota <= 0 { // Can go negative if server decreases it.
if firstTry {
@@ -785,6 +781,11 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
h := it.(*headerFrame)
h.streamID = t.nextID
t.nextID += 2
// Drain client transport if nextID > MaxStreamID which signals gRPC that
// the connection is closed and a new one must be created for subsequent RPCs.
transportDrainRequired = t.nextID > MaxStreamID
s.id = h.streamID
s.fc = &inFlow{limit: uint32(t.initialWindowSize)}
t.mu.Lock()
@@ -864,6 +865,12 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
sh.HandleRPC(s.ctx, outHeader)
}
}
if transportDrainRequired {
if logger.V(logLevel) {
logger.Infof("transport: t.nextID > MaxStreamID. Draining")
}
t.GracefulClose()
}
return s, nil
}
@@ -957,7 +964,9 @@ func (t *http2Client) Close(err error) {
}
// Call t.onClose ASAP to prevent the client from attempting to create new
// streams.
t.onClose()
if t.state != draining {
t.onClose(GoAwayInvalid)
}
t.state = closing
streams := t.activeStreams
t.activeStreams = nil
@@ -1010,6 +1019,7 @@ func (t *http2Client) GracefulClose() {
if logger.V(logLevel) {
logger.Infof("transport: GracefulClose called")
}
t.onClose(GoAwayInvalid)
t.state = draining
active := len(t.activeStreams)
t.mu.Unlock()
@@ -1172,7 +1182,7 @@ func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
statusCode, ok := http2ErrConvTab[f.ErrCode]
if !ok {
if logger.V(logLevel) {
logger.Warningf("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error %v", f.ErrCode)
logger.Warningf("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error: %v", f.ErrCode)
}
statusCode = codes.Unknown
}
@@ -1290,8 +1300,10 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
// Notify the clientconn about the GOAWAY before we set the state to
// draining, to allow the client to stop attempting to create streams
// before disallowing new streams on this connection.
t.onGoAway(t.goAwayReason)
t.state = draining
if t.state != draining {
t.onClose(t.goAwayReason)
t.state = draining
}
}
// All streams with IDs greater than the GoAwayId
// and smaller than the previous GoAway ID should be killed.
@@ -1780,3 +1792,9 @@ func (t *http2Client) getOutFlowWindow() int64 {
return -2
}
}
func (t *http2Client) stateForTesting() transportState {
t.mu.Lock()
defer t.mu.Unlock()
return t.state
}

View File

@@ -380,13 +380,14 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
fc: &inFlow{limit: uint32(t.initialWindowSize)},
}
var (
// If a gRPC Response-Headers has already been received, then it means
// that the peer is speaking gRPC and we are in gRPC mode.
isGRPC = false
mdata = make(map[string][]string)
httpMethod string
// headerError is set if an error is encountered while parsing the headers
headerError bool
// if false, content-type was missing or invalid
isGRPC = false
contentType = ""
mdata = make(map[string][]string)
httpMethod string
// these are set if an error is encountered while parsing the headers
protocolError bool
headerError *status.Status
timeoutSet bool
timeout time.Duration
@@ -397,6 +398,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
case "content-type":
contentSubtype, validContentType := grpcutil.ContentSubtype(hf.Value)
if !validContentType {
contentType = hf.Value
break
}
mdata[hf.Name] = append(mdata[hf.Name], hf.Value)
@@ -412,7 +414,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
timeoutSet = true
var err error
if timeout, err = decodeTimeout(hf.Value); err != nil {
headerError = true
headerError = status.Newf(codes.Internal, "malformed grpc-timeout: %v", err)
}
// "Transports must consider requests containing the Connection header
// as malformed." - A41
@@ -420,14 +422,14 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
if logger.V(logLevel) {
logger.Errorf("transport: http2Server.operateHeaders parsed a :connection header which makes a request malformed as per the HTTP/2 spec")
}
headerError = true
protocolError = true
default:
if isReservedHeader(hf.Name) && !isWhitelistedHeader(hf.Name) {
break
}
v, err := decodeMetadataHeader(hf.Name, hf.Value)
if err != nil {
headerError = true
headerError = status.Newf(codes.Internal, "malformed binary metadata %q in header %q: %v", hf.Value, hf.Name, err)
logger.Warningf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err)
break
}
@@ -446,7 +448,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
logger.Errorf("transport: %v", errMsg)
}
t.controlBuf.put(&earlyAbortStream{
httpStatus: 400,
httpStatus: http.StatusBadRequest,
streamID: streamID,
contentSubtype: s.contentSubtype,
status: status.New(codes.Internal, errMsg),
@@ -455,7 +457,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
return nil
}
if !isGRPC || headerError {
if protocolError {
t.controlBuf.put(&cleanupStream{
streamID: streamID,
rst: true,
@@ -464,6 +466,26 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
})
return nil
}
if !isGRPC {
t.controlBuf.put(&earlyAbortStream{
httpStatus: http.StatusUnsupportedMediaType,
streamID: streamID,
contentSubtype: s.contentSubtype,
status: status.Newf(codes.InvalidArgument, "invalid gRPC request content-type %q", contentType),
rst: !frame.StreamEnded(),
})
return nil
}
if headerError != nil {
t.controlBuf.put(&earlyAbortStream{
httpStatus: http.StatusBadRequest,
streamID: streamID,
contentSubtype: s.contentSubtype,
status: headerError,
rst: !frame.StreamEnded(),
})
return nil
}
// "If :authority is missing, Host must be renamed to :authority." - A41
if len(mdata[":authority"]) == 0 {

View File

@@ -583,8 +583,8 @@ type ConnectOptions struct {
// NewClientTransport establishes the transport with the required ConnectOptions
// and returns it to the caller.
func NewClientTransport(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onGoAway func(GoAwayReason), onClose func()) (ClientTransport, error) {
return newHTTP2Client(connectCtx, ctx, addr, opts, onGoAway, onClose)
func NewClientTransport(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onClose func(GoAwayReason)) (ClientTransport, error) {
return newHTTP2Client(connectCtx, ctx, addr, opts, onClose)
}
// Options provides additional hints and information for message

View File

@@ -58,12 +58,18 @@ func (pw *pickerWrapper) updatePicker(p balancer.Picker) {
pw.mu.Unlock()
}
func doneChannelzWrapper(acw *acBalancerWrapper, done func(balancer.DoneInfo)) func(balancer.DoneInfo) {
// doneChannelzWrapper performs the following:
// - increments the calls started channelz counter
// - wraps the done function in the passed in result to increment the calls
// failed or calls succeeded channelz counter before invoking the actual
// done function.
func doneChannelzWrapper(acw *acBalancerWrapper, result *balancer.PickResult) {
acw.mu.Lock()
ac := acw.ac
acw.mu.Unlock()
ac.incrCallsStarted()
return func(b balancer.DoneInfo) {
done := result.Done
result.Done = func(b balancer.DoneInfo) {
if b.Err != nil && b.Err != io.EOF {
ac.incrCallsFailed()
} else {
@@ -82,7 +88,7 @@ func doneChannelzWrapper(acw *acBalancerWrapper, done func(balancer.DoneInfo)) f
// - the current picker returns other errors and failfast is false.
// - the subConn returned by the current picker is not READY
// When one of these situations happens, pick blocks until the picker gets updated.
func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (transport.ClientTransport, func(balancer.DoneInfo), error) {
func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (transport.ClientTransport, balancer.PickResult, error) {
var ch chan struct{}
var lastPickErr error
@@ -90,7 +96,7 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.
pw.mu.Lock()
if pw.done {
pw.mu.Unlock()
return nil, nil, ErrClientConnClosing
return nil, balancer.PickResult{}, ErrClientConnClosing
}
if pw.picker == nil {
@@ -111,9 +117,9 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.
}
switch ctx.Err() {
case context.DeadlineExceeded:
return nil, nil, status.Error(codes.DeadlineExceeded, errStr)
return nil, balancer.PickResult{}, status.Error(codes.DeadlineExceeded, errStr)
case context.Canceled:
return nil, nil, status.Error(codes.Canceled, errStr)
return nil, balancer.PickResult{}, status.Error(codes.Canceled, errStr)
}
case <-ch:
}
@@ -125,7 +131,6 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.
pw.mu.Unlock()
pickResult, err := p.Pick(info)
if err != nil {
if err == balancer.ErrNoSubConnAvailable {
continue
@@ -136,7 +141,7 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.
if istatus.IsRestrictedControlPlaneCode(st) {
err = status.Errorf(codes.Internal, "received picker error with illegal status: %v", err)
}
return nil, nil, dropError{error: err}
return nil, balancer.PickResult{}, dropError{error: err}
}
// For all other errors, wait for ready RPCs should block and other
// RPCs should fail with unavailable.
@@ -144,7 +149,7 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.
lastPickErr = err
continue
}
return nil, nil, status.Error(codes.Unavailable, err.Error())
return nil, balancer.PickResult{}, status.Error(codes.Unavailable, err.Error())
}
acw, ok := pickResult.SubConn.(*acBalancerWrapper)
@@ -154,9 +159,10 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.
}
if t := acw.getAddrConn().getReadyTransport(); t != nil {
if channelz.IsOn() {
return t, doneChannelzWrapper(acw, pickResult.Done), nil
doneChannelzWrapper(acw, &pickResult)
return t, pickResult, nil
}
return t, pickResult.Done, nil
return t, pickResult, nil
}
if pickResult.Done != nil {
// Calling done with nil error, no bytes sent and no bytes received.

View File

@@ -51,7 +51,7 @@ type pickfirstBalancer struct {
func (b *pickfirstBalancer) ResolverError(err error) {
if logger.V(2) {
logger.Infof("pickfirstBalancer: ResolverError called with error %v", err)
logger.Infof("pickfirstBalancer: ResolverError called with error: %v", err)
}
if b.subConn == nil {
b.state = connectivity.TransientFailure

View File

@@ -24,6 +24,7 @@ import (
"context"
"net"
"net/url"
"strings"
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/credentials"
@@ -247,9 +248,6 @@ type Target struct {
Scheme string
// Deprecated: use URL.Host instead.
Authority string
// Deprecated: use URL.Path or URL.Opaque instead. The latter is set when
// the former is empty.
Endpoint string
// URL contains the parsed dial target with an optional default scheme added
// to it if the original dial target contained no scheme or contained an
// unregistered scheme. Any query params specified in the original dial
@@ -257,6 +255,24 @@ type Target struct {
URL url.URL
}
// Endpoint retrieves endpoint without leading "/" from either `URL.Path`
// or `URL.Opaque`. The latter is used when the former is empty.
func (t Target) Endpoint() string {
endpoint := t.URL.Path
if endpoint == "" {
endpoint = t.URL.Opaque
}
// For targets of the form "[scheme]://[authority]/endpoint, the endpoint
// value returned from url.Parse() contains a leading "/". Although this is
// in accordance with RFC 3986, we do not want to break existing resolver
// implementations which expect the endpoint without the leading "/". So, we
// end up stripping the leading "/" here. But this will result in an
// incorrect parsing for something like "unix:///path/to/socket". Since we
// own the "unix" resolver, we can workaround in the unix resolver by using
// the `URL` field.
return strings.TrimPrefix(endpoint, "/")
}
// Builder creates a resolver that will be used to watch name resolution updates.
type Builder interface {
// Build creates a new resolver for the given target.

View File

@@ -25,7 +25,6 @@ import (
"encoding/binary"
"fmt"
"io"
"io/ioutil"
"math"
"strings"
"sync"
@@ -77,7 +76,7 @@ func NewGZIPCompressorWithLevel(level int) (Compressor, error) {
return &gzipCompressor{
pool: sync.Pool{
New: func() interface{} {
w, err := gzip.NewWriterLevel(ioutil.Discard, level)
w, err := gzip.NewWriterLevel(io.Discard, level)
if err != nil {
panic(err)
}
@@ -143,7 +142,7 @@ func (d *gzipDecompressor) Do(r io.Reader) ([]byte, error) {
z.Close()
d.pool.Put(z)
}()
return ioutil.ReadAll(z)
return io.ReadAll(z)
}
func (d *gzipDecompressor) Type() string {
@@ -297,7 +296,8 @@ func (o FailFastCallOption) before(c *callInfo) error {
func (o FailFastCallOption) after(c *callInfo, attempt *csAttempt) {}
// MaxCallRecvMsgSize returns a CallOption which sets the maximum message size
// in bytes the client can receive.
// in bytes the client can receive. If this is not set, gRPC uses the default
// 4MB.
func MaxCallRecvMsgSize(bytes int) CallOption {
return MaxRecvMsgSizeCallOption{MaxRecvMsgSize: bytes}
}
@@ -320,7 +320,8 @@ func (o MaxRecvMsgSizeCallOption) before(c *callInfo) error {
func (o MaxRecvMsgSizeCallOption) after(c *callInfo, attempt *csAttempt) {}
// MaxCallSendMsgSize returns a CallOption which sets the maximum message size
// in bytes the client can send.
// in bytes the client can send. If this is not set, gRPC uses the default
// `math.MaxInt32`.
func MaxCallSendMsgSize(bytes int) CallOption {
return MaxSendMsgSizeCallOption{MaxSendMsgSize: bytes}
}
@@ -711,7 +712,7 @@ func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxRecei
d, size, err = decompress(compressor, d, maxReceiveMessageSize)
}
if err != nil {
return nil, status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
return nil, status.Errorf(codes.Internal, "grpc: failed to decompress the received message: %v", err)
}
if size > maxReceiveMessageSize {
// TODO: Revisit the error code. Currently keep it consistent with java
@@ -746,7 +747,7 @@ func decompress(compressor encoding.Compressor, d []byte, maxReceiveMessageSize
}
// Read from LimitReader with limit max+1. So if the underlying
// reader is over limit, the result will be bigger than max.
d, err = ioutil.ReadAll(io.LimitReader(dcReader, int64(maxReceiveMessageSize)+1))
d, err = io.ReadAll(io.LimitReader(dcReader, int64(maxReceiveMessageSize)+1))
return d, len(d), err
}
@@ -759,7 +760,7 @@ func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interf
return err
}
if err := c.Unmarshal(d, m); err != nil {
return status.Errorf(codes.Internal, "grpc: failed to unmarshal the received message %v", err)
return status.Errorf(codes.Internal, "grpc: failed to unmarshal the received message: %v", err)
}
if payInfo != nil {
payInfo.uncompressedBytes = d

View File

@@ -1299,7 +1299,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
if err != nil {
if e := t.WriteStatus(stream, status.Convert(err)); e != nil {
channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status %v", e)
channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
}
return err
}

View File

@@ -226,7 +226,7 @@ func parseServiceConfig(js string) *serviceconfig.ParseResult {
var rsc jsonSC
err := json.Unmarshal([]byte(js), &rsc)
if err != nil {
logger.Warningf("grpc: parseServiceConfig error unmarshaling %s due to %v", js, err)
logger.Warningf("grpc: unmarshaling service config %s: %v", js, err)
return &serviceconfig.ParseResult{Err: err}
}
sc := ServiceConfig{
@@ -254,7 +254,7 @@ func parseServiceConfig(js string) *serviceconfig.ParseResult {
}
d, err := parseDuration(m.Timeout)
if err != nil {
logger.Warningf("grpc: parseServiceConfig error unmarshaling %s due to %v", js, err)
logger.Warningf("grpc: unmarshaling service config %s: %v", js, err)
return &serviceconfig.ParseResult{Err: err}
}
@@ -263,7 +263,7 @@ func parseServiceConfig(js string) *serviceconfig.ParseResult {
Timeout: d,
}
if mc.RetryPolicy, err = convertRetryPolicy(m.RetryPolicy); err != nil {
logger.Warningf("grpc: parseServiceConfig error unmarshaling %s due to %v", js, err)
logger.Warningf("grpc: unmarshaling service config %s: %v", js, err)
return &serviceconfig.ParseResult{Err: err}
}
if m.MaxRequestMessageBytes != nil {
@@ -283,13 +283,13 @@ func parseServiceConfig(js string) *serviceconfig.ParseResult {
for i, n := range *m.Name {
path, err := n.generatePath()
if err != nil {
logger.Warningf("grpc: parseServiceConfig error unmarshaling %s due to methodConfig[%d]: %v", js, i, err)
logger.Warningf("grpc: error unmarshaling service config %s due to methodConfig[%d]: %v", js, i, err)
return &serviceconfig.ParseResult{Err: err}
}
if _, ok := paths[path]; ok {
err = errDuplicatedName
logger.Warningf("grpc: parseServiceConfig error unmarshaling %s due to methodConfig[%d]: %v", js, i, err)
logger.Warningf("grpc: error unmarshaling service config %s due to methodConfig[%d]: %v", js, i, err)
return &serviceconfig.ParseResult{Err: err}
}
paths[path] = struct{}{}

View File

@@ -438,7 +438,7 @@ func (a *csAttempt) getTransport() error {
cs := a.cs
var err error
a.t, a.done, err = cs.cc.getTransport(a.ctx, cs.callInfo.failFast, cs.callHdr.Method)
a.t, a.pickResult, err = cs.cc.getTransport(a.ctx, cs.callInfo.failFast, cs.callHdr.Method)
if err != nil {
if de, ok := err.(dropError); ok {
err = de.error
@@ -455,6 +455,25 @@ func (a *csAttempt) getTransport() error {
func (a *csAttempt) newStream() error {
cs := a.cs
cs.callHdr.PreviousAttempts = cs.numRetries
// Merge metadata stored in PickResult, if any, with existing call metadata.
// It is safe to overwrite the csAttempt's context here, since all state
// maintained in it are local to the attempt. When the attempt has to be
// retried, a new instance of csAttempt will be created.
if a.pickResult.Metatada != nil {
// We currently do not have a function it the metadata package which
// merges given metadata with existing metadata in a context. Existing
// function `AppendToOutgoingContext()` takes a variadic argument of key
// value pairs.
//
// TODO: Make it possible to retrieve key value pairs from metadata.MD
// in a form passable to AppendToOutgoingContext(), or create a version
// of AppendToOutgoingContext() that accepts a metadata.MD.
md, _ := metadata.FromOutgoingContext(a.ctx)
md = metadata.Join(md, a.pickResult.Metatada)
a.ctx = metadata.NewOutgoingContext(a.ctx, md)
}
s, err := a.t.NewStream(a.ctx, cs.callHdr)
if err != nil {
nse, ok := err.(*transport.NewStreamError)
@@ -529,12 +548,12 @@ type clientStream struct {
// csAttempt implements a single transport stream attempt within a
// clientStream.
type csAttempt struct {
ctx context.Context
cs *clientStream
t transport.ClientTransport
s *transport.Stream
p *parser
done func(balancer.DoneInfo)
ctx context.Context
cs *clientStream
t transport.ClientTransport
s *transport.Stream
p *parser
pickResult balancer.PickResult
finished bool
dc Decompressor
@@ -1103,12 +1122,12 @@ func (a *csAttempt) finish(err error) {
tr = a.s.Trailer()
}
if a.done != nil {
if a.pickResult.Done != nil {
br := false
if a.s != nil {
br = a.s.BytesReceived()
}
a.done(balancer.DoneInfo{
a.pickResult.Done(balancer.DoneInfo{
Err: err,
Trailer: tr,
BytesSent: a.s != nil,
@@ -1464,6 +1483,9 @@ type ServerStream interface {
// It is safe to have a goroutine calling SendMsg and another goroutine
// calling RecvMsg on the same stream at the same time, but it is not safe
// to call SendMsg on the same stream in different goroutines.
//
// It is not safe to modify the message after calling SendMsg. Tracing
// libraries and stats handlers may use the message lazily.
SendMsg(m interface{}) error
// RecvMsg blocks until it receives a message into m or the stream is
// done. It returns io.EOF when the client has performed a CloseSend. On

View File

@@ -19,4 +19,4 @@
package grpc
// Version is the current grpc version.
const Version = "1.52.3"
const Version = "1.53.0"

20
vendor/google.golang.org/grpc/vet.sh generated vendored
View File

@@ -66,6 +66,17 @@ elif [[ "$#" -ne 0 ]]; then
die "Unknown argument(s): $*"
fi
# - Check that generated proto files are up to date.
if [[ -z "${VET_SKIP_PROTO}" ]]; then
PATH="/home/travis/bin:${PATH}" make proto && \
git status --porcelain 2>&1 | fail_on_output || \
(git status; git --no-pager diff; exit 1)
fi
if [[ -n "${VET_ONLY_PROTO}" ]]; then
exit 0
fi
# - Ensure all source files contain a copyright message.
# (Done in two parts because Darwin "git grep" has broken support for compound
# exclusion matches.)
@@ -93,13 +104,6 @@ git grep '"github.com/envoyproxy/go-control-plane/envoy' -- '*.go' ':(exclude)*.
misspell -error .
# - Check that generated proto files are up to date.
if [[ -z "${VET_SKIP_PROTO}" ]]; then
PATH="/home/travis/bin:${PATH}" make proto && \
git status --porcelain 2>&1 | fail_on_output || \
(git status; git --no-pager diff; exit 1)
fi
# - gofmt, goimports, golint (with exceptions for generated code), go vet,
# go mod tidy.
# Perform these checks on each module inside gRPC.
@@ -111,7 +115,7 @@ for MOD_FILE in $(find . -name 'go.mod'); do
goimports -l . 2>&1 | not grep -vE "\.pb\.go"
golint ./... 2>&1 | not grep -vE "/grpc_testing_not_regenerate/.*\.pb\.go:"
go mod tidy
go mod tidy -compat=1.17
git status --porcelain 2>&1 | fail_on_output || \
(git status; git --no-pager diff; exit 1)
popd

6
vendor/modules.txt vendored
View File

@@ -527,7 +527,7 @@ github.com/gregjones/httpcache
# github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
## explicit
github.com/grpc-ecosystem/go-grpc-prometheus
# github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.1
# github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3
## explicit; go 1.17
github.com/grpc-ecosystem/grpc-gateway/v2/internal/httprule
github.com/grpc-ecosystem/grpc-gateway/v2/runtime
@@ -1366,7 +1366,7 @@ google.golang.org/genproto/googleapis/api/httpbody
google.golang.org/genproto/googleapis/rpc/errdetails
google.golang.org/genproto/googleapis/rpc/status
google.golang.org/genproto/protobuf/field_mask
# google.golang.org/grpc v1.52.3 => google.golang.org/grpc v1.52.3
# google.golang.org/grpc v1.53.0 => google.golang.org/grpc v1.53.0
## explicit; go 1.17
google.golang.org/grpc
google.golang.org/grpc/attributes
@@ -2806,7 +2806,7 @@ sigs.k8s.io/yaml
# gomodules.xyz/jsonpatch/v2 => gomodules.xyz/jsonpatch/v2 v2.2.0
# google.golang.org/appengine => google.golang.org/appengine v1.6.7
# google.golang.org/genproto => google.golang.org/genproto v0.0.0-20230124163310-31e0e69b6fc2
# google.golang.org/grpc => google.golang.org/grpc v1.52.3
# google.golang.org/grpc => google.golang.org/grpc v1.53.0
# google.golang.org/protobuf => google.golang.org/protobuf v1.28.1
# gopkg.in/asn1-ber.v1 => gopkg.in/asn1-ber.v1 v1.0.0-20181015200546-f715ec2f112d
# gopkg.in/cas.v2 => gopkg.in/cas.v2 v2.2.0