From 59900f6e1c4bbb9475e1abdd8b97f56a9a17fc15 Mon Sep 17 00:00:00 2001 From: zryfish Date: Mon, 1 Jun 2020 19:54:05 +0800 Subject: [PATCH] use kube-apiserver proxy if no kubesphere api endpoint provided (#2144) --- pkg/api/utils.go | 22 ++- pkg/apiserver/apiserver.go | 14 +- pkg/apiserver/dispatch/dispatch.go | 135 +++++++++++++++++-- pkg/controller/cluster/cluster_controller.go | 5 + pkg/kapis/cluster/v1alpha1/handler.go | 7 + 5 files changed, 163 insertions(+), 20 deletions(-) diff --git a/pkg/api/utils.go b/pkg/api/utils.go index 781dcdd64..0028cdef8 100644 --- a/pkg/api/utils.go +++ b/pkg/api/utils.go @@ -18,26 +18,38 @@ package api import ( "github.com/emicklei/go-restful" + "k8s.io/klog" "net/http" + "runtime" ) func HandleInternalError(response *restful.Response, req *restful.Request, err error) { - response.WriteError(http.StatusInternalServerError, err) + _, fn, line, _ := runtime.Caller(1) + klog.Errorf("%s:%d %v", fn, line, err) + _ = response.WriteError(http.StatusInternalServerError, err) } // HandleBadRequest writes http.StatusBadRequest and log error func HandleBadRequest(response *restful.Response, req *restful.Request, err error) { - response.WriteError(http.StatusBadRequest, err) + _, fn, line, _ := runtime.Caller(1) + klog.Errorf("%s:%d %v", fn, line, err) + _ = response.WriteError(http.StatusBadRequest, err) } func HandleNotFound(response *restful.Response, req *restful.Request, err error) { - response.WriteError(http.StatusNotFound, err) + _, fn, line, _ := runtime.Caller(1) + klog.Errorf("%s:%d %v", fn, line, err) + _ = response.WriteError(http.StatusNotFound, err) } func HandleForbidden(response *restful.Response, req *restful.Request, err error) { - response.WriteError(http.StatusForbidden, err) + _, fn, line, _ := runtime.Caller(1) + klog.Errorf("%s:%d %v", fn, line, err) + _ = response.WriteError(http.StatusForbidden, err) } func HandleConflict(response *restful.Response, req *restful.Request, err error) { - response.WriteError(http.StatusConflict, err) + _, fn, line, _ := runtime.Caller(1) + klog.Errorf("%s:%d %v", fn, line, err) + _ = response.WriteError(http.StatusConflict, err) } diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index 2498f65a9..bd255d7b1 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -244,7 +244,8 @@ func (s *APIServer) buildHandlerChain() { handler = filters.WithKubeAPIServer(handler, s.KubernetesClient.Config(), &errorResponder{}) if s.Config.MultiClusterOptions.Enable { - clusterDispatcher := dispatch.NewClusterDispatch(s.InformerFactory.KubeSphereSharedInformerFactory().Cluster().V1alpha1().Clusters().Lister()) + clusterDispatcher := dispatch.NewClusterDispatch(s.InformerFactory.KubeSphereSharedInformerFactory().Cluster().V1alpha1().Clusters(), + s.InformerFactory.KubeSphereSharedInformerFactory().Cluster().V1alpha1().Clusters().Lister()) handler = filters.WithMultipleClusterDispatcher(handler, clusterDispatcher) } @@ -483,10 +484,17 @@ func logStackOnRecover(panicReason interface{}, w http.ResponseWriter) { func logRequestAndResponse(req *restful.Request, resp *restful.Response, chain *restful.FilterChain) { start := time.Now() chain.ProcessFilter(req, resp) - klog.V(4).Infof("%s - \"%s %s %s\" %d %d %dms", + + // Always log error response + logWithVerbose := klog.V(4) + if resp.StatusCode() > http.StatusBadRequest { + logWithVerbose = klog.V(0) + } + + logWithVerbose.Infof("%s - \"%s %s %s\" %d %d %dms", getRequestIP(req), req.Request.Method, - req.Request.RequestURI, + req.Request.URL, req.Request.Proto, resp.StatusCode(), resp.ContentLength(), diff --git a/pkg/apiserver/dispatch/dispatch.go b/pkg/apiserver/dispatch/dispatch.go index 05d1837e6..44e17162b 100644 --- a/pkg/apiserver/dispatch/dispatch.go +++ b/pkg/apiserver/dispatch/dispatch.go @@ -22,32 +22,73 @@ import ( "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/util/proxy" "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/clientcmd" "k8s.io/klog" clusterv1alpha1 "kubesphere.io/kubesphere/pkg/apis/cluster/v1alpha1" "kubesphere.io/kubesphere/pkg/apiserver/request" - "kubesphere.io/kubesphere/pkg/client/listers/cluster/v1alpha1" + clusterinformer "kubesphere.io/kubesphere/pkg/client/informers/externalversions/cluster/v1alpha1" + clusterlister "kubesphere.io/kubesphere/pkg/client/listers/cluster/v1alpha1" "net/http" "net/url" "strings" + "sync" ) +const proxyURLFormat = "/api/v1/namespaces/kubesphere-system/services/:ks-apiserver:/proxy%s" + // Dispatcher defines how to forward request to designated cluster based on cluster name type Dispatcher interface { Dispatch(w http.ResponseWriter, req *http.Request, handler http.Handler) } +type innerCluster struct { + kubernetesURL *url.URL + kubesphereURL *url.URL + transport http.RoundTripper +} + type clusterDispatch struct { - clusterLister v1alpha1.ClusterLister + clusterLister clusterlister.ClusterLister + + // dispatcher will build a in memory cluster cache to speed things up + innerClusters map[string]*innerCluster + + clusterInformerSynced cache.InformerSynced + + mutex sync.RWMutex } -func NewClusterDispatch(clusterLister v1alpha1.ClusterLister) Dispatcher { - return &clusterDispatch{ +func NewClusterDispatch(clusterInformer clusterinformer.ClusterInformer, clusterLister clusterlister.ClusterLister) Dispatcher { + clusterDispatcher := &clusterDispatch{ clusterLister: clusterLister, + innerClusters: make(map[string]*innerCluster), + mutex: sync.RWMutex{}, } + + clusterDispatcher.clusterInformerSynced = clusterInformer.Informer().HasSynced + clusterInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: clusterDispatcher.updateInnerClusters, + UpdateFunc: func(oldObj, newObj interface{}) { + clusterDispatcher.updateInnerClusters(newObj) + }, + DeleteFunc: func(obj interface{}) { + cluster := obj.(*clusterv1alpha1.Cluster) + clusterDispatcher.mutex.Lock() + if _, ok := clusterDispatcher.innerClusters[cluster.Name]; ok { + delete(clusterDispatcher.innerClusters, cluster.Name) + } + clusterDispatcher.mutex.Unlock() + + }, + }) + + return clusterDispatcher } +// Dispatch dispatch requests to designated cluster func (c *clusterDispatch) Dispatch(w http.ResponseWriter, req *http.Request, handler http.Handler) { - info, _ := request.RequestInfoFrom(req.Context()) if len(info.Cluster) == 0 { @@ -74,21 +115,40 @@ func (c *clusterDispatch) Dispatch(w http.ResponseWriter, req *http.Request, han } if !isClusterReady(cluster) { - http.Error(w, fmt.Sprintf("cluster agent is not ready"), http.StatusInternalServerError) + http.Error(w, fmt.Sprintf("cluster is not ready"), http.StatusInternalServerError) return } - endpoint, err := url.Parse(cluster.Spec.Connection.KubeSphereAPIEndpoint) - if err != nil { - klog.Error(err) - http.Error(w, err.Error(), http.StatusInternalServerError) + innCluster := c.getInnerCluster(cluster.Name) + if innCluster == nil { + http.Error(w, fmt.Sprintf("cluster not ready"), http.StatusInternalServerError) + return } + transport := http.DefaultTransport + u := *req.URL - u.Host = endpoint.Host u.Path = strings.Replace(u.Path, fmt.Sprintf("/clusters/%s", info.Cluster), "", 1) - httpProxy := proxy.NewUpgradeAwareHandler(&u, http.DefaultTransport, true, false, c) + if info.IsKubernetesRequest { + u.Host = innCluster.kubernetesURL.Host + u.Scheme = innCluster.kubernetesURL.Scheme + } else { + u.Host = innCluster.kubesphereURL.Host + + // if cluster connection is direct and kubesphere apiserver endpoint is empty + // we use kube-apiserver proxy + if cluster.Spec.Connection.Type == clusterv1alpha1.ConnectionTypeDirect && + len(cluster.Spec.Connection.KubeSphereAPIEndpoint) == 0 { + + u.Scheme = innCluster.kubernetesURL.Scheme + u.Host = innCluster.kubernetesURL.Host + u.Path = fmt.Sprintf(proxyURLFormat, u.Path) + transport = innCluster.transport + } + } + + httpProxy := proxy.NewUpgradeAwareHandler(&u, transport, false, false, c) httpProxy.ServeHTTP(w, req) } @@ -96,6 +156,57 @@ func (c *clusterDispatch) Error(w http.ResponseWriter, req *http.Request, err er responsewriters.InternalError(w, req, err) } +func (c *clusterDispatch) getInnerCluster(name string) *innerCluster { + c.mutex.RLock() + defer c.mutex.RUnlock() + if cluster, ok := c.innerClusters[name]; ok { + return cluster + } + return nil +} + +func (c *clusterDispatch) updateInnerClusters(obj interface{}) { + cluster := obj.(*clusterv1alpha1.Cluster) + + kubernetesEndpoint, err := url.Parse(cluster.Spec.Connection.KubernetesAPIEndpoint) + if err != nil { + klog.Errorf("Parse kubernetes apiserver endpoint %s failed, %v", cluster.Spec.Connection.KubernetesAPIEndpoint, err) + return + } + + kubesphereEndpoint, err := url.Parse(cluster.Spec.Connection.KubeSphereAPIEndpoint) + if err != nil { + klog.Errorf("Parse kubesphere apiserver endpoint %s failed, %v", cluster.Spec.Connection.KubeSphereAPIEndpoint, err) + return + } + + // prepare for + clientConfig, err := clientcmd.NewClientConfigFromBytes(cluster.Spec.Connection.KubeConfig) + if err != nil { + klog.Errorf("Unable to create client config from kubeconfig bytes, %#v", err) + return + } + + clusterConfig, err := clientConfig.ClientConfig() + if err != nil { + klog.Errorf("Failed to get client config, %#v", err) + return + } + + transport, err := rest.TransportFor(clusterConfig) + if err != nil { + klog.Errorf("Create transport failed, %v", err) + } + + c.mutex.Lock() + c.innerClusters[cluster.Name] = &innerCluster{ + kubernetesURL: kubernetesEndpoint, + kubesphereURL: kubesphereEndpoint, + transport: transport, + } + c.mutex.Unlock() +} + func isClusterReady(cluster *clusterv1alpha1.Cluster) bool { for _, condition := range cluster.Status.Conditions { if condition.Type == clusterv1alpha1.ClusterReady && condition.Status == corev1.ConditionTrue { diff --git a/pkg/controller/cluster/cluster_controller.go b/pkg/controller/cluster/cluster_controller.go index 18a088094..27c7781f3 100644 --- a/pkg/controller/cluster/cluster_controller.go +++ b/pkg/controller/cluster/cluster_controller.go @@ -388,6 +388,11 @@ func (c *ClusterController) syncCluster(key string) error { // is safe. if isConditionTrue(cluster, clusterv1alpha1.ClusterAgentAvailable) || cluster.Spec.Connection.Type == clusterv1alpha1.ConnectionTypeDirect { + + if len(cluster.Spec.Connection.KubernetesAPIEndpoint) == 0 { + cluster.Spec.Connection.KubernetesAPIEndpoint = clusterConfig.Host + } + version, err := clientSet.Discovery().ServerVersion() if err != nil { klog.Errorf("Failed to get kubernetes version, %#v", err) diff --git a/pkg/kapis/cluster/v1alpha1/handler.go b/pkg/kapis/cluster/v1alpha1/handler.go index d34a7a575..41a945a4b 100644 --- a/pkg/kapis/cluster/v1alpha1/handler.go +++ b/pkg/kapis/cluster/v1alpha1/handler.go @@ -236,6 +236,13 @@ func (h *handler) ValidateCluster(request *restful.Request, response *restful.Re return } + // kubesphere apiserver endpoint not provided, that's allowed + // Cluster dispatcher will use kube-apiserver proxy instead + if len(cluster.Spec.Connection.KubeSphereAPIEndpoint) == 0 { + response.WriteHeader(http.StatusOK) + return + } + _, err = validateKubeSphereAPIServer(cluster.Spec.Connection.KubeSphereAPIEndpoint) if err != nil { api.HandleBadRequest(response, request, fmt.Errorf("unable validate kubesphere endpoint, %v", err))