@@ -2,6 +2,7 @@ package v1alpha1
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/emicklei/go-restful"
|
||||
"io"
|
||||
@@ -10,20 +11,30 @@ import (
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
v1 "k8s.io/client-go/listers/core/v1"
|
||||
"k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/tools/clientcmd"
|
||||
"kubesphere.io/kubesphere/pkg/api"
|
||||
"kubesphere.io/kubesphere/pkg/apis/cluster/v1alpha1"
|
||||
clusterlister "kubesphere.io/kubesphere/pkg/client/listers/cluster/v1alpha1"
|
||||
"kubesphere.io/kubesphere/pkg/version"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"k8s.io/cli-runtime/pkg/printers"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultAgentImage = "kubesphere/tower:v1.0"
|
||||
defaultTimeout = 5 * time.Second
|
||||
)
|
||||
|
||||
var ErrClusterConnectionIsNotProxy = fmt.Errorf("cluster is not using proxy connection")
|
||||
var errClusterConnectionIsNotProxy = fmt.Errorf("cluster is not using proxy connection")
|
||||
var errNon200Response = fmt.Errorf("non-200 response returned from endpoint")
|
||||
var errInvalidResponse = fmt.Errorf("invalid response from kubesphere apiserver")
|
||||
|
||||
type handler struct {
|
||||
serviceLister v1.ServiceLister
|
||||
@@ -87,7 +98,6 @@ func (h *handler) GenerateAgentDeployment(request *restful.Request, response *re
|
||||
}
|
||||
|
||||
response.Write(buf.Bytes())
|
||||
|
||||
}
|
||||
|
||||
//
|
||||
@@ -138,7 +148,7 @@ func (h *handler) populateProxyAddress() error {
|
||||
func (h *handler) generateDefaultDeployment(cluster *v1alpha1.Cluster, w io.Writer) error {
|
||||
|
||||
if cluster.Spec.Connection.Type == v1alpha1.ConnectionTypeDirect {
|
||||
return ErrClusterConnectionIsNotProxy
|
||||
return errClusterConnectionIsNotProxy
|
||||
}
|
||||
|
||||
agent := appsv1.Deployment{
|
||||
@@ -174,6 +184,7 @@ func (h *handler) generateDefaultDeployment(cluster *v1alpha1.Cluster, w io.Writ
|
||||
fmt.Sprintf("--name=%s", cluster.Name),
|
||||
fmt.Sprintf("--token=%s", cluster.Spec.Connection.Token),
|
||||
fmt.Sprintf("--proxy-server=%s", h.proxyAddress),
|
||||
fmt.Sprintf("--keepalive=30s"),
|
||||
fmt.Sprintf("--kubesphere-service=ks-apiserver.kubesphere-system.svc:80"),
|
||||
fmt.Sprintf("--kubernetes-service=kubernetes.default.svc:443"),
|
||||
},
|
||||
@@ -198,3 +209,102 @@ func (h *handler) generateDefaultDeployment(cluster *v1alpha1.Cluster, w io.Writ
|
||||
|
||||
return h.yamlPrinter.PrintObj(&agent, w)
|
||||
}
|
||||
|
||||
// ValidateCluster validate cluster kubeconfig and kubesphere apiserver address, check their accessibility
|
||||
func (h *handler) ValidateCluster(request *restful.Request, response *restful.Response) {
|
||||
var cluster v1alpha1.Cluster
|
||||
|
||||
err := request.ReadEntity(&cluster)
|
||||
if err != nil {
|
||||
api.HandleBadRequest(response, request, err)
|
||||
return
|
||||
}
|
||||
|
||||
if cluster.Spec.Connection.Type != v1alpha1.ConnectionTypeDirect {
|
||||
api.HandleBadRequest(response, request, fmt.Errorf("cluster connection type is not direct"))
|
||||
return
|
||||
}
|
||||
|
||||
if len(cluster.Spec.Connection.KubeConfig) == 0 || len(cluster.Spec.Connection.KubeSphereAPIEndpoint) == 0 {
|
||||
api.HandleBadRequest(response, request, fmt.Errorf("cluster kubeconfig and kubesphere endpoint should not be empty"))
|
||||
return
|
||||
}
|
||||
|
||||
err = validateKubeConfig(cluster.Spec.Connection.KubeConfig)
|
||||
if err != nil {
|
||||
api.HandleBadRequest(response, request, err)
|
||||
return
|
||||
}
|
||||
|
||||
_, err = validateKubeSphereAPIServer(cluster.Spec.Connection.KubeSphereAPIEndpoint)
|
||||
if err != nil {
|
||||
api.HandleBadRequest(response, request, fmt.Errorf("unable validate kubesphere endpoint, %v", err))
|
||||
return
|
||||
}
|
||||
|
||||
response.WriteHeader(http.StatusOK)
|
||||
}
|
||||
|
||||
// validateKubeConfig takes base64 encoded kubeconfig and check its validity
|
||||
func validateKubeConfig(kubeconfig []byte) error {
|
||||
config, err := loadKubeConfigFromBytes(kubeconfig)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
config.Timeout = defaultTimeout
|
||||
|
||||
clientSet, err := kubernetes.NewForConfig(config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = clientSet.Discovery().ServerVersion()
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func loadKubeConfigFromBytes(kubeconfig []byte) (*rest.Config, error) {
|
||||
clientConfig, err := clientcmd.NewClientConfigFromBytes(kubeconfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
config, err := clientConfig.ClientConfig()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return config, nil
|
||||
}
|
||||
|
||||
// validateKubeSphereAPIServer uses version api to check the accessibility
|
||||
func validateKubeSphereAPIServer(ksEndpoint string) (*version.Info, error) {
|
||||
_, err := url.Parse(ksEndpoint)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
path := fmt.Sprintf("%s/kapis/version", ksEndpoint)
|
||||
|
||||
client := http.Client{
|
||||
Timeout: defaultTimeout,
|
||||
}
|
||||
|
||||
response, err := client.Get(path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if response.StatusCode != http.StatusOK {
|
||||
return nil, errNon200Response
|
||||
}
|
||||
|
||||
ver := version.Info{}
|
||||
err = json.NewDecoder(response.Body).Decode(&ver)
|
||||
if err != nil {
|
||||
return nil, errInvalidResponse
|
||||
}
|
||||
|
||||
return &ver, nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user