Merge remote-tracking branch 'upstream/dev' into dev
# Conflicts: # hack/generate_client.sh
This commit is contained in:
@@ -183,7 +183,7 @@ func (s *APIServer) buildHandlerChain() {
|
||||
handler := s.Server.Handler
|
||||
|
||||
handler = filters.WithKubeAPIServer(handler, s.KubernetesClient.Config(), &errorResponder{})
|
||||
handler = filters.WithMultipleClusterDispatcher(handler, dispatch.DefaultClusterDispatch)
|
||||
handler = filters.WithMultipleClusterDispatcher(handler, dispatch.NewClusterDispatch(s.InformerFactory.KubeSphereSharedInformerFactory().Tower().V1alpha1().Agents().Lister()))
|
||||
|
||||
excludedPaths := []string{"/oauth/*", "/kapis/config.kubesphere.io/*"}
|
||||
pathAuthorizer, _ := path.NewAuthorizer(excludedPaths)
|
||||
@@ -273,6 +273,7 @@ func (s *APIServer) waitForResourceSync(stopCh <-chan struct{}) error {
|
||||
|
||||
ksGVRs := []schema.GroupVersionResource{
|
||||
{Group: "tenant.kubesphere.io", Version: "v1alpha1", Resource: "workspaces"},
|
||||
{Group: "tower.kubesphere.io", Version: "v1alpha1", Resource: "agents"},
|
||||
}
|
||||
|
||||
devopsGVRs := []schema.GroupVersionResource{
|
||||
|
||||
@@ -1,36 +1,76 @@
|
||||
package dispatch
|
||||
|
||||
import (
|
||||
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
|
||||
"net/http"
|
||||
|
||||
"fmt"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/util/proxy"
|
||||
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
|
||||
towerv1alpha1 "kubesphere.io/kubesphere/pkg/apis/tower/v1alpha1"
|
||||
"kubesphere.io/kubesphere/pkg/apiserver/request"
|
||||
"kubesphere.io/kubesphere/pkg/client/listers/tower/v1alpha1"
|
||||
"net/http"
|
||||
"strings"
|
||||
)
|
||||
|
||||
const defaultMultipleClusterAgentNamespace = "kubesphere-system"
|
||||
|
||||
// Dispatcher defines how to forward request to designated cluster based on cluster name
|
||||
type Dispatcher interface {
|
||||
Dispatch(w http.ResponseWriter, req *http.Request)
|
||||
Dispatch(w http.ResponseWriter, req *http.Request, handler http.Handler)
|
||||
}
|
||||
|
||||
var DefaultClusterDispatch = newClusterDispatch()
|
||||
|
||||
type clusterDispatch struct {
|
||||
transport *http.Transport
|
||||
agentLister v1alpha1.AgentLister
|
||||
}
|
||||
|
||||
func newClusterDispatch() Dispatcher {
|
||||
return &clusterDispatch{}
|
||||
func NewClusterDispatch(agentLister v1alpha1.AgentLister) Dispatcher {
|
||||
return &clusterDispatch{
|
||||
agentLister: agentLister,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *clusterDispatch) Dispatch(w http.ResponseWriter, req *http.Request) {
|
||||
func (c *clusterDispatch) Dispatch(w http.ResponseWriter, req *http.Request, handler http.Handler) {
|
||||
|
||||
info, _ := request.RequestInfoFrom(req.Context())
|
||||
if info.Cluster == "" { // fallback to host cluster if cluster name if empty
|
||||
handler.ServeHTTP(w, req)
|
||||
return
|
||||
}
|
||||
|
||||
agent, err := c.agentLister.Agents(defaultMultipleClusterAgentNamespace).Get(info.Cluster)
|
||||
if err != nil {
|
||||
if errors.IsNotFound(err) {
|
||||
http.Error(w, fmt.Sprintf("cluster %s not found", info.Cluster), http.StatusNotFound)
|
||||
} else {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if !isAgentReady(agent) {
|
||||
http.Error(w, fmt.Sprintf("cluster agent is not ready"), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
u := *req.URL
|
||||
// u.Host = someHost
|
||||
u.Host = agent.Spec.Proxy
|
||||
u.Path = strings.Replace(u.Path, fmt.Sprintf("/clusters/%s", info.Cluster), "", 1)
|
||||
|
||||
httpProxy := proxy.NewUpgradeAwareHandler(&u, c.transport, false, false, c)
|
||||
httpProxy := proxy.NewUpgradeAwareHandler(&u, http.DefaultTransport, true, false, c)
|
||||
httpProxy.ServeHTTP(w, req)
|
||||
}
|
||||
|
||||
func (c *clusterDispatch) Error(w http.ResponseWriter, req *http.Request, err error) {
|
||||
responsewriters.InternalError(w, req, err)
|
||||
}
|
||||
|
||||
func isAgentReady(agent *towerv1alpha1.Agent) bool {
|
||||
for _, condition := range agent.Status.Conditions {
|
||||
if condition.Type == towerv1alpha1.AgentConnected && condition.Status == corev1.ConditionTrue {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -7,7 +7,6 @@ import (
|
||||
"kubesphere.io/kubesphere/pkg/apiserver/dispatch"
|
||||
"kubesphere.io/kubesphere/pkg/apiserver/request"
|
||||
"net/http"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// Multiple cluster dispatcher forward request to desired cluster based on request cluster name
|
||||
@@ -24,12 +23,10 @@ func WithMultipleClusterDispatcher(handler http.Handler, dispatch dispatch.Dispa
|
||||
return
|
||||
}
|
||||
|
||||
if info.Cluster == "host-cluster" || info.Cluster == "" {
|
||||
if info.Cluster == "" {
|
||||
handler.ServeHTTP(w, req)
|
||||
} else {
|
||||
// remove cluster path
|
||||
req.URL.Path = strings.Replace(req.URL.Path, fmt.Sprintf("/clusters/%s", info.Cluster), "", 1)
|
||||
dispatch.Dispatch(w, req)
|
||||
dispatch.Dispatch(w, req, handler)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
@@ -126,8 +126,6 @@ func (r *RequestInfoFactory) NewRequestInfo(req *http.Request) (*RequestInfo, er
|
||||
if currentParts[0] == "clusters" {
|
||||
requestInfo.Cluster = currentParts[1]
|
||||
currentParts = currentParts[2:]
|
||||
} else if len(currentParts) > 0 {
|
||||
requestInfo.Cluster = "host-cluster"
|
||||
}
|
||||
|
||||
if currentParts[0] == "workspaces" {
|
||||
|
||||
@@ -44,7 +44,8 @@ func TestRequestInfoFactory_NewRequestInfo(t *testing.T) {
|
||||
expectedIsResourceRequest bool
|
||||
expectedCluster string
|
||||
expectedWorkspace string
|
||||
exceptedNamespace string
|
||||
expectedNamespace string
|
||||
expectedKubernetesRequest bool
|
||||
}{
|
||||
{
|
||||
name: "login",
|
||||
@@ -55,6 +56,7 @@ func TestRequestInfoFactory_NewRequestInfo(t *testing.T) {
|
||||
expectedResource: "",
|
||||
expectedIsResourceRequest: false,
|
||||
expectedCluster: "",
|
||||
expectedKubernetesRequest: false,
|
||||
},
|
||||
{
|
||||
name: "list cluster roles",
|
||||
@@ -65,6 +67,7 @@ func TestRequestInfoFactory_NewRequestInfo(t *testing.T) {
|
||||
expectedResource: "clusterroles",
|
||||
expectedIsResourceRequest: true,
|
||||
expectedCluster: "cluster1",
|
||||
expectedKubernetesRequest: true,
|
||||
},
|
||||
{
|
||||
name: "list cluster nodes",
|
||||
@@ -75,6 +78,7 @@ func TestRequestInfoFactory_NewRequestInfo(t *testing.T) {
|
||||
expectedResource: "nodes",
|
||||
expectedIsResourceRequest: true,
|
||||
expectedCluster: "cluster1",
|
||||
expectedKubernetesRequest: true,
|
||||
},
|
||||
{
|
||||
name: "list cluster nodes",
|
||||
@@ -85,6 +89,7 @@ func TestRequestInfoFactory_NewRequestInfo(t *testing.T) {
|
||||
expectedResource: "nodes",
|
||||
expectedIsResourceRequest: true,
|
||||
expectedCluster: "cluster1",
|
||||
expectedKubernetesRequest: true,
|
||||
},
|
||||
{
|
||||
name: "list cluster nodes",
|
||||
@@ -94,7 +99,8 @@ func TestRequestInfoFactory_NewRequestInfo(t *testing.T) {
|
||||
expectedVerb: "list",
|
||||
expectedResource: "nodes",
|
||||
expectedIsResourceRequest: true,
|
||||
expectedCluster: "host-cluster",
|
||||
expectedCluster: "",
|
||||
expectedKubernetesRequest: true,
|
||||
},
|
||||
{
|
||||
name: "list roles",
|
||||
@@ -104,8 +110,9 @@ func TestRequestInfoFactory_NewRequestInfo(t *testing.T) {
|
||||
expectedVerb: "list",
|
||||
expectedResource: "roles",
|
||||
expectedIsResourceRequest: true,
|
||||
exceptedNamespace: "namespace1",
|
||||
expectedNamespace: "namespace1",
|
||||
expectedCluster: "cluster1",
|
||||
expectedKubernetesRequest: true,
|
||||
},
|
||||
{
|
||||
name: "list roles",
|
||||
@@ -115,7 +122,9 @@ func TestRequestInfoFactory_NewRequestInfo(t *testing.T) {
|
||||
expectedVerb: "list",
|
||||
expectedResource: "roles",
|
||||
expectedIsResourceRequest: true,
|
||||
expectedCluster: "host-cluster",
|
||||
expectedCluster: "",
|
||||
expectedNamespace: "namespace1",
|
||||
expectedKubernetesRequest: true,
|
||||
},
|
||||
{
|
||||
name: "list namespaces",
|
||||
@@ -126,7 +135,8 @@ func TestRequestInfoFactory_NewRequestInfo(t *testing.T) {
|
||||
expectedResource: "namespaces",
|
||||
expectedIsResourceRequest: true,
|
||||
expectedWorkspace: "workspace1",
|
||||
expectedCluster: "host-cluster",
|
||||
expectedCluster: "",
|
||||
expectedKubernetesRequest: false,
|
||||
},
|
||||
{
|
||||
name: "list namespaces",
|
||||
@@ -138,6 +148,32 @@ func TestRequestInfoFactory_NewRequestInfo(t *testing.T) {
|
||||
expectedIsResourceRequest: true,
|
||||
expectedWorkspace: "workspace1",
|
||||
expectedCluster: "cluster1",
|
||||
expectedKubernetesRequest: false,
|
||||
},
|
||||
{
|
||||
name: "random query",
|
||||
url: "/foo/bar",
|
||||
method: http.MethodGet,
|
||||
expectedErr: nil,
|
||||
expectedVerb: "GET",
|
||||
expectedResource: "",
|
||||
expectedIsResourceRequest: false,
|
||||
expectedWorkspace: "",
|
||||
expectedCluster: "",
|
||||
expectedKubernetesRequest: false,
|
||||
},
|
||||
{
|
||||
name: "kubesphere api without clusters",
|
||||
url: "/kapis/foo/bar/",
|
||||
method: http.MethodPost,
|
||||
expectedErr: nil,
|
||||
expectedVerb: "POST",
|
||||
expectedResource: "",
|
||||
expectedNamespace: "",
|
||||
expectedWorkspace: "",
|
||||
expectedCluster: "",
|
||||
expectedIsResourceRequest: false,
|
||||
expectedKubernetesRequest: false,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -155,23 +191,27 @@ func TestRequestInfoFactory_NewRequestInfo(t *testing.T) {
|
||||
t.Errorf("%s: expected error %v, actual %v", test.name, test.expectedErr, err)
|
||||
}
|
||||
} else {
|
||||
if test.expectedVerb != "" && test.expectedVerb != requestInfo.Verb {
|
||||
if test.expectedVerb != requestInfo.Verb {
|
||||
t.Errorf("%s: expected verb %v, actual %+v", test.name, test.expectedVerb, requestInfo.Verb)
|
||||
}
|
||||
if test.expectedResource != "" && test.expectedResource != requestInfo.Resource {
|
||||
if test.expectedResource != requestInfo.Resource {
|
||||
t.Errorf("%s: expected resource %v, actual %+v", test.name, test.expectedResource, requestInfo.Resource)
|
||||
}
|
||||
if test.expectedIsResourceRequest != requestInfo.IsResourceRequest {
|
||||
t.Errorf("%s: expected is resource request %v, actual %+v", test.name, test.expectedIsResourceRequest, requestInfo.IsResourceRequest)
|
||||
}
|
||||
if test.expectedCluster != "" && test.expectedCluster != requestInfo.Cluster {
|
||||
if test.expectedCluster != requestInfo.Cluster {
|
||||
t.Errorf("%s: expected cluster %v, actual %+v", test.name, test.expectedCluster, requestInfo.Cluster)
|
||||
}
|
||||
if test.expectedWorkspace != "" && test.expectedWorkspace != requestInfo.Workspace {
|
||||
if test.expectedWorkspace != requestInfo.Workspace {
|
||||
t.Errorf("%s: expected workspace %v, actual %+v", test.name, test.expectedWorkspace, requestInfo.Workspace)
|
||||
}
|
||||
if test.exceptedNamespace != "" && test.exceptedNamespace != requestInfo.Namespace {
|
||||
t.Errorf("%s: expected namespace %v, actual %+v", test.name, test.exceptedNamespace, requestInfo.Namespace)
|
||||
if test.expectedNamespace != requestInfo.Namespace {
|
||||
t.Errorf("%s: expected namespace %v, actual %+v", test.name, test.expectedNamespace, requestInfo.Namespace)
|
||||
}
|
||||
|
||||
if test.expectedKubernetesRequest != requestInfo.IsKubernetesRequest {
|
||||
t.Errorf("%s: expected kubernetes request %v, actual %+v", test.name, test.expectedKubernetesRequest, requestInfo.IsKubernetesRequest)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user