Fix dynamic resource API (#5573)

This commit is contained in:
hongming
2023-03-08 19:10:44 +08:00
committed by GitHub
parent f63c371eaf
commit 00b0229f77
17 changed files with 491 additions and 591 deletions

View File

@@ -33,7 +33,6 @@ import (
urlruntime "k8s.io/apimachinery/pkg/util/runtime" urlruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
unionauth "k8s.io/apiserver/pkg/authentication/request/union" unionauth "k8s.io/apiserver/pkg/authentication/request/union"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"k8s.io/client-go/discovery" "k8s.io/client-go/discovery"
"k8s.io/client-go/util/retry" "k8s.io/client-go/util/retry"
"k8s.io/klog/v2" "k8s.io/klog/v2"
@@ -61,9 +60,7 @@ import (
"kubesphere.io/kubesphere/pkg/apiserver/authorization/rbac" "kubesphere.io/kubesphere/pkg/apiserver/authorization/rbac"
unionauthorizer "kubesphere.io/kubesphere/pkg/apiserver/authorization/union" unionauthorizer "kubesphere.io/kubesphere/pkg/apiserver/authorization/union"
apiserverconfig "kubesphere.io/kubesphere/pkg/apiserver/config" apiserverconfig "kubesphere.io/kubesphere/pkg/apiserver/config"
"kubesphere.io/kubesphere/pkg/apiserver/dispatch"
"kubesphere.io/kubesphere/pkg/apiserver/filters" "kubesphere.io/kubesphere/pkg/apiserver/filters"
"kubesphere.io/kubesphere/pkg/apiserver/proxies"
"kubesphere.io/kubesphere/pkg/apiserver/request" "kubesphere.io/kubesphere/pkg/apiserver/request"
"kubesphere.io/kubesphere/pkg/informers" "kubesphere.io/kubesphere/pkg/informers"
alertingv1 "kubesphere.io/kubesphere/pkg/kapis/alerting/v1" alertingv1 "kubesphere.io/kubesphere/pkg/kapis/alerting/v1"
@@ -176,22 +173,21 @@ type APIServer struct {
func (s *APIServer) PrepareRun(stopCh <-chan struct{}) error { func (s *APIServer) PrepareRun(stopCh <-chan struct{}) error {
s.container = restful.NewContainer() s.container = restful.NewContainer()
s.container.Filter(logRequestAndResponse) s.container.Filter(logRequestAndResponse)
s.container.Filter(monitorRequest)
s.container.Router(restful.CurlyRouter{}) s.container.Router(restful.CurlyRouter{})
s.container.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) { s.container.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) {
logStackOnRecover(panicReason, httpWriter) logStackOnRecover(panicReason, httpWriter)
}) })
s.installDynamicResourceAPI()
s.installKubeSphereAPIs(stopCh) s.installKubeSphereAPIs(stopCh)
s.installMetricsAPI() s.installMetricsAPI()
s.installHealthz() s.installHealthz()
s.container.Filter(monitorRequest)
for _, ws := range s.container.RegisteredWebServices() { for _, ws := range s.container.RegisteredWebServices() {
klog.V(2).Infof("%s", ws.RootPath()) klog.V(2).Infof("%s", ws.RootPath())
} }
s.Server.Handler = s.container s.Server.Handler = s.container
s.buildHandlerChain(stopCh) s.buildHandlerChain(stopCh)
return nil return nil
@@ -338,11 +334,9 @@ func (s *APIServer) buildHandlerChain(stopCh <-chan struct{}) {
notificationv2beta2.Resource(notificationv2beta2.ResourcesPluralSilence), notificationv2beta2.Resource(notificationv2beta2.ResourcesPluralSilence),
) )
} }
handler := s.Server.Handler
handler = filters.WithKubeAPIServer(handler, s.KubernetesClient.Config(), &errorResponder{})
middleware := proxies.NewUnregisteredMiddleware(s.container, resourcev1beta1.New(s.RuntimeClient, s.RuntimeCache)) handler := s.Server.Handler
handler = filters.WithMiddleware(handler, middleware) handler = filters.WithKubeAPIServer(handler, s.KubernetesClient.Config())
if s.Config.AuditingOptions.Enable { if s.Config.AuditingOptions.Enable {
handler = filters.WithAuditing(handler, handler = filters.WithAuditing(handler,
@@ -367,8 +361,7 @@ func (s *APIServer) buildHandlerChain(stopCh <-chan struct{}) {
handler = filters.WithAuthorization(handler, authorizers) handler = filters.WithAuthorization(handler, authorizers)
if s.Config.MultiClusterOptions.Enable { if s.Config.MultiClusterOptions.Enable {
clusterDispatcher := dispatch.NewClusterDispatch(s.ClusterClient) handler = filters.WithMulticluster(handler, s.ClusterClient)
handler = filters.WithMultipleClusterDispatcher(handler, clusterDispatcher)
} }
userLister := s.InformerFactory.KubeSphereSharedInformerFactory().Iam().V1alpha2().Users().Lister() userLister := s.InformerFactory.KubeSphereSharedInformerFactory().Iam().V1alpha2().Users().Lister()
@@ -633,6 +626,18 @@ func (s *APIServer) waitForResourceSync(ctx context.Context) error {
} }
func (s *APIServer) installDynamicResourceAPI() {
dynamicResourceHandler := filters.NewDynamicResourceHandle(func(err restful.ServiceError, req *restful.Request, resp *restful.Response) {
for header, values := range err.Header {
for _, value := range values {
resp.Header().Add(header, value)
}
}
resp.WriteErrorString(err.Code, err.Message)
}, resourcev1beta1.New(s.RuntimeClient, s.RuntimeCache))
s.container.ServiceErrorHandler(dynamicResourceHandler.HandleServiceError)
}
func logStackOnRecover(panicReason interface{}, w http.ResponseWriter) { func logStackOnRecover(panicReason interface{}, w http.ResponseWriter) {
var buffer bytes.Buffer var buffer bytes.Buffer
buffer.WriteString(fmt.Sprintf("recover from panic situation: - %v\r\n", panicReason)) buffer.WriteString(fmt.Sprintf("recover from panic situation: - %v\r\n", panicReason))
@@ -674,10 +679,3 @@ func logRequestAndResponse(req *restful.Request, resp *restful.Response, chain *
time.Since(start)/time.Millisecond, time.Since(start)/time.Millisecond,
) )
} }
type errorResponder struct{}
func (e *errorResponder) Error(w http.ResponseWriter, req *http.Request, err error) {
klog.Error(err)
responsewriters.InternalError(w, req, err)
}

View File

@@ -1,17 +0,0 @@
/*
Copyright 2020 KubeSphere Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dispatch

View File

@@ -25,39 +25,45 @@ import (
"kubesphere.io/kubesphere/pkg/apiserver/request" "kubesphere.io/kubesphere/pkg/apiserver/request"
) )
func WithAuditing(handler http.Handler, a auditing.Auditing) http.Handler { type auditingFilter struct {
next http.Handler
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { auditing.Auditing
}
// When auditing level is LevelNone, request should not be auditing.
// Auditing level can be modified with cr kube-auditing-webhook, func WithAuditing(next http.Handler, auditing auditing.Auditing) http.Handler {
// so it need to judge every time. return &auditingFilter{
if !a.Enabled() { next: next,
handler.ServeHTTP(w, req) Auditing: auditing,
return }
} }
info, ok := request.RequestInfoFrom(req.Context()) func (a *auditingFilter) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if !ok { // When auditing level is LevelNone, request should not be auditing.
klog.Error("Unable to retrieve request info from request") // Auditing level can be modified with cr kube-auditing-webhook,
handler.ServeHTTP(w, req) // so it need to judge every time.
return if !a.Enabled() {
} a.next.ServeHTTP(w, req)
return
// Auditing should igonre k8s request when k8s auditing is enabled. }
if info.IsKubernetesRequest && a.K8sAuditingEnabled() {
handler.ServeHTTP(w, req) info, ok := request.RequestInfoFrom(req.Context())
return if !ok {
} klog.Error("Unable to retrieve request info from request")
a.next.ServeHTTP(w, req)
e := a.LogRequestObject(req, info) return
if e != nil { }
resp := auditing.NewResponseCapture(w)
handler.ServeHTTP(resp, req) // Auditing should ignore k8s request when k8s auditing is enabled.
if info.IsKubernetesRequest && a.K8sAuditingEnabled() {
go a.LogResponseObject(e, resp) a.next.ServeHTTP(w, req)
} else { return
handler.ServeHTTP(w, req) }
}
}) if event := a.LogRequestObject(req, info); event != nil {
resp := auditing.NewResponseCapture(w)
a.next.ServeHTTP(resp, req)
go a.LogResponseObject(event, resp)
} else {
a.next.ServeHTTP(w, req)
}
} }

View File

@@ -32,41 +32,51 @@ import (
"kubesphere.io/kubesphere/pkg/apiserver/request" "kubesphere.io/kubesphere/pkg/apiserver/request"
) )
type authnFilter struct {
next http.Handler
authenticator.Request
serializer runtime.NegotiatedSerializer
}
// WithAuthentication installs authentication handler to handler chain. // WithAuthentication installs authentication handler to handler chain.
// The following part is a little bit ugly, WithAuthentication also logs user failed login attempt // The following part is a little bit ugly, WithAuthentication also logs user failed login attempt
// if using basic auth. But only treats request with requestURI `/oauth/authorize` as login attempt // if using basic auth. But only treats request with requestURI `/oauth/authorize` as login attempt
func WithAuthentication(handler http.Handler, authRequest authenticator.Request) http.Handler { func WithAuthentication(next http.Handler, authenticator authenticator.Request) http.Handler {
if authRequest == nil { if authenticator == nil {
klog.Warningf("Authentication is disabled") klog.Warningf("Authentication is disabled")
return handler return next
} }
s := serializer.NewCodecFactory(runtime.NewScheme()).WithoutConversion() return &authnFilter{
next: next,
Request: authenticator,
serializer: serializer.NewCodecFactory(runtime.NewScheme()).WithoutConversion(),
}
}
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { func (a *authnFilter) ServeHTTP(w http.ResponseWriter, req *http.Request) {
resp, ok, err := authRequest.AuthenticateRequest(req) resp, ok, err := a.AuthenticateRequest(req)
_, _, usingBasicAuth := req.BasicAuth() _, _, usingBasicAuth := req.BasicAuth()
defer func() { defer func() {
// if we authenticated successfully, go ahead and remove the bearer token so that no one // if we authenticated successfully, go ahead and remove the bearer token so that no one
// is ever tempted to use it inside of the API server // is ever tempted to use it inside the API server
if usingBasicAuth && ok { if usingBasicAuth && ok {
req.Header.Del("Authorization") req.Header.Del("Authorization")
} }
}() }()
if err != nil || !ok { if err != nil || !ok {
ctx := req.Context() ctx := req.Context()
requestInfo, found := request.RequestInfoFrom(ctx) requestInfo, found := request.RequestInfoFrom(ctx)
if !found { if !found {
responsewriters.InternalError(w, req, errors.New("no RequestInfo found in the context")) responsewriters.InternalError(w, req, errors.New("no RequestInfo found in the context"))
return
}
gv := schema.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion}
responsewriters.ErrorNegotiated(apierrors.NewUnauthorized(fmt.Sprintf("Unauthorized: %s", err)), s, gv, w, req)
return return
} }
gv := schema.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion}
responsewriters.ErrorNegotiated(apierrors.NewUnauthorized(fmt.Sprintf("Unauthorized: %s", err)), a.serializer, gv, w, req)
return
}
req = req.WithContext(request.WithUser(req.Context(), resp.User)) req = req.WithContext(request.WithUser(req.Context(), resp.User))
handler.ServeHTTP(w, req) a.next.ServeHTTP(w, req)
})
} }

View File

@@ -30,37 +30,46 @@ import (
"kubesphere.io/kubesphere/pkg/apiserver/request" "kubesphere.io/kubesphere/pkg/apiserver/request"
) )
type authzFilter struct {
next http.Handler
authorizer.Authorizer
serializer runtime.NegotiatedSerializer
}
// WithAuthorization passes all authorized requests on to handler, and returns forbidden error otherwise. // WithAuthorization passes all authorized requests on to handler, and returns forbidden error otherwise.
func WithAuthorization(handler http.Handler, authorizers authorizer.Authorizer) http.Handler { func WithAuthorization(next http.Handler, authorizers authorizer.Authorizer) http.Handler {
if authorizers == nil { if authorizers == nil {
klog.Warningf("Authorization is disabled") klog.Warningf("Authorization is disabled")
return handler return next
} }
defaultSerializer := serializer.NewCodecFactory(runtime.NewScheme()).WithoutConversion() return &authzFilter{
next: next,
Authorizer: authorizers,
serializer: serializer.NewCodecFactory(runtime.NewScheme()).WithoutConversion(),
}
}
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { func (a *authzFilter) ServeHTTP(w http.ResponseWriter, req *http.Request) {
ctx := req.Context() ctx := req.Context()
attributes, err := getAuthorizerAttributes(ctx)
if err != nil {
responsewriters.InternalError(w, req, err)
}
attributes, err := getAuthorizerAttributes(ctx) authorized, reason, err := a.Authorize(attributes)
if err != nil { if authorized == authorizer.DecisionAllow {
responsewriters.InternalError(w, req, err) a.next.ServeHTTP(w, req)
} return
}
authorized, reason, err := authorizers.Authorize(attributes) if err != nil {
if authorized == authorizer.DecisionAllow { responsewriters.InternalError(w, req, err)
handler.ServeHTTP(w, req) return
return }
}
if err != nil { klog.V(4).Infof("Forbidden: %#v, Reason: %q", req.RequestURI, reason)
responsewriters.InternalError(w, req, err) responsewriters.Forbidden(ctx, attributes, w, req, reason, a.serializer)
return
}
klog.V(4).Infof("Forbidden: %#v, Reason: %q", req.RequestURI, reason)
responsewriters.Forbidden(ctx, attributes, w, req, reason, defaultSerializer)
})
} }
func getAuthorizerAttributes(ctx context.Context) (authorizer.Attributes, error) { func getAuthorizerAttributes(ctx context.Context) (authorizer.Attributes, error) {

View File

@@ -1,50 +0,0 @@
/*
Copyright 2020 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package filters
import (
"fmt"
"net/http"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"k8s.io/klog/v2"
"kubesphere.io/kubesphere/pkg/apiserver/dispatch"
"kubesphere.io/kubesphere/pkg/apiserver/request"
)
// Multiple cluster dispatcher forward request to desired cluster based on request cluster name
// which included in request path clusters/{cluster}
func WithMultipleClusterDispatcher(handler http.Handler, dispatch dispatch.Dispatcher) http.Handler {
if dispatch == nil {
klog.V(4).Infof("Multiple cluster dispatcher is disabled")
return handler
}
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
info, ok := request.RequestInfoFrom(req.Context())
if !ok {
responsewriters.InternalError(w, req, fmt.Errorf(""))
return
}
if info.Cluster == "" {
handler.ServeHTTP(w, req)
} else {
dispatch.Dispatch(w, req, handler)
}
})
}

View File

@@ -0,0 +1,95 @@
package filters
import (
"fmt"
"net/http"
"github.com/emicklei/go-restful"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"kubesphere.io/kubesphere/pkg/api"
"kubesphere.io/kubesphere/pkg/apiserver/query"
"kubesphere.io/kubesphere/pkg/apiserver/request"
"kubesphere.io/kubesphere/pkg/models/resources/v1beta1"
)
type DynamicResourceHandler struct {
v1beta1.ResourceManager
serviceErrorHandleFallback restful.ServiceErrorHandleFunction
}
func NewDynamicResourceHandle(serviceErrorHandleFallback restful.ServiceErrorHandleFunction, resourceGetter v1beta1.ResourceManager) *DynamicResourceHandler {
return &DynamicResourceHandler{
ResourceManager: resourceGetter,
serviceErrorHandleFallback: serviceErrorHandleFallback,
}
}
func (d *DynamicResourceHandler) HandleServiceError(serviceError restful.ServiceError, req *restful.Request, w *restful.Response) {
// only not found error will be handled
if serviceError.Code != http.StatusNotFound {
d.serviceErrorHandleFallback(serviceError, req, w)
return
}
// TODO support write operation and workspace scope API
if req.Request.Method != http.MethodGet {
d.serviceErrorHandleFallback(serviceError, req, w)
return
}
reqInfo, exist := request.RequestInfoFrom(req.Request.Context())
if !exist {
responsewriters.InternalError(w, req.Request, fmt.Errorf("no RequestInfo found in the context"))
return
}
if reqInfo.IsKubernetesRequest {
d.serviceErrorHandleFallback(serviceError, req, w)
return
}
gvr := schema.GroupVersionResource{
Group: reqInfo.APIGroup,
Version: reqInfo.APIVersion,
Resource: reqInfo.Resource,
}
if gvr.Group == "" ||
gvr.Version == "" ||
gvr.Resource == "" {
d.serviceErrorHandleFallback(serviceError, req, w)
return
}
served, err := d.IsServed(gvr)
if err != nil {
responsewriters.InternalError(w, req.Request, err)
return
}
if !served {
d.serviceErrorHandleFallback(serviceError, req, w)
return
}
var result interface{}
if reqInfo.Verb == "list" {
result, err = d.ListResources(req.Request.Context(), gvr, reqInfo.Namespace, query.ParseQueryParameter(req))
} else {
result, err = d.GetResource(req.Request.Context(), gvr, reqInfo.Name, reqInfo.Namespace)
}
if err != nil {
if meta.IsNoMatchError(err) {
d.serviceErrorHandleFallback(serviceError, req, w)
return
}
api.HandleError(w, req, err)
return
}
w.WriteAsJson(result)
}

View File

@@ -17,6 +17,7 @@ limitations under the License.
package filters package filters
import ( import (
"fmt"
"net/http" "net/http"
"net/url" "net/url"
@@ -26,39 +27,48 @@ import (
"k8s.io/klog/v2" "k8s.io/klog/v2"
"kubesphere.io/kubesphere/pkg/apiserver/request" "kubesphere.io/kubesphere/pkg/apiserver/request"
"kubesphere.io/kubesphere/pkg/server/errors"
) )
type kubeAPIProxy struct {
next http.Handler
kubeAPIServer *url.URL
transport http.RoundTripper
}
// WithKubeAPIServer proxy request to kubernetes service if requests path starts with /api // WithKubeAPIServer proxy request to kubernetes service if requests path starts with /api
func WithKubeAPIServer(handler http.Handler, config *rest.Config, failed proxy.ErrorResponder) http.Handler { func WithKubeAPIServer(next http.Handler, config *rest.Config) http.Handler {
kubernetes, _ := url.Parse(config.Host) kubeAPIServer, _ := url.Parse(config.Host)
defaultTransport, err := rest.TransportFor(config) transport, err := rest.TransportFor(config)
if err != nil { if err != nil {
klog.Errorf("Unable to create transport from rest.Config: %v", err) klog.Errorf("Unable to create transport from rest.Config: %v", err)
return handler return next
}
return &kubeAPIProxy{
next: next,
kubeAPIServer: kubeAPIServer,
transport: transport,
}
}
func (k kubeAPIProxy) ServeHTTP(w http.ResponseWriter, req *http.Request) {
info, ok := request.RequestInfoFrom(req.Context())
if !ok {
responsewriters.InternalError(w, req, fmt.Errorf("no RequestInfo found in the context"))
return
} }
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { if info.IsKubernetesRequest {
info, ok := request.RequestInfoFrom(req.Context()) s := *req.URL
if !ok { s.Host = k.kubeAPIServer.Host
err := errors.New("Unable to retrieve request info from request") s.Scheme = k.kubeAPIServer.Scheme
klog.Error(err)
responsewriters.InternalError(w, req, err)
}
if info.IsKubernetesRequest { // make sure we don't override kubernetes's authorization
s := *req.URL req.Header.Del("Authorization")
s.Host = kubernetes.Host httpProxy := proxy.NewUpgradeAwareHandler(&s, k.transport, true, false, &responder{})
s.Scheme = kubernetes.Scheme httpProxy.UpgradeTransport = proxy.NewUpgradeRequestRoundTripper(k.transport, k.transport)
httpProxy.ServeHTTP(w, req)
return
}
// make sure we don't override kubernetes's authorization k.next.ServeHTTP(w, req)
req.Header.Del("Authorization")
httpProxy := proxy.NewUpgradeAwareHandler(&s, defaultTransport, true, false, failed)
httpProxy.UpgradeTransport = proxy.NewUpgradeRequestRoundTripper(defaultTransport, defaultTransport)
httpProxy.ServeHTTP(w, req)
return
}
handler.ServeHTTP(w, req)
})
} }

View File

@@ -1,21 +0,0 @@
package filters
import "net/http"
type Middleware interface {
Handle(w http.ResponseWriter, req *http.Request) bool
}
func WithMiddleware(next http.Handler, middlewares ...Middleware) http.Handler {
if middlewares == nil {
return next
}
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
for _, middleware := range middlewares {
if middleware.Handle(w, req) {
return
}
}
next.ServeHTTP(w, req)
})
}

View File

@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package dispatch package filters
import ( import (
"fmt" "fmt"
@@ -26,7 +26,6 @@ import (
"k8s.io/apimachinery/pkg/util/proxy" "k8s.io/apimachinery/pkg/util/proxy"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"k8s.io/klog/v2" "k8s.io/klog/v2"
clusterv1alpha1 "kubesphere.io/api/cluster/v1alpha1" clusterv1alpha1 "kubesphere.io/api/cluster/v1alpha1"
"kubesphere.io/kubesphere/pkg/apiserver/request" "kubesphere.io/kubesphere/pkg/apiserver/request"
@@ -35,56 +34,60 @@ import (
const proxyURLFormat = "/api/v1/namespaces/kubesphere-system/services/:ks-apiserver:/proxy%s" const proxyURLFormat = "/api/v1/namespaces/kubesphere-system/services/:ks-apiserver:/proxy%s"
// Dispatcher defines how to forward request to designated cluster based on cluster name type multiclusterDispatcher struct {
// This should only be used in host cluster when multicluster mode enabled, use in any other cases may cause next http.Handler
// unexpected behavior
type Dispatcher interface {
Dispatch(w http.ResponseWriter, req *http.Request, handler http.Handler)
}
type clusterDispatch struct {
clusterclient.ClusterClients clusterclient.ClusterClients
} }
func NewClusterDispatch(cc clusterclient.ClusterClients) Dispatcher { // WithMulticluster forward request to desired cluster based on request cluster name
return &clusterDispatch{cc} // which included in request path clusters/{cluster}
func WithMulticluster(next http.Handler, clusterClient clusterclient.ClusterClients) http.Handler {
if clusterClient == nil {
klog.V(4).Infof("Multicluster dispatcher is disabled")
return next
}
return &multiclusterDispatcher{
next: next,
ClusterClients: clusterClient,
}
} }
// Dispatch dispatch requests to designated cluster func (m *multiclusterDispatcher) ServeHTTP(w http.ResponseWriter, req *http.Request) {
func (c *clusterDispatch) Dispatch(w http.ResponseWriter, req *http.Request, handler http.Handler) { info, ok := request.RequestInfoFrom(req.Context())
info, _ := request.RequestInfoFrom(req.Context()) if !ok {
responsewriters.InternalError(w, req, fmt.Errorf("no RequestInfo found in the context"))
if len(info.Cluster) == 0 { return
klog.Warningf("Request with empty cluster, %v", req.URL) }
http.Error(w, "Bad request, empty cluster", http.StatusBadRequest) if info.Cluster == "" {
m.next.ServeHTTP(w, req)
return return
} }
cluster, err := c.Get(info.Cluster) cluster, err := m.Get(info.Cluster)
if err != nil { if err != nil {
if errors.IsNotFound(err) { if errors.IsNotFound(err) {
http.Error(w, fmt.Sprintf("cluster %s not found", info.Cluster), http.StatusNotFound) responsewriters.WriteRawJSON(http.StatusBadRequest, errors.NewBadRequest(fmt.Sprintf("cluster %s not exists", info.Cluster)), w)
} else { } else {
http.Error(w, err.Error(), http.StatusInternalServerError) responsewriters.InternalError(w, req, err)
} }
return return
} }
// request cluster is host cluster, no need go through agent // request cluster is host cluster, no need go through agent
if c.IsHostCluster(cluster) { if m.IsHostCluster(cluster) {
req.URL.Path = strings.Replace(req.URL.Path, fmt.Sprintf("/clusters/%s", info.Cluster), "", 1) req.URL.Path = strings.Replace(req.URL.Path, fmt.Sprintf("/clusters/%s", info.Cluster), "", 1)
handler.ServeHTTP(w, req) m.next.ServeHTTP(w, req)
return return
} }
if !c.IsClusterReady(cluster) { if !m.IsClusterReady(cluster) {
http.Error(w, fmt.Sprintf("cluster %s is not ready", cluster.Name), http.StatusBadRequest) responsewriters.InternalError(w, req, fmt.Errorf("cluster %s is not ready", cluster.Name))
return return
} }
innCluster := c.GetInnerCluster(cluster.Name) innCluster := m.GetInnerCluster(cluster.Name)
if innCluster == nil { if innCluster == nil {
http.Error(w, fmt.Sprintf("cluster %s is not ready", cluster.Name), http.StatusBadRequest) responsewriters.InternalError(w, req, fmt.Errorf("cluster %s is not ready", cluster.Name))
return return
} }
@@ -135,16 +138,11 @@ func (c *clusterDispatch) Dispatch(w http.ResponseWriter, req *http.Request, han
} }
} else { } else {
// everything else goes to ks-apiserver, since our ks-apiserver has the ability to proxy kube-apiserver requests // everything else goes to ks-apiserver, since our ks-apiserver has the ability to proxy kube-apiserver requests
u.Host = innCluster.KubesphereURL.Host u.Host = innCluster.KubesphereURL.Host
u.Scheme = innCluster.KubesphereURL.Scheme u.Scheme = innCluster.KubesphereURL.Scheme
} }
httpProxy := proxy.NewUpgradeAwareHandler(&u, transport, false, false, c) httpProxy := proxy.NewUpgradeAwareHandler(&u, transport, false, false, &responder{})
httpProxy.UpgradeTransport = proxy.NewUpgradeRequestRoundTripper(transport, transport) httpProxy.UpgradeTransport = proxy.NewUpgradeRequestRoundTripper(transport, transport)
httpProxy.ServeHTTP(w, req) httpProxy.ServeHTTP(w, req)
} }
func (c *clusterDispatch) Error(w http.ResponseWriter, req *http.Request, err error) {
responsewriters.InternalError(w, req, err)
}

View File

@@ -26,7 +26,7 @@ import (
"kubesphere.io/kubesphere/pkg/apiserver/request" "kubesphere.io/kubesphere/pkg/apiserver/request"
) )
func WithRequestInfo(handler http.Handler, resolver request.RequestInfoResolver) http.Handler { func WithRequestInfo(next http.Handler, resolver request.RequestInfoResolver) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
// KubeSphere supports kube-apiserver proxy requests in multicluster mode. But kube-apiserver // KubeSphere supports kube-apiserver proxy requests in multicluster mode. But kube-apiserver
// stripped all authorization headers. Use custom header to carry token to avoid losing authentication token. // stripped all authorization headers. Use custom header to carry token to avoid losing authentication token.
@@ -66,6 +66,6 @@ func WithRequestInfo(handler http.Handler, resolver request.RequestInfoResolver)
} }
req = req.WithContext(request.WithRequestInfo(ctx, info)) req = req.WithContext(request.WithRequestInfo(ctx, info))
handler.ServeHTTP(w, req) next.ServeHTTP(w, req)
}) })
} }

View File

@@ -0,0 +1,14 @@
package filters
import (
"net/http"
"k8s.io/klog/v2"
)
type responder struct{}
func (r *responder) Error(w http.ResponseWriter, req *http.Request, err error) {
klog.Errorf("Error while proxying request: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
}

View File

@@ -1,108 +0,0 @@
package proxies
import (
"fmt"
"net/http"
"strings"
"github.com/emicklei/go-restful"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
"kubesphere.io/kubesphere/pkg/api"
"kubesphere.io/kubesphere/pkg/apiserver/filters"
"kubesphere.io/kubesphere/pkg/apiserver/query"
"kubesphere.io/kubesphere/pkg/apiserver/request"
"kubesphere.io/kubesphere/pkg/models/resources/v1beta1"
)
type unregisteredMiddleware struct {
registeredGv sets.String
resourceGetter v1beta1.ResourceGetter
}
func NewUnregisteredMiddleware(c *restful.Container, resourceGetter v1beta1.ResourceGetter) filters.Middleware {
middleware := &unregisteredMiddleware{
registeredGv: sets.NewString(),
resourceGetter: resourceGetter,
}
for _, ws := range c.RegisteredWebServices() {
rootPath := ws.RootPath()
if strings.HasPrefix(rootPath, "/kapis") {
middleware.registeredGv.Insert(rootPath)
}
}
return middleware
}
func (u *unregisteredMiddleware) Handle(w http.ResponseWriter, req *http.Request) bool {
if req.Method != http.MethodGet {
return false
}
reqInfo, exist := request.RequestInfoFrom(req.Context())
if !exist {
return false
}
if reqInfo.IsKubernetesRequest {
return false
}
gvr := schema.GroupVersionResource{
Group: reqInfo.APIGroup,
Version: reqInfo.APIVersion,
Resource: reqInfo.Resource,
}
if gvr.Group == "" ||
gvr.Version == "" ||
gvr.Resource == "" {
return false
}
rootPath := fmt.Sprintf("/kapis/%s/%s", gvr.Group, gvr.Version)
if u.registeredGv.Has(rootPath) {
return false
}
var (
listReq bool
q *query.Query
)
restfulReq := restful.NewRequest(req)
restfulResp := restful.NewResponse(w)
if reqInfo.Name == "" {
listReq = true
q = query.ParseQueryParameter(restfulReq)
}
var (
result interface{}
err error
)
if listReq {
result, err = u.resourceGetter.ListResources(gvr, reqInfo.Namespace, q)
} else {
result, err = u.resourceGetter.GetResource(gvr, reqInfo.Name, reqInfo.Namespace)
}
handleResponse(result, err, restfulResp, restfulReq)
return true
}
func handleResponse(result interface{}, err error, resp *restful.Response, req *restful.Request) {
resp.SetRequestAccepts(restful.MIME_JSON)
if err != nil {
if err == v1beta1.ErrResourceNotSupported {
api.HandleBadRequest(resp, req, err)
return
}
klog.Error(err)
api.HandleError(resp, req, err)
return
}
resp.WriteEntity(result)
}

View File

@@ -1,66 +0,0 @@
package v1beta1
import (
"context"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"kubesphere.io/kubesphere/pkg/apiserver/query"
)
type resourceCache struct {
cache cache.Cache
}
func NewResourceCache(cache cache.Cache) Interface {
return &resourceCache{cache: cache}
}
func (u *resourceCache) Get(namespace, name string, object client.Object) error {
return u.cache.Get(context.Background(), client.ObjectKey{Namespace: namespace, Name: name}, object)
}
func (u *resourceCache) List(namespace string, query *query.Query, list client.ObjectList) error {
listOpt := &client.ListOptions{
LabelSelector: query.Selector(),
Namespace: namespace,
}
err := u.cache.List(context.Background(), list, listOpt)
if err != nil {
return err
}
extractList, err := meta.ExtractList(list)
if err != nil {
return err
}
filtered := DefaultList(extractList, query, compare, filter)
if err := meta.SetList(list, filtered); err != nil {
return err
}
return nil
}
func compare(left, right runtime.Object, field query.Field) bool {
l, err := meta.Accessor(left)
if err != nil {
return false
}
r, err := meta.Accessor(right)
if err != nil {
return false
}
return DefaultObjectMetaCompare(l, r, field)
}
func filter(object runtime.Object, filter query.Filter) bool {
o, err := meta.Accessor(object)
if err != nil {
return false
}
return DefaultObjectMetaFilter(o, filter)
}

View File

@@ -1,6 +1,7 @@
package v1beta1 package v1beta1
import ( import (
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"sort" "sort"
@@ -18,27 +19,22 @@ import (
"kubesphere.io/kubesphere/pkg/apiserver/query" "kubesphere.io/kubesphere/pkg/apiserver/query"
) )
type Interface interface { type ResourceManager interface {
// Get retrieves a single object by its namespace and name IsServed(schema.GroupVersionResource) (bool, error)
Get(namespace, name string, object client.Object) error GetResource(ctx context.Context, gvr schema.GroupVersionResource, namespace string, name string) (client.Object, error)
ListResources(ctx context.Context, gvr schema.GroupVersionResource, namespace string, query *query.Query) (client.ObjectList, error)
// List retrieves a collection of objects matches given query Get(ctx context.Context, namespace, name string, object client.Object) error
List(namespace string, query *query.Query, object client.ObjectList) error List(ctx context.Context, namespace string, query *query.Query, object client.ObjectList) error
} }
type ResourceGetter interface { // CompareFunc return true is left greater than right
GetResource(schema.GroupVersionResource, string, string) (client.Object, error)
ListResources(schema.GroupVersionResource, string, *query.Query) (client.ObjectList, error)
}
// CompareFunc return true is left great than right
type CompareFunc func(runtime.Object, runtime.Object, query.Field) bool type CompareFunc func(runtime.Object, runtime.Object, query.Field) bool
type FilterFunc func(runtime.Object, query.Filter) bool type FilterFunc func(runtime.Object, query.Filter) bool
type TransformFunc func(runtime.Object) runtime.Object type TransformFunc func(runtime.Object) runtime.Object
func DefaultList(objects []runtime.Object, q *query.Query, compareFunc CompareFunc, filterFunc FilterFunc, transformFuncs ...TransformFunc) []runtime.Object { func DefaultList(objects []runtime.Object, q *query.Query, compareFunc CompareFunc, filterFunc FilterFunc, transformFuncs ...TransformFunc) ([]runtime.Object, *int64) {
// selected matched ones // selected matched ones
var filtered []runtime.Object var filtered []runtime.Object
if len(q.Filters) != 0 { if len(q.Filters) != 0 {
@@ -77,11 +73,12 @@ func DefaultList(objects []runtime.Object, q *query.Query, compareFunc CompareFu
} }
start, end := q.Pagination.GetValidPagination(total) start, end := q.Pagination.GetValidPagination(total)
remainingItemCount := int64(total - end)
return filtered[start:end] return filtered[start:end], &remainingItemCount
} }
// DefaultObjectMetaCompare return true is left great than right // DefaultObjectMetaCompare return true is left greater than right
func DefaultObjectMetaCompare(left, right metav1.Object, sortBy query.Field) bool { func DefaultObjectMetaCompare(left, right metav1.Object, sortBy query.Field) bool {
switch sortBy { switch sortBy {
// ?sortBy=name // ?sortBy=name

View File

@@ -1,150 +0,0 @@
package v1beta1
import (
"context"
"errors"
"strings"
"sync"
"sigs.k8s.io/controller-runtime/pkg/cache"
"kubesphere.io/kubesphere/pkg/apiserver/query"
extv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/controller-runtime/pkg/client"
)
var ErrResourceNotSupported = errors.New("resource is not supported")
var ErrResourceNotServed = errors.New("resource is not served")
const labelResourceServed = "kubesphere.io/resource-served"
// TODO If delete the crd at the cluster when ks is running, the client.cache doesn`t return err but empty result
func New(client client.Client, cache cache.Cache) ResourceGetter {
return &resourceGetter{
client: client,
cache: NewResourceCache(cache),
serveCRD: make(map[string]bool, 0),
}
}
type resourceGetter struct {
client client.Client
cache Interface
serveCRD map[string]bool
sync.RWMutex
}
func (h *resourceGetter) GetResource(gvr schema.GroupVersionResource, name, namespace string) (client.Object, error) {
var obj client.Object
gvk, err := h.getGVK(gvr)
if err != nil {
return nil, err
}
if h.client.Scheme().Recognizes(gvk) {
gvkObject, err := h.client.Scheme().New(gvk)
if err != nil {
return nil, err
}
obj = gvkObject.(client.Object)
} else {
serviced, err := h.isServed(gvr)
if err != nil {
return nil, err
}
if !serviced {
return nil, ErrResourceNotServed
}
u := &unstructured.Unstructured{}
u.SetGroupVersionKind(gvk)
obj = u
}
if err := h.cache.Get(namespace, name, obj); err != nil {
return nil, err
}
return obj, nil
}
func (h *resourceGetter) ListResources(gvr schema.GroupVersionResource, namespace string, query *query.Query) (client.ObjectList, error) {
var obj client.ObjectList
gvk, err := h.getGVK(gvr)
if err != nil {
return nil, err
}
gvk = convertGVKToList(gvk)
if h.client.Scheme().Recognizes(gvk) {
gvkObject, err := h.client.Scheme().New(gvk)
if err != nil {
return nil, err
}
obj = gvkObject.(client.ObjectList)
} else {
serviced, err := h.isServed(gvr)
if err != nil {
return nil, err
}
if !serviced {
return nil, ErrResourceNotServed
}
u := &unstructured.UnstructuredList{}
u.SetGroupVersionKind(gvk)
obj = u
}
if err := h.cache.List(namespace, query, obj); err != nil {
return nil, err
}
return obj, nil
}
func convertGVKToList(gvk schema.GroupVersionKind) schema.GroupVersionKind {
if strings.HasSuffix(gvk.Kind, "List") {
return gvk
}
gvk.Kind = gvk.Kind + "List"
return gvk
}
func (h *resourceGetter) getGVK(gvr schema.GroupVersionResource) (schema.GroupVersionKind, error) {
var (
gvk schema.GroupVersionKind
err error
)
gvk, err = h.client.RESTMapper().KindFor(gvr)
if err != nil {
return gvk, err
}
return gvk, nil
}
func (h *resourceGetter) isServed(gvr schema.GroupVersionResource) (bool, error) {
resourceName := gvr.Resource + "." + gvr.Group
h.RWMutex.RLock()
isServed := h.serveCRD[resourceName]
h.RWMutex.RUnlock()
if isServed {
return true, nil
}
crd := &extv1.CustomResourceDefinition{}
err := h.client.Get(context.Background(), client.ObjectKey{Name: resourceName}, crd)
if err != nil {
return false, err
}
if crd.Labels[labelResourceServed] == "true" {
h.RWMutex.Lock()
h.serveCRD[resourceName] = true
h.RWMutex.Unlock()
return true, nil
}
return false, nil
}

View File

@@ -0,0 +1,175 @@
package v1beta1
import (
"context"
"strings"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"kubesphere.io/kubesphere/pkg/apiserver/query"
extv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/controller-runtime/pkg/client"
)
const labelResourceServed = "kubesphere.io/resource-served"
// TODO If delete the crd at the cluster when ks is running, the client.cache doesn`t return err but empty result
func New(client client.Client, cache cache.Cache) ResourceManager {
return &resourceManager{
client: client,
cache: cache,
}
}
type resourceManager struct {
client client.Client
cache cache.Cache
}
func (h *resourceManager) GetResource(ctx context.Context, gvr schema.GroupVersionResource, namespace, name string) (client.Object, error) {
var obj client.Object
gvk, err := h.getGVK(gvr)
if err != nil {
return nil, err
}
if h.client.Scheme().Recognizes(gvk) {
gvkObject, err := h.client.Scheme().New(gvk)
if err != nil {
return nil, err
}
obj = gvkObject.(client.Object)
} else {
u := &unstructured.Unstructured{}
u.SetGroupVersionKind(gvk)
obj = u
}
if err := h.Get(ctx, namespace, name, obj); err != nil {
return nil, err
}
return obj, nil
}
func (h *resourceManager) ListResources(ctx context.Context, gvr schema.GroupVersionResource, namespace string, query *query.Query) (client.ObjectList, error) {
var obj client.ObjectList
gvk, err := h.getGVK(gvr)
if err != nil {
return nil, err
}
gvk = convertGVKToList(gvk)
if h.client.Scheme().Recognizes(gvk) {
gvkObject, err := h.client.Scheme().New(gvk)
if err != nil {
return nil, err
}
obj = gvkObject.(client.ObjectList)
} else {
u := &unstructured.UnstructuredList{}
u.SetGroupVersionKind(gvk)
obj = u
}
if err := h.List(ctx, namespace, query, obj); err != nil {
return nil, err
}
return obj, nil
}
func convertGVKToList(gvk schema.GroupVersionKind) schema.GroupVersionKind {
if strings.HasSuffix(gvk.Kind, "List") {
return gvk
}
gvk.Kind = gvk.Kind + "List"
return gvk
}
func (h *resourceManager) getGVK(gvr schema.GroupVersionResource) (schema.GroupVersionKind, error) {
var (
gvk schema.GroupVersionKind
err error
)
gvk, err = h.client.RESTMapper().KindFor(gvr)
if err != nil {
return gvk, err
}
return gvk, nil
}
func (h *resourceManager) IsServed(gvr schema.GroupVersionResource) (bool, error) {
// well-known group version is already registered
if h.client.Scheme().IsVersionRegistered(gvr.GroupVersion()) {
return true, nil
}
crd := &extv1.CustomResourceDefinition{}
if err := h.cache.Get(context.Background(), client.ObjectKey{Name: gvr.GroupResource().String()}, crd); err != nil {
if errors.IsNotFound(err) {
return false, nil
}
return false, err
}
if crd.Labels[labelResourceServed] == "true" {
return true, nil
}
return false, nil
}
func (h *resourceManager) Get(ctx context.Context, namespace, name string, object client.Object) error {
return h.cache.Get(ctx, client.ObjectKey{Namespace: namespace, Name: name}, object)
}
func (h *resourceManager) List(ctx context.Context, namespace string, query *query.Query, list client.ObjectList) error {
listOpt := &client.ListOptions{
LabelSelector: query.Selector(),
Namespace: namespace,
}
err := h.cache.List(ctx, list, listOpt)
if err != nil {
return err
}
extractList, err := meta.ExtractList(list)
if err != nil {
return err
}
filtered, remainingItemCount := DefaultList(extractList, query, compare, filter)
list.SetRemainingItemCount(remainingItemCount)
if err := meta.SetList(list, filtered); err != nil {
return err
}
return nil
}
func compare(left, right runtime.Object, field query.Field) bool {
l, err := meta.Accessor(left)
if err != nil {
return false
}
r, err := meta.Accessor(right)
if err != nil {
return false
}
return DefaultObjectMetaCompare(l, r, field)
}
func filter(object runtime.Object, filter query.Filter) bool {
o, err := meta.Accessor(object)
if err != nil {
return false
}
return DefaultObjectMetaFilter(o, filter)
}