diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index ea87fc99e..0c3b8d380 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -33,7 +33,6 @@ import ( urlruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" unionauth "k8s.io/apiserver/pkg/authentication/request/union" - "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" "k8s.io/client-go/discovery" "k8s.io/client-go/util/retry" "k8s.io/klog/v2" @@ -61,9 +60,7 @@ import ( "kubesphere.io/kubesphere/pkg/apiserver/authorization/rbac" unionauthorizer "kubesphere.io/kubesphere/pkg/apiserver/authorization/union" apiserverconfig "kubesphere.io/kubesphere/pkg/apiserver/config" - "kubesphere.io/kubesphere/pkg/apiserver/dispatch" "kubesphere.io/kubesphere/pkg/apiserver/filters" - "kubesphere.io/kubesphere/pkg/apiserver/proxies" "kubesphere.io/kubesphere/pkg/apiserver/request" "kubesphere.io/kubesphere/pkg/informers" alertingv1 "kubesphere.io/kubesphere/pkg/kapis/alerting/v1" @@ -176,22 +173,21 @@ type APIServer struct { func (s *APIServer) PrepareRun(stopCh <-chan struct{}) error { s.container = restful.NewContainer() s.container.Filter(logRequestAndResponse) + s.container.Filter(monitorRequest) s.container.Router(restful.CurlyRouter{}) s.container.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) { logStackOnRecover(panicReason, httpWriter) }) - + s.installDynamicResourceAPI() s.installKubeSphereAPIs(stopCh) s.installMetricsAPI() s.installHealthz() - s.container.Filter(monitorRequest) for _, ws := range s.container.RegisteredWebServices() { klog.V(2).Infof("%s", ws.RootPath()) } s.Server.Handler = s.container - s.buildHandlerChain(stopCh) return nil @@ -338,11 +334,9 @@ func (s *APIServer) buildHandlerChain(stopCh <-chan struct{}) { notificationv2beta2.Resource(notificationv2beta2.ResourcesPluralSilence), ) } - handler := s.Server.Handler - handler = filters.WithKubeAPIServer(handler, s.KubernetesClient.Config(), &errorResponder{}) - middleware := proxies.NewUnregisteredMiddleware(s.container, resourcev1beta1.New(s.RuntimeClient, s.RuntimeCache)) - handler = filters.WithMiddleware(handler, middleware) + handler := s.Server.Handler + handler = filters.WithKubeAPIServer(handler, s.KubernetesClient.Config()) if s.Config.AuditingOptions.Enable { handler = filters.WithAuditing(handler, @@ -367,8 +361,7 @@ func (s *APIServer) buildHandlerChain(stopCh <-chan struct{}) { handler = filters.WithAuthorization(handler, authorizers) if s.Config.MultiClusterOptions.Enable { - clusterDispatcher := dispatch.NewClusterDispatch(s.ClusterClient) - handler = filters.WithMultipleClusterDispatcher(handler, clusterDispatcher) + handler = filters.WithMulticluster(handler, s.ClusterClient) } userLister := s.InformerFactory.KubeSphereSharedInformerFactory().Iam().V1alpha2().Users().Lister() @@ -633,6 +626,18 @@ func (s *APIServer) waitForResourceSync(ctx context.Context) error { } +func (s *APIServer) installDynamicResourceAPI() { + dynamicResourceHandler := filters.NewDynamicResourceHandle(func(err restful.ServiceError, req *restful.Request, resp *restful.Response) { + for header, values := range err.Header { + for _, value := range values { + resp.Header().Add(header, value) + } + } + resp.WriteErrorString(err.Code, err.Message) + }, resourcev1beta1.New(s.RuntimeClient, s.RuntimeCache)) + s.container.ServiceErrorHandler(dynamicResourceHandler.HandleServiceError) +} + func logStackOnRecover(panicReason interface{}, w http.ResponseWriter) { var buffer bytes.Buffer buffer.WriteString(fmt.Sprintf("recover from panic situation: - %v\r\n", panicReason)) @@ -674,10 +679,3 @@ func logRequestAndResponse(req *restful.Request, resp *restful.Response, chain * time.Since(start)/time.Millisecond, ) } - -type errorResponder struct{} - -func (e *errorResponder) Error(w http.ResponseWriter, req *http.Request, err error) { - klog.Error(err) - responsewriters.InternalError(w, req, err) -} diff --git a/pkg/apiserver/dispatch/dispatch_test.go b/pkg/apiserver/dispatch/dispatch_test.go deleted file mode 100644 index a8427e6ef..000000000 --- a/pkg/apiserver/dispatch/dispatch_test.go +++ /dev/null @@ -1,17 +0,0 @@ -/* -Copyright 2020 KubeSphere Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package dispatch diff --git a/pkg/apiserver/filters/auditing.go b/pkg/apiserver/filters/auditing.go index 3c28f3241..d23d135c0 100644 --- a/pkg/apiserver/filters/auditing.go +++ b/pkg/apiserver/filters/auditing.go @@ -25,39 +25,45 @@ import ( "kubesphere.io/kubesphere/pkg/apiserver/request" ) -func WithAuditing(handler http.Handler, a auditing.Auditing) http.Handler { - - return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - - // When auditing level is LevelNone, request should not be auditing. - // Auditing level can be modified with cr kube-auditing-webhook, - // so it need to judge every time. - if !a.Enabled() { - handler.ServeHTTP(w, req) - return - } - - info, ok := request.RequestInfoFrom(req.Context()) - if !ok { - klog.Error("Unable to retrieve request info from request") - handler.ServeHTTP(w, req) - return - } - - // Auditing should igonre k8s request when k8s auditing is enabled. - if info.IsKubernetesRequest && a.K8sAuditingEnabled() { - handler.ServeHTTP(w, req) - return - } - - e := a.LogRequestObject(req, info) - if e != nil { - resp := auditing.NewResponseCapture(w) - handler.ServeHTTP(resp, req) - - go a.LogResponseObject(e, resp) - } else { - handler.ServeHTTP(w, req) - } - }) +type auditingFilter struct { + next http.Handler + auditing.Auditing +} + +func WithAuditing(next http.Handler, auditing auditing.Auditing) http.Handler { + return &auditingFilter{ + next: next, + Auditing: auditing, + } +} + +func (a *auditingFilter) ServeHTTP(w http.ResponseWriter, req *http.Request) { + // When auditing level is LevelNone, request should not be auditing. + // Auditing level can be modified with cr kube-auditing-webhook, + // so it need to judge every time. + if !a.Enabled() { + a.next.ServeHTTP(w, req) + return + } + + info, ok := request.RequestInfoFrom(req.Context()) + if !ok { + klog.Error("Unable to retrieve request info from request") + a.next.ServeHTTP(w, req) + return + } + + // Auditing should ignore k8s request when k8s auditing is enabled. + if info.IsKubernetesRequest && a.K8sAuditingEnabled() { + a.next.ServeHTTP(w, req) + return + } + + if event := a.LogRequestObject(req, info); event != nil { + resp := auditing.NewResponseCapture(w) + a.next.ServeHTTP(resp, req) + go a.LogResponseObject(event, resp) + } else { + a.next.ServeHTTP(w, req) + } } diff --git a/pkg/apiserver/filters/authentication.go b/pkg/apiserver/filters/authentication.go index 6b177b8c3..5582e5e8e 100644 --- a/pkg/apiserver/filters/authentication.go +++ b/pkg/apiserver/filters/authentication.go @@ -32,41 +32,51 @@ import ( "kubesphere.io/kubesphere/pkg/apiserver/request" ) +type authnFilter struct { + next http.Handler + authenticator.Request + serializer runtime.NegotiatedSerializer +} + // WithAuthentication installs authentication handler to handler chain. // The following part is a little bit ugly, WithAuthentication also logs user failed login attempt // if using basic auth. But only treats request with requestURI `/oauth/authorize` as login attempt -func WithAuthentication(handler http.Handler, authRequest authenticator.Request) http.Handler { - if authRequest == nil { +func WithAuthentication(next http.Handler, authenticator authenticator.Request) http.Handler { + if authenticator == nil { klog.Warningf("Authentication is disabled") - return handler + return next } - s := serializer.NewCodecFactory(runtime.NewScheme()).WithoutConversion() + return &authnFilter{ + next: next, + Request: authenticator, + serializer: serializer.NewCodecFactory(runtime.NewScheme()).WithoutConversion(), + } +} - return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - resp, ok, err := authRequest.AuthenticateRequest(req) - _, _, usingBasicAuth := req.BasicAuth() +func (a *authnFilter) ServeHTTP(w http.ResponseWriter, req *http.Request) { + resp, ok, err := a.AuthenticateRequest(req) + _, _, usingBasicAuth := req.BasicAuth() - defer func() { - // if we authenticated successfully, go ahead and remove the bearer token so that no one - // is ever tempted to use it inside of the API server - if usingBasicAuth && ok { - req.Header.Del("Authorization") - } - }() + defer func() { + // if we authenticated successfully, go ahead and remove the bearer token so that no one + // is ever tempted to use it inside the API server + if usingBasicAuth && ok { + req.Header.Del("Authorization") + } + }() - if err != nil || !ok { - ctx := req.Context() - requestInfo, found := request.RequestInfoFrom(ctx) - if !found { - responsewriters.InternalError(w, req, errors.New("no RequestInfo found in the context")) - return - } - gv := schema.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion} - responsewriters.ErrorNegotiated(apierrors.NewUnauthorized(fmt.Sprintf("Unauthorized: %s", err)), s, gv, w, req) + if err != nil || !ok { + ctx := req.Context() + requestInfo, found := request.RequestInfoFrom(ctx) + if !found { + responsewriters.InternalError(w, req, errors.New("no RequestInfo found in the context")) return } + gv := schema.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion} + responsewriters.ErrorNegotiated(apierrors.NewUnauthorized(fmt.Sprintf("Unauthorized: %s", err)), a.serializer, gv, w, req) + return + } - req = req.WithContext(request.WithUser(req.Context(), resp.User)) - handler.ServeHTTP(w, req) - }) + req = req.WithContext(request.WithUser(req.Context(), resp.User)) + a.next.ServeHTTP(w, req) } diff --git a/pkg/apiserver/filters/authorization.go b/pkg/apiserver/filters/authorization.go index b80b29695..65d96fc5d 100644 --- a/pkg/apiserver/filters/authorization.go +++ b/pkg/apiserver/filters/authorization.go @@ -30,37 +30,46 @@ import ( "kubesphere.io/kubesphere/pkg/apiserver/request" ) +type authzFilter struct { + next http.Handler + authorizer.Authorizer + serializer runtime.NegotiatedSerializer +} + // WithAuthorization passes all authorized requests on to handler, and returns forbidden error otherwise. -func WithAuthorization(handler http.Handler, authorizers authorizer.Authorizer) http.Handler { +func WithAuthorization(next http.Handler, authorizers authorizer.Authorizer) http.Handler { if authorizers == nil { klog.Warningf("Authorization is disabled") - return handler + return next } - defaultSerializer := serializer.NewCodecFactory(runtime.NewScheme()).WithoutConversion() + return &authzFilter{ + next: next, + Authorizer: authorizers, + serializer: serializer.NewCodecFactory(runtime.NewScheme()).WithoutConversion(), + } +} - return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - ctx := req.Context() +func (a *authzFilter) ServeHTTP(w http.ResponseWriter, req *http.Request) { + ctx := req.Context() + attributes, err := getAuthorizerAttributes(ctx) + if err != nil { + responsewriters.InternalError(w, req, err) + } - attributes, err := getAuthorizerAttributes(ctx) - if err != nil { - responsewriters.InternalError(w, req, err) - } + authorized, reason, err := a.Authorize(attributes) + if authorized == authorizer.DecisionAllow { + a.next.ServeHTTP(w, req) + return + } - authorized, reason, err := authorizers.Authorize(attributes) - if authorized == authorizer.DecisionAllow { - handler.ServeHTTP(w, req) - return - } + if err != nil { + responsewriters.InternalError(w, req, err) + return + } - if err != nil { - responsewriters.InternalError(w, req, err) - return - } - - klog.V(4).Infof("Forbidden: %#v, Reason: %q", req.RequestURI, reason) - responsewriters.Forbidden(ctx, attributes, w, req, reason, defaultSerializer) - }) + klog.V(4).Infof("Forbidden: %#v, Reason: %q", req.RequestURI, reason) + responsewriters.Forbidden(ctx, attributes, w, req, reason, a.serializer) } func getAuthorizerAttributes(ctx context.Context) (authorizer.Attributes, error) { diff --git a/pkg/apiserver/filters/dispatch.go b/pkg/apiserver/filters/dispatch.go deleted file mode 100644 index d53b566d9..000000000 --- a/pkg/apiserver/filters/dispatch.go +++ /dev/null @@ -1,50 +0,0 @@ -/* -Copyright 2020 The KubeSphere Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package filters - -import ( - "fmt" - "net/http" - - "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" - "k8s.io/klog/v2" - - "kubesphere.io/kubesphere/pkg/apiserver/dispatch" - "kubesphere.io/kubesphere/pkg/apiserver/request" -) - -// Multiple cluster dispatcher forward request to desired cluster based on request cluster name -// which included in request path clusters/{cluster} -func WithMultipleClusterDispatcher(handler http.Handler, dispatch dispatch.Dispatcher) http.Handler { - if dispatch == nil { - klog.V(4).Infof("Multiple cluster dispatcher is disabled") - return handler - } - return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - info, ok := request.RequestInfoFrom(req.Context()) - if !ok { - responsewriters.InternalError(w, req, fmt.Errorf("")) - return - } - - if info.Cluster == "" { - handler.ServeHTTP(w, req) - } else { - dispatch.Dispatch(w, req, handler) - } - }) -} diff --git a/pkg/apiserver/filters/dynamicresource.go b/pkg/apiserver/filters/dynamicresource.go new file mode 100644 index 000000000..da26896b0 --- /dev/null +++ b/pkg/apiserver/filters/dynamicresource.go @@ -0,0 +1,95 @@ +package filters + +import ( + "fmt" + "net/http" + + "github.com/emicklei/go-restful" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" + + "kubesphere.io/kubesphere/pkg/api" + "kubesphere.io/kubesphere/pkg/apiserver/query" + "kubesphere.io/kubesphere/pkg/apiserver/request" + "kubesphere.io/kubesphere/pkg/models/resources/v1beta1" +) + +type DynamicResourceHandler struct { + v1beta1.ResourceManager + serviceErrorHandleFallback restful.ServiceErrorHandleFunction +} + +func NewDynamicResourceHandle(serviceErrorHandleFallback restful.ServiceErrorHandleFunction, resourceGetter v1beta1.ResourceManager) *DynamicResourceHandler { + return &DynamicResourceHandler{ + ResourceManager: resourceGetter, + serviceErrorHandleFallback: serviceErrorHandleFallback, + } +} + +func (d *DynamicResourceHandler) HandleServiceError(serviceError restful.ServiceError, req *restful.Request, w *restful.Response) { + // only not found error will be handled + if serviceError.Code != http.StatusNotFound { + d.serviceErrorHandleFallback(serviceError, req, w) + return + } + + // TODO support write operation and workspace scope API + if req.Request.Method != http.MethodGet { + d.serviceErrorHandleFallback(serviceError, req, w) + return + } + + reqInfo, exist := request.RequestInfoFrom(req.Request.Context()) + if !exist { + responsewriters.InternalError(w, req.Request, fmt.Errorf("no RequestInfo found in the context")) + return + } + + if reqInfo.IsKubernetesRequest { + d.serviceErrorHandleFallback(serviceError, req, w) + return + } + + gvr := schema.GroupVersionResource{ + Group: reqInfo.APIGroup, + Version: reqInfo.APIVersion, + Resource: reqInfo.Resource, + } + + if gvr.Group == "" || + gvr.Version == "" || + gvr.Resource == "" { + d.serviceErrorHandleFallback(serviceError, req, w) + return + } + + served, err := d.IsServed(gvr) + if err != nil { + responsewriters.InternalError(w, req.Request, err) + return + } + + if !served { + d.serviceErrorHandleFallback(serviceError, req, w) + return + } + + var result interface{} + if reqInfo.Verb == "list" { + result, err = d.ListResources(req.Request.Context(), gvr, reqInfo.Namespace, query.ParseQueryParameter(req)) + } else { + result, err = d.GetResource(req.Request.Context(), gvr, reqInfo.Name, reqInfo.Namespace) + } + + if err != nil { + if meta.IsNoMatchError(err) { + d.serviceErrorHandleFallback(serviceError, req, w) + return + } + api.HandleError(w, req, err) + return + } + + w.WriteAsJson(result) +} diff --git a/pkg/apiserver/filters/kubeapiserver.go b/pkg/apiserver/filters/kubeapiserver.go index aa3499108..fbc379fb8 100644 --- a/pkg/apiserver/filters/kubeapiserver.go +++ b/pkg/apiserver/filters/kubeapiserver.go @@ -17,6 +17,7 @@ limitations under the License. package filters import ( + "fmt" "net/http" "net/url" @@ -26,39 +27,48 @@ import ( "k8s.io/klog/v2" "kubesphere.io/kubesphere/pkg/apiserver/request" - "kubesphere.io/kubesphere/pkg/server/errors" ) +type kubeAPIProxy struct { + next http.Handler + kubeAPIServer *url.URL + transport http.RoundTripper +} + // WithKubeAPIServer proxy request to kubernetes service if requests path starts with /api -func WithKubeAPIServer(handler http.Handler, config *rest.Config, failed proxy.ErrorResponder) http.Handler { - kubernetes, _ := url.Parse(config.Host) - defaultTransport, err := rest.TransportFor(config) +func WithKubeAPIServer(next http.Handler, config *rest.Config) http.Handler { + kubeAPIServer, _ := url.Parse(config.Host) + transport, err := rest.TransportFor(config) if err != nil { klog.Errorf("Unable to create transport from rest.Config: %v", err) - return handler + return next + } + return &kubeAPIProxy{ + next: next, + kubeAPIServer: kubeAPIServer, + transport: transport, + } +} + +func (k kubeAPIProxy) ServeHTTP(w http.ResponseWriter, req *http.Request) { + info, ok := request.RequestInfoFrom(req.Context()) + if !ok { + responsewriters.InternalError(w, req, fmt.Errorf("no RequestInfo found in the context")) + return } - return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - info, ok := request.RequestInfoFrom(req.Context()) - if !ok { - err := errors.New("Unable to retrieve request info from request") - klog.Error(err) - responsewriters.InternalError(w, req, err) - } + if info.IsKubernetesRequest { + s := *req.URL + s.Host = k.kubeAPIServer.Host + s.Scheme = k.kubeAPIServer.Scheme - if info.IsKubernetesRequest { - s := *req.URL - s.Host = kubernetes.Host - s.Scheme = kubernetes.Scheme + // make sure we don't override kubernetes's authorization + req.Header.Del("Authorization") + httpProxy := proxy.NewUpgradeAwareHandler(&s, k.transport, true, false, &responder{}) + httpProxy.UpgradeTransport = proxy.NewUpgradeRequestRoundTripper(k.transport, k.transport) + httpProxy.ServeHTTP(w, req) + return + } - // make sure we don't override kubernetes's authorization - req.Header.Del("Authorization") - httpProxy := proxy.NewUpgradeAwareHandler(&s, defaultTransport, true, false, failed) - httpProxy.UpgradeTransport = proxy.NewUpgradeRequestRoundTripper(defaultTransport, defaultTransport) - httpProxy.ServeHTTP(w, req) - return - } - - handler.ServeHTTP(w, req) - }) + k.next.ServeHTTP(w, req) } diff --git a/pkg/apiserver/filters/middleware.go b/pkg/apiserver/filters/middleware.go deleted file mode 100644 index fda53980a..000000000 --- a/pkg/apiserver/filters/middleware.go +++ /dev/null @@ -1,21 +0,0 @@ -package filters - -import "net/http" - -type Middleware interface { - Handle(w http.ResponseWriter, req *http.Request) bool -} - -func WithMiddleware(next http.Handler, middlewares ...Middleware) http.Handler { - if middlewares == nil { - return next - } - return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - for _, middleware := range middlewares { - if middleware.Handle(w, req) { - return - } - } - next.ServeHTTP(w, req) - }) -} diff --git a/pkg/apiserver/dispatch/dispatch.go b/pkg/apiserver/filters/multicluster.go similarity index 74% rename from pkg/apiserver/dispatch/dispatch.go rename to pkg/apiserver/filters/multicluster.go index 778799a7f..9ed6af8ee 100644 --- a/pkg/apiserver/dispatch/dispatch.go +++ b/pkg/apiserver/filters/multicluster.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package dispatch +package filters import ( "fmt" @@ -26,7 +26,6 @@ import ( "k8s.io/apimachinery/pkg/util/proxy" "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" "k8s.io/klog/v2" - clusterv1alpha1 "kubesphere.io/api/cluster/v1alpha1" "kubesphere.io/kubesphere/pkg/apiserver/request" @@ -35,56 +34,60 @@ import ( const proxyURLFormat = "/api/v1/namespaces/kubesphere-system/services/:ks-apiserver:/proxy%s" -// Dispatcher defines how to forward request to designated cluster based on cluster name -// This should only be used in host cluster when multicluster mode enabled, use in any other cases may cause -// unexpected behavior -type Dispatcher interface { - Dispatch(w http.ResponseWriter, req *http.Request, handler http.Handler) -} - -type clusterDispatch struct { +type multiclusterDispatcher struct { + next http.Handler clusterclient.ClusterClients } -func NewClusterDispatch(cc clusterclient.ClusterClients) Dispatcher { - return &clusterDispatch{cc} +// WithMulticluster forward request to desired cluster based on request cluster name +// which included in request path clusters/{cluster} +func WithMulticluster(next http.Handler, clusterClient clusterclient.ClusterClients) http.Handler { + if clusterClient == nil { + klog.V(4).Infof("Multicluster dispatcher is disabled") + return next + } + return &multiclusterDispatcher{ + next: next, + ClusterClients: clusterClient, + } } -// Dispatch dispatch requests to designated cluster -func (c *clusterDispatch) Dispatch(w http.ResponseWriter, req *http.Request, handler http.Handler) { - info, _ := request.RequestInfoFrom(req.Context()) - - if len(info.Cluster) == 0 { - klog.Warningf("Request with empty cluster, %v", req.URL) - http.Error(w, "Bad request, empty cluster", http.StatusBadRequest) +func (m *multiclusterDispatcher) ServeHTTP(w http.ResponseWriter, req *http.Request) { + info, ok := request.RequestInfoFrom(req.Context()) + if !ok { + responsewriters.InternalError(w, req, fmt.Errorf("no RequestInfo found in the context")) + return + } + if info.Cluster == "" { + m.next.ServeHTTP(w, req) return } - cluster, err := c.Get(info.Cluster) + cluster, err := m.Get(info.Cluster) if err != nil { if errors.IsNotFound(err) { - http.Error(w, fmt.Sprintf("cluster %s not found", info.Cluster), http.StatusNotFound) + responsewriters.WriteRawJSON(http.StatusBadRequest, errors.NewBadRequest(fmt.Sprintf("cluster %s not exists", info.Cluster)), w) } else { - http.Error(w, err.Error(), http.StatusInternalServerError) + responsewriters.InternalError(w, req, err) } return } // request cluster is host cluster, no need go through agent - if c.IsHostCluster(cluster) { + if m.IsHostCluster(cluster) { req.URL.Path = strings.Replace(req.URL.Path, fmt.Sprintf("/clusters/%s", info.Cluster), "", 1) - handler.ServeHTTP(w, req) + m.next.ServeHTTP(w, req) return } - if !c.IsClusterReady(cluster) { - http.Error(w, fmt.Sprintf("cluster %s is not ready", cluster.Name), http.StatusBadRequest) + if !m.IsClusterReady(cluster) { + responsewriters.InternalError(w, req, fmt.Errorf("cluster %s is not ready", cluster.Name)) return } - innCluster := c.GetInnerCluster(cluster.Name) + innCluster := m.GetInnerCluster(cluster.Name) if innCluster == nil { - http.Error(w, fmt.Sprintf("cluster %s is not ready", cluster.Name), http.StatusBadRequest) + responsewriters.InternalError(w, req, fmt.Errorf("cluster %s is not ready", cluster.Name)) return } @@ -135,16 +138,11 @@ func (c *clusterDispatch) Dispatch(w http.ResponseWriter, req *http.Request, han } } else { // everything else goes to ks-apiserver, since our ks-apiserver has the ability to proxy kube-apiserver requests - u.Host = innCluster.KubesphereURL.Host u.Scheme = innCluster.KubesphereURL.Scheme } - httpProxy := proxy.NewUpgradeAwareHandler(&u, transport, false, false, c) + httpProxy := proxy.NewUpgradeAwareHandler(&u, transport, false, false, &responder{}) httpProxy.UpgradeTransport = proxy.NewUpgradeRequestRoundTripper(transport, transport) httpProxy.ServeHTTP(w, req) } - -func (c *clusterDispatch) Error(w http.ResponseWriter, req *http.Request, err error) { - responsewriters.InternalError(w, req, err) -} diff --git a/pkg/apiserver/filters/requestinfo.go b/pkg/apiserver/filters/requestinfo.go index e1bbff3e6..1c8b74008 100644 --- a/pkg/apiserver/filters/requestinfo.go +++ b/pkg/apiserver/filters/requestinfo.go @@ -26,7 +26,7 @@ import ( "kubesphere.io/kubesphere/pkg/apiserver/request" ) -func WithRequestInfo(handler http.Handler, resolver request.RequestInfoResolver) http.Handler { +func WithRequestInfo(next http.Handler, resolver request.RequestInfoResolver) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { // KubeSphere supports kube-apiserver proxy requests in multicluster mode. But kube-apiserver // stripped all authorization headers. Use custom header to carry token to avoid losing authentication token. @@ -66,6 +66,6 @@ func WithRequestInfo(handler http.Handler, resolver request.RequestInfoResolver) } req = req.WithContext(request.WithRequestInfo(ctx, info)) - handler.ServeHTTP(w, req) + next.ServeHTTP(w, req) }) } diff --git a/pkg/apiserver/filters/responder.go b/pkg/apiserver/filters/responder.go new file mode 100644 index 000000000..8d14ce574 --- /dev/null +++ b/pkg/apiserver/filters/responder.go @@ -0,0 +1,14 @@ +package filters + +import ( + "net/http" + + "k8s.io/klog/v2" +) + +type responder struct{} + +func (r *responder) Error(w http.ResponseWriter, req *http.Request, err error) { + klog.Errorf("Error while proxying request: %v", err) + http.Error(w, err.Error(), http.StatusInternalServerError) +} diff --git a/pkg/apiserver/proxies/unregistered.go b/pkg/apiserver/proxies/unregistered.go deleted file mode 100644 index 8c33f57fc..000000000 --- a/pkg/apiserver/proxies/unregistered.go +++ /dev/null @@ -1,108 +0,0 @@ -package proxies - -import ( - "fmt" - "net/http" - "strings" - - "github.com/emicklei/go-restful" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/klog/v2" - - "kubesphere.io/kubesphere/pkg/api" - "kubesphere.io/kubesphere/pkg/apiserver/filters" - "kubesphere.io/kubesphere/pkg/apiserver/query" - "kubesphere.io/kubesphere/pkg/apiserver/request" - "kubesphere.io/kubesphere/pkg/models/resources/v1beta1" -) - -type unregisteredMiddleware struct { - registeredGv sets.String - resourceGetter v1beta1.ResourceGetter -} - -func NewUnregisteredMiddleware(c *restful.Container, resourceGetter v1beta1.ResourceGetter) filters.Middleware { - middleware := &unregisteredMiddleware{ - registeredGv: sets.NewString(), - resourceGetter: resourceGetter, - } - - for _, ws := range c.RegisteredWebServices() { - rootPath := ws.RootPath() - if strings.HasPrefix(rootPath, "/kapis") { - middleware.registeredGv.Insert(rootPath) - } - } - - return middleware -} - -func (u *unregisteredMiddleware) Handle(w http.ResponseWriter, req *http.Request) bool { - if req.Method != http.MethodGet { - return false - } - - reqInfo, exist := request.RequestInfoFrom(req.Context()) - if !exist { - return false - } - - if reqInfo.IsKubernetesRequest { - return false - } - - gvr := schema.GroupVersionResource{ - Group: reqInfo.APIGroup, - Version: reqInfo.APIVersion, - Resource: reqInfo.Resource, - } - - if gvr.Group == "" || - gvr.Version == "" || - gvr.Resource == "" { - return false - } - - rootPath := fmt.Sprintf("/kapis/%s/%s", gvr.Group, gvr.Version) - if u.registeredGv.Has(rootPath) { - return false - } - - var ( - listReq bool - q *query.Query - ) - restfulReq := restful.NewRequest(req) - restfulResp := restful.NewResponse(w) - if reqInfo.Name == "" { - listReq = true - q = query.ParseQueryParameter(restfulReq) - } - - var ( - result interface{} - err error - ) - if listReq { - result, err = u.resourceGetter.ListResources(gvr, reqInfo.Namespace, q) - } else { - result, err = u.resourceGetter.GetResource(gvr, reqInfo.Name, reqInfo.Namespace) - } - handleResponse(result, err, restfulResp, restfulReq) - return true -} - -func handleResponse(result interface{}, err error, resp *restful.Response, req *restful.Request) { - resp.SetRequestAccepts(restful.MIME_JSON) - if err != nil { - if err == v1beta1.ErrResourceNotSupported { - api.HandleBadRequest(resp, req, err) - return - } - klog.Error(err) - api.HandleError(resp, req, err) - return - } - resp.WriteEntity(result) -} diff --git a/pkg/models/resources/v1beta1/cache.go b/pkg/models/resources/v1beta1/cache.go deleted file mode 100644 index f3a2991da..000000000 --- a/pkg/models/resources/v1beta1/cache.go +++ /dev/null @@ -1,66 +0,0 @@ -package v1beta1 - -import ( - "context" - - "k8s.io/apimachinery/pkg/api/meta" - "k8s.io/apimachinery/pkg/runtime" - "sigs.k8s.io/controller-runtime/pkg/cache" - "sigs.k8s.io/controller-runtime/pkg/client" - - "kubesphere.io/kubesphere/pkg/apiserver/query" -) - -type resourceCache struct { - cache cache.Cache -} - -func NewResourceCache(cache cache.Cache) Interface { - return &resourceCache{cache: cache} -} - -func (u *resourceCache) Get(namespace, name string, object client.Object) error { - return u.cache.Get(context.Background(), client.ObjectKey{Namespace: namespace, Name: name}, object) -} - -func (u *resourceCache) List(namespace string, query *query.Query, list client.ObjectList) error { - listOpt := &client.ListOptions{ - LabelSelector: query.Selector(), - Namespace: namespace, - } - err := u.cache.List(context.Background(), list, listOpt) - if err != nil { - return err - } - - extractList, err := meta.ExtractList(list) - if err != nil { - return err - } - - filtered := DefaultList(extractList, query, compare, filter) - if err := meta.SetList(list, filtered); err != nil { - return err - } - return nil -} - -func compare(left, right runtime.Object, field query.Field) bool { - l, err := meta.Accessor(left) - if err != nil { - return false - } - r, err := meta.Accessor(right) - if err != nil { - return false - } - return DefaultObjectMetaCompare(l, r, field) -} - -func filter(object runtime.Object, filter query.Filter) bool { - o, err := meta.Accessor(object) - if err != nil { - return false - } - return DefaultObjectMetaFilter(o, filter) -} diff --git a/pkg/models/resources/v1beta1/interface.go b/pkg/models/resources/v1beta1/interface.go index 603679bb5..11ed88194 100644 --- a/pkg/models/resources/v1beta1/interface.go +++ b/pkg/models/resources/v1beta1/interface.go @@ -1,6 +1,7 @@ package v1beta1 import ( + "context" "encoding/json" "fmt" "sort" @@ -18,27 +19,22 @@ import ( "kubesphere.io/kubesphere/pkg/apiserver/query" ) -type Interface interface { - // Get retrieves a single object by its namespace and name - Get(namespace, name string, object client.Object) error - - // List retrieves a collection of objects matches given query - List(namespace string, query *query.Query, object client.ObjectList) error +type ResourceManager interface { + IsServed(schema.GroupVersionResource) (bool, error) + GetResource(ctx context.Context, gvr schema.GroupVersionResource, namespace string, name string) (client.Object, error) + ListResources(ctx context.Context, gvr schema.GroupVersionResource, namespace string, query *query.Query) (client.ObjectList, error) + Get(ctx context.Context, namespace, name string, object client.Object) error + List(ctx context.Context, namespace string, query *query.Query, object client.ObjectList) error } -type ResourceGetter interface { - GetResource(schema.GroupVersionResource, string, string) (client.Object, error) - ListResources(schema.GroupVersionResource, string, *query.Query) (client.ObjectList, error) -} - -// CompareFunc return true is left great than right +// CompareFunc return true is left greater than right type CompareFunc func(runtime.Object, runtime.Object, query.Field) bool type FilterFunc func(runtime.Object, query.Filter) bool type TransformFunc func(runtime.Object) runtime.Object -func DefaultList(objects []runtime.Object, q *query.Query, compareFunc CompareFunc, filterFunc FilterFunc, transformFuncs ...TransformFunc) []runtime.Object { +func DefaultList(objects []runtime.Object, q *query.Query, compareFunc CompareFunc, filterFunc FilterFunc, transformFuncs ...TransformFunc) ([]runtime.Object, *int64) { // selected matched ones var filtered []runtime.Object if len(q.Filters) != 0 { @@ -77,11 +73,12 @@ func DefaultList(objects []runtime.Object, q *query.Query, compareFunc CompareFu } start, end := q.Pagination.GetValidPagination(total) + remainingItemCount := int64(total - end) - return filtered[start:end] + return filtered[start:end], &remainingItemCount } -// DefaultObjectMetaCompare return true is left great than right +// DefaultObjectMetaCompare return true is left greater than right func DefaultObjectMetaCompare(left, right metav1.Object, sortBy query.Field) bool { switch sortBy { // ?sortBy=name diff --git a/pkg/models/resources/v1beta1/resourcelgetter.go b/pkg/models/resources/v1beta1/resourcelgetter.go deleted file mode 100644 index 0c84b1bab..000000000 --- a/pkg/models/resources/v1beta1/resourcelgetter.go +++ /dev/null @@ -1,150 +0,0 @@ -package v1beta1 - -import ( - "context" - "errors" - "strings" - "sync" - - "sigs.k8s.io/controller-runtime/pkg/cache" - - "kubesphere.io/kubesphere/pkg/apiserver/query" - - extv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/runtime/schema" - "sigs.k8s.io/controller-runtime/pkg/client" -) - -var ErrResourceNotSupported = errors.New("resource is not supported") -var ErrResourceNotServed = errors.New("resource is not served") - -const labelResourceServed = "kubesphere.io/resource-served" - -// TODO If delete the crd at the cluster when ks is running, the client.cache doesn`t return err but empty result -func New(client client.Client, cache cache.Cache) ResourceGetter { - return &resourceGetter{ - client: client, - cache: NewResourceCache(cache), - serveCRD: make(map[string]bool, 0), - } -} - -type resourceGetter struct { - client client.Client - cache Interface - serveCRD map[string]bool - sync.RWMutex -} - -func (h *resourceGetter) GetResource(gvr schema.GroupVersionResource, name, namespace string) (client.Object, error) { - var obj client.Object - gvk, err := h.getGVK(gvr) - if err != nil { - return nil, err - } - - if h.client.Scheme().Recognizes(gvk) { - gvkObject, err := h.client.Scheme().New(gvk) - if err != nil { - return nil, err - } - obj = gvkObject.(client.Object) - } else { - serviced, err := h.isServed(gvr) - if err != nil { - return nil, err - } - if !serviced { - return nil, ErrResourceNotServed - } - - u := &unstructured.Unstructured{} - u.SetGroupVersionKind(gvk) - obj = u - } - - if err := h.cache.Get(namespace, name, obj); err != nil { - return nil, err - } - return obj, nil -} - -func (h *resourceGetter) ListResources(gvr schema.GroupVersionResource, namespace string, query *query.Query) (client.ObjectList, error) { - var obj client.ObjectList - - gvk, err := h.getGVK(gvr) - if err != nil { - return nil, err - } - - gvk = convertGVKToList(gvk) - - if h.client.Scheme().Recognizes(gvk) { - gvkObject, err := h.client.Scheme().New(gvk) - if err != nil { - return nil, err - } - obj = gvkObject.(client.ObjectList) - } else { - serviced, err := h.isServed(gvr) - if err != nil { - return nil, err - } - if !serviced { - return nil, ErrResourceNotServed - } - u := &unstructured.UnstructuredList{} - u.SetGroupVersionKind(gvk) - obj = u - } - - if err := h.cache.List(namespace, query, obj); err != nil { - return nil, err - } - return obj, nil -} - -func convertGVKToList(gvk schema.GroupVersionKind) schema.GroupVersionKind { - if strings.HasSuffix(gvk.Kind, "List") { - return gvk - } - gvk.Kind = gvk.Kind + "List" - return gvk -} - -func (h *resourceGetter) getGVK(gvr schema.GroupVersionResource) (schema.GroupVersionKind, error) { - var ( - gvk schema.GroupVersionKind - err error - ) - gvk, err = h.client.RESTMapper().KindFor(gvr) - if err != nil { - return gvk, err - } - - return gvk, nil -} - -func (h *resourceGetter) isServed(gvr schema.GroupVersionResource) (bool, error) { - resourceName := gvr.Resource + "." + gvr.Group - h.RWMutex.RLock() - isServed := h.serveCRD[resourceName] - h.RWMutex.RUnlock() - if isServed { - return true, nil - } - - crd := &extv1.CustomResourceDefinition{} - err := h.client.Get(context.Background(), client.ObjectKey{Name: resourceName}, crd) - if err != nil { - return false, err - } - if crd.Labels[labelResourceServed] == "true" { - h.RWMutex.Lock() - h.serveCRD[resourceName] = true - h.RWMutex.Unlock() - return true, nil - } - return false, nil -} diff --git a/pkg/models/resources/v1beta1/resourcemanager.go b/pkg/models/resources/v1beta1/resourcemanager.go new file mode 100644 index 000000000..6e16b8181 --- /dev/null +++ b/pkg/models/resources/v1beta1/resourcemanager.go @@ -0,0 +1,175 @@ +package v1beta1 + +import ( + "context" + "strings" + + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime" + "sigs.k8s.io/controller-runtime/pkg/cache" + + "kubesphere.io/kubesphere/pkg/apiserver/query" + + extv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +const labelResourceServed = "kubesphere.io/resource-served" + +// TODO If delete the crd at the cluster when ks is running, the client.cache doesn`t return err but empty result +func New(client client.Client, cache cache.Cache) ResourceManager { + return &resourceManager{ + client: client, + cache: cache, + } +} + +type resourceManager struct { + client client.Client + cache cache.Cache +} + +func (h *resourceManager) GetResource(ctx context.Context, gvr schema.GroupVersionResource, namespace, name string) (client.Object, error) { + var obj client.Object + gvk, err := h.getGVK(gvr) + if err != nil { + return nil, err + } + + if h.client.Scheme().Recognizes(gvk) { + gvkObject, err := h.client.Scheme().New(gvk) + if err != nil { + return nil, err + } + obj = gvkObject.(client.Object) + } else { + u := &unstructured.Unstructured{} + u.SetGroupVersionKind(gvk) + obj = u + } + + if err := h.Get(ctx, namespace, name, obj); err != nil { + return nil, err + } + return obj, nil +} + +func (h *resourceManager) ListResources(ctx context.Context, gvr schema.GroupVersionResource, namespace string, query *query.Query) (client.ObjectList, error) { + var obj client.ObjectList + + gvk, err := h.getGVK(gvr) + if err != nil { + return nil, err + } + + gvk = convertGVKToList(gvk) + + if h.client.Scheme().Recognizes(gvk) { + gvkObject, err := h.client.Scheme().New(gvk) + if err != nil { + return nil, err + } + obj = gvkObject.(client.ObjectList) + } else { + u := &unstructured.UnstructuredList{} + u.SetGroupVersionKind(gvk) + obj = u + } + + if err := h.List(ctx, namespace, query, obj); err != nil { + return nil, err + } + return obj, nil +} + +func convertGVKToList(gvk schema.GroupVersionKind) schema.GroupVersionKind { + if strings.HasSuffix(gvk.Kind, "List") { + return gvk + } + gvk.Kind = gvk.Kind + "List" + return gvk +} + +func (h *resourceManager) getGVK(gvr schema.GroupVersionResource) (schema.GroupVersionKind, error) { + var ( + gvk schema.GroupVersionKind + err error + ) + gvk, err = h.client.RESTMapper().KindFor(gvr) + if err != nil { + return gvk, err + } + return gvk, nil +} + +func (h *resourceManager) IsServed(gvr schema.GroupVersionResource) (bool, error) { + // well-known group version is already registered + if h.client.Scheme().IsVersionRegistered(gvr.GroupVersion()) { + return true, nil + } + + crd := &extv1.CustomResourceDefinition{} + if err := h.cache.Get(context.Background(), client.ObjectKey{Name: gvr.GroupResource().String()}, crd); err != nil { + if errors.IsNotFound(err) { + return false, nil + } + return false, err + } + + if crd.Labels[labelResourceServed] == "true" { + return true, nil + } + + return false, nil +} + +func (h *resourceManager) Get(ctx context.Context, namespace, name string, object client.Object) error { + return h.cache.Get(ctx, client.ObjectKey{Namespace: namespace, Name: name}, object) +} + +func (h *resourceManager) List(ctx context.Context, namespace string, query *query.Query, list client.ObjectList) error { + listOpt := &client.ListOptions{ + LabelSelector: query.Selector(), + Namespace: namespace, + } + + err := h.cache.List(ctx, list, listOpt) + if err != nil { + return err + } + + extractList, err := meta.ExtractList(list) + if err != nil { + return err + } + + filtered, remainingItemCount := DefaultList(extractList, query, compare, filter) + list.SetRemainingItemCount(remainingItemCount) + if err := meta.SetList(list, filtered); err != nil { + return err + } + return nil +} + +func compare(left, right runtime.Object, field query.Field) bool { + l, err := meta.Accessor(left) + if err != nil { + return false + } + r, err := meta.Accessor(right) + if err != nil { + return false + } + return DefaultObjectMetaCompare(l, r, field) +} + +func filter(object runtime.Object, filter query.Filter) bool { + o, err := meta.Accessor(object) + if err != nil { + return false + } + return DefaultObjectMetaFilter(o, filter) +}