move agent crd to kubesphere (#1974)
This commit is contained in:
@@ -1,36 +1,76 @@
|
||||
package dispatch
|
||||
|
||||
import (
|
||||
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
|
||||
"net/http"
|
||||
|
||||
"fmt"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/util/proxy"
|
||||
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
|
||||
towerv1alpha1 "kubesphere.io/kubesphere/pkg/apis/tower/v1alpha1"
|
||||
"kubesphere.io/kubesphere/pkg/apiserver/request"
|
||||
"kubesphere.io/kubesphere/pkg/client/listers/tower/v1alpha1"
|
||||
"net/http"
|
||||
"strings"
|
||||
)
|
||||
|
||||
const defaultMultipleClusterAgentNamespace = "kubesphere-system"
|
||||
|
||||
// Dispatcher defines how to forward request to designated cluster based on cluster name
|
||||
type Dispatcher interface {
|
||||
Dispatch(w http.ResponseWriter, req *http.Request)
|
||||
Dispatch(w http.ResponseWriter, req *http.Request, handler http.Handler)
|
||||
}
|
||||
|
||||
var DefaultClusterDispatch = newClusterDispatch()
|
||||
|
||||
type clusterDispatch struct {
|
||||
transport *http.Transport
|
||||
agentLister v1alpha1.AgentLister
|
||||
}
|
||||
|
||||
func newClusterDispatch() Dispatcher {
|
||||
return &clusterDispatch{}
|
||||
func NewClusterDispatch(agentLister v1alpha1.AgentLister) Dispatcher {
|
||||
return &clusterDispatch{
|
||||
agentLister: agentLister,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *clusterDispatch) Dispatch(w http.ResponseWriter, req *http.Request) {
|
||||
func (c *clusterDispatch) Dispatch(w http.ResponseWriter, req *http.Request, handler http.Handler) {
|
||||
|
||||
info, _ := request.RequestInfoFrom(req.Context())
|
||||
if info.Cluster == "" { // fallback to host cluster if cluster name if empty
|
||||
handler.ServeHTTP(w, req)
|
||||
return
|
||||
}
|
||||
|
||||
agent, err := c.agentLister.Agents(defaultMultipleClusterAgentNamespace).Get(info.Cluster)
|
||||
if err != nil {
|
||||
if errors.IsNotFound(err) {
|
||||
http.Error(w, fmt.Sprintf("cluster %s not found", info.Cluster), http.StatusNotFound)
|
||||
} else {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if !isAgentReady(agent) {
|
||||
http.Error(w, fmt.Sprintf("cluster agent is not ready"), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
u := *req.URL
|
||||
// u.Host = someHost
|
||||
u.Host = agent.Spec.Proxy
|
||||
u.Path = strings.Replace(u.Path, fmt.Sprintf("/clusters/%s", info.Cluster), "", 1)
|
||||
|
||||
httpProxy := proxy.NewUpgradeAwareHandler(&u, c.transport, false, false, c)
|
||||
httpProxy := proxy.NewUpgradeAwareHandler(&u, http.DefaultTransport, true, false, c)
|
||||
httpProxy.ServeHTTP(w, req)
|
||||
}
|
||||
|
||||
func (c *clusterDispatch) Error(w http.ResponseWriter, req *http.Request, err error) {
|
||||
responsewriters.InternalError(w, req, err)
|
||||
}
|
||||
|
||||
func isAgentReady(agent *towerv1alpha1.Agent) bool {
|
||||
for _, condition := range agent.Status.Conditions {
|
||||
if condition.Type == towerv1alpha1.AgentConnected && condition.Status == corev1.ConditionTrue {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user