openpitrix crd
Signed-off-by: LiHui <andrewli@yunify.com>

Squashed commits (each originally signed off by LiHui <andrewli@yunify.com>):

- delete helm repo, release and app
- Fix Dockerfile
- add unit test for category controller
- resource api
- miscellaneous
- resource api
- add s3 repo index
- attachment api
- repo controller test
- application controller test
- release metric
- helm release controller test
- move constants to /pkg/apis/application
- remove unused code
- add license header
- Fix bugs
- cluster client
- format code
- move workspace, cluster from spec to labels
- add license header
- openpitrix test
- add workspace labels for app in appstore
@@ -18,25 +18,18 @@ package dispatch
 import (
 	"fmt"
-	"net/http"
-	"net/url"
-	"strings"
-	"sync"
-
-	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/util/httpstream"
 	"k8s.io/apimachinery/pkg/util/proxy"
 	"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
-	"k8s.io/client-go/rest"
-	"k8s.io/client-go/tools/cache"
-	"k8s.io/client-go/tools/clientcmd"
-	"k8s.io/klog"
+	"kubesphere.io/kubesphere/pkg/utils/clusterclient"
+	"net/http"
+	"strings"
 
 	clusterv1alpha1 "kubesphere.io/kubesphere/pkg/apis/cluster/v1alpha1"
 	"kubesphere.io/kubesphere/pkg/apiserver/request"
 	clusterinformer "kubesphere.io/kubesphere/pkg/client/informers/externalversions/cluster/v1alpha1"
-	clusterlister "kubesphere.io/kubesphere/pkg/client/listers/cluster/v1alpha1"
 )
 
 const proxyURLFormat = "/api/v1/namespaces/kubesphere-system/services/:ks-apiserver:/proxy%s"
@@ -48,48 +41,12 @@ type Dispatcher interface {
 	Dispatch(w http.ResponseWriter, req *http.Request, handler http.Handler)
 }
 
-type innerCluster struct {
-	kubernetesURL *url.URL
-	kubesphereURL *url.URL
-	transport     http.RoundTripper
-}
-
 type clusterDispatch struct {
-	clusterLister clusterlister.ClusterLister
-
-	// the dispatcher will build an in-memory cluster cache to speed things up
-	innerClusters map[string]*innerCluster
-
-	clusterInformerSynced cache.InformerSynced
-
-	mutex sync.RWMutex
+	clusterclient.ClusterClients
 }
 
-func NewClusterDispatch(clusterInformer clusterinformer.ClusterInformer, clusterLister clusterlister.ClusterLister) Dispatcher {
-	clusterDispatcher := &clusterDispatch{
-		clusterLister: clusterLister,
-		innerClusters: make(map[string]*innerCluster),
-		mutex:         sync.RWMutex{},
-	}
-
-	clusterDispatcher.clusterInformerSynced = clusterInformer.Informer().HasSynced
-	clusterInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
-		AddFunc: clusterDispatcher.updateInnerClusters,
-		UpdateFunc: func(oldObj, newObj interface{}) {
-			clusterDispatcher.updateInnerClusters(newObj)
-		},
-		DeleteFunc: func(obj interface{}) {
-			cluster := obj.(*clusterv1alpha1.Cluster)
-			clusterDispatcher.mutex.Lock()
-			if _, ok := clusterDispatcher.innerClusters[cluster.Name]; ok {
-				delete(clusterDispatcher.innerClusters, cluster.Name)
-			}
-			clusterDispatcher.mutex.Unlock()
-
-		},
-	})
-
-	return clusterDispatcher
+func NewClusterDispatch(clusterInformer clusterinformer.ClusterInformer) Dispatcher {
+	return &clusterDispatch{clusterclient.NewClusterClient(clusterInformer)}
 }
 
 // Dispatch dispatches requests to the designated cluster
@@ -102,7 +59,7 @@ func (c *clusterDispatch) Dispatch(w http.ResponseWriter, req *http.Request, han
 		return
 	}
 
-	cluster, err := c.clusterLister.Get(info.Cluster)
+	cluster, err := c.Get(info.Cluster)
 	if err != nil {
 		if errors.IsNotFound(err) {
 			http.Error(w, fmt.Sprintf("cluster %s not found", info.Cluster), http.StatusNotFound)
@@ -113,18 +70,18 @@ func (c *clusterDispatch) Dispatch(w http.ResponseWriter, req *http.Request, han
 	}
 
 	// the requested cluster is the host cluster, no need to go through the agent
-	if isClusterHostCluster(cluster) {
+	if c.IsHostCluster(cluster) {
 		req.URL.Path = strings.Replace(req.URL.Path, fmt.Sprintf("/clusters/%s", info.Cluster), "", 1)
 		handler.ServeHTTP(w, req)
 		return
 	}
 
-	if !isClusterReady(cluster) {
+	if !c.IsClusterReady(cluster) {
 		http.Error(w, fmt.Sprintf("cluster %s is not ready", cluster.Name), http.StatusInternalServerError)
 		return
 	}
 
-	innCluster := c.getInnerCluster(cluster.Name)
+	innCluster := c.GetInnerCluster(cluster.Name)
 	if innCluster == nil {
 		http.Error(w, fmt.Sprintf("cluster %s is not ready", cluster.Name), http.StatusInternalServerError)
 		return
@@ -141,10 +98,10 @@ func (c *clusterDispatch) Dispatch(w http.ResponseWriter, req *http.Request, han
 	if cluster.Spec.Connection.Type == clusterv1alpha1.ConnectionTypeDirect &&
 		len(cluster.Spec.Connection.KubeSphereAPIEndpoint) == 0 {
 
-		u.Scheme = innCluster.kubernetesURL.Scheme
-		u.Host = innCluster.kubernetesURL.Host
+		u.Scheme = innCluster.KubernetesURL.Scheme
+		u.Host = innCluster.KubernetesURL.Host
 		u.Path = fmt.Sprintf(proxyURLFormat, u.Path)
-		transport = innCluster.transport
+		transport = innCluster.Transport
 
 		// We need this because kube-apiserver doesn't behave like a standard proxy: it strips
 		// the authorization header of proxied requests. Use a custom header to avoid the stripping.
@@ -178,8 +135,8 @@ func (c *clusterDispatch) Dispatch(w http.ResponseWriter, req *http.Request, han
 	} else {
 		// everything else goes to ks-apiserver, since our ks-apiserver has the ability to proxy kube-apiserver requests
 
-		u.Host = innCluster.kubesphereURL.Host
-		u.Scheme = innCluster.kubesphereURL.Scheme
+		u.Host = innCluster.KubesphereURL.Host
+		u.Scheme = innCluster.KubesphereURL.Scheme
 	}
 
 	httpProxy := proxy.NewUpgradeAwareHandler(&u, transport, false, false, c)
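To make the direct-connection branch concrete: once the /clusters/<name> prefix has been stripped from the request path, proxyURLFormat wraps what remains so the member cluster's kube-apiserver forwards the request to the ks-apiserver service. A minimal, runnable sketch (the request path here is hypothetical):

package main

import "fmt"

const proxyURLFormat = "/api/v1/namespaces/kubesphere-system/services/:ks-apiserver:/proxy%s"

func main() {
	// Hypothetical request path after the "/clusters/<name>" prefix is removed.
	path := "/kapis/resources.kubesphere.io/v1alpha3/pods"

	// The member cluster's kube-apiserver serves this as a service-proxy URL,
	// forwarding the request to ks-apiserver in the kubesphere-system namespace.
	fmt.Printf(proxyURLFormat+"\n", path)
	// Output:
	// /api/v1/namespaces/kubesphere-system/services/:ks-apiserver:/proxy/kapis/resources.kubesphere.io/v1alpha3/pods
}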
@@ -190,73 +147,3 @@ func (c *clusterDispatch) Dispatch(w http.ResponseWriter, req *http.Request, han
 func (c *clusterDispatch) Error(w http.ResponseWriter, req *http.Request, err error) {
 	responsewriters.InternalError(w, req, err)
 }
-
-func (c *clusterDispatch) getInnerCluster(name string) *innerCluster {
-	c.mutex.RLock()
-	defer c.mutex.RUnlock()
-	if cluster, ok := c.innerClusters[name]; ok {
-		return cluster
-	}
-	return nil
-}
-
-func (c *clusterDispatch) updateInnerClusters(obj interface{}) {
-	cluster := obj.(*clusterv1alpha1.Cluster)
-
-	kubernetesEndpoint, err := url.Parse(cluster.Spec.Connection.KubernetesAPIEndpoint)
-	if err != nil {
-		klog.Errorf("Parse kubernetes apiserver endpoint %s failed, %v", cluster.Spec.Connection.KubernetesAPIEndpoint, err)
-		return
-	}
-
-	kubesphereEndpoint, err := url.Parse(cluster.Spec.Connection.KubeSphereAPIEndpoint)
-	if err != nil {
-		klog.Errorf("Parse kubesphere apiserver endpoint %s failed, %v", cluster.Spec.Connection.KubeSphereAPIEndpoint, err)
-		return
-	}
-
-	// prepare for
-	clientConfig, err := clientcmd.NewClientConfigFromBytes(cluster.Spec.Connection.KubeConfig)
-	if err != nil {
-		klog.Errorf("Unable to create client config from kubeconfig bytes, %#v", err)
-		return
-	}
-
-	clusterConfig, err := clientConfig.ClientConfig()
-	if err != nil {
-		klog.Errorf("Failed to get client config, %#v", err)
-		return
-	}
-
-	transport, err := rest.TransportFor(clusterConfig)
-	if err != nil {
-		klog.Errorf("Create transport failed, %v", err)
-		return
-	}
-
-	c.mutex.Lock()
-	c.innerClusters[cluster.Name] = &innerCluster{
-		kubernetesURL: kubernetesEndpoint,
-		kubesphereURL: kubesphereEndpoint,
-		transport:     transport,
-	}
-	c.mutex.Unlock()
-}
-
-func isClusterReady(cluster *clusterv1alpha1.Cluster) bool {
-	for _, condition := range cluster.Status.Conditions {
-		if condition.Type == clusterv1alpha1.ClusterReady && condition.Status == corev1.ConditionTrue {
-			return true
-		}
-	}
-
-	return false
-}
-
-func isClusterHostCluster(cluster *clusterv1alpha1.Cluster) bool {
-	if _, ok := cluster.Labels[clusterv1alpha1.HostCluster]; ok {
-		return true
-	}
-
-	return false
-}
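The net effect of the diff is that the informer-driven cluster cache moves out of the dispatcher into the clusterclient package, which the new clusterDispatch embeds. Below is a sketch of the surface the new call sites imply; the method and field names are inferred from this diff (c.Get, c.IsHostCluster, c.IsClusterReady, c.GetInnerCluster, and the innCluster.KubernetesURL/KubesphereURL/Transport accesses), not quoted from the package itself:

package clusterclient

import (
	"net/http"
	"net/url"

	clusterv1alpha1 "kubesphere.io/kubesphere/pkg/apis/cluster/v1alpha1"
)

// InnerCluster plays the role of the dispatcher's deleted innerCluster struct;
// the fields are exported because another package now reads them.
type InnerCluster struct {
	KubernetesURL *url.URL          // parsed Spec.Connection.KubernetesAPIEndpoint
	KubesphereURL *url.URL          // parsed Spec.Connection.KubeSphereAPIEndpoint
	Transport     http.RoundTripper // built from the cluster's kubeconfig
}

// ClusterClients is the dependency embedded by the new clusterDispatch.
type ClusterClients interface {
	Get(name string) (*clusterv1alpha1.Cluster, error)
	IsHostCluster(cluster *clusterv1alpha1.Cluster) bool
	IsClusterReady(cluster *clusterv1alpha1.Cluster) bool
	GetInnerCluster(name string) *InnerCluster
}

Centralizing the cache behind one interface means the dispatcher no longer needs its own informer event handlers, lister, or mutex, and other consumers can share the same view of member-cluster connectivity.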