This is a huge commit, it does following things: (#1942)
1. Remove ks-iam standalone binary, move it to ks-apiserver 2. Generate all devops apis inside kubesphere repository, no need to import s2ioperator. 3. Reorganize ldap code, make it more flexible to use.
This commit is contained in:
@@ -1,18 +1,38 @@
|
||||
package apiserver
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/emicklei/go-restful"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
urlruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/klog"
|
||||
"kubesphere.io/kubesphere/pkg/api/iam"
|
||||
"kubesphere.io/kubesphere/pkg/informers"
|
||||
iamv1alpha2 "kubesphere.io/kubesphere/pkg/kapis/iam/v1alpha2"
|
||||
loggingv1alpha2 "kubesphere.io/kubesphere/pkg/kapis/logging/v1alpha2"
|
||||
monitoringv1alpha2 "kubesphere.io/kubesphere/pkg/kapis/monitoring/v1alpha2"
|
||||
openpitrixv1 "kubesphere.io/kubesphere/pkg/kapis/openpitrix/v1"
|
||||
operationsv1alpha2 "kubesphere.io/kubesphere/pkg/kapis/operations/v1alpha2"
|
||||
resourcesv1alpha2 "kubesphere.io/kubesphere/pkg/kapis/resources/v1alpha2"
|
||||
resourcev1alpha3 "kubesphere.io/kubesphere/pkg/kapis/resources/v1alpha3"
|
||||
"kubesphere.io/kubesphere/pkg/server/options"
|
||||
servicemeshv1alpha2 "kubesphere.io/kubesphere/pkg/kapis/servicemesh/metrics/v1alpha2"
|
||||
terminalv1alpha2 "kubesphere.io/kubesphere/pkg/kapis/terminal/v1alpha2"
|
||||
"kubesphere.io/kubesphere/pkg/simple/client/cache"
|
||||
"kubesphere.io/kubesphere/pkg/simple/client/db"
|
||||
"kubesphere.io/kubesphere/pkg/simple/client/devops"
|
||||
"kubesphere.io/kubesphere/pkg/simple/client/k8s"
|
||||
"kubesphere.io/kubesphere/pkg/simple/client/ldap"
|
||||
"kubesphere.io/kubesphere/pkg/simple/client/logging"
|
||||
"kubesphere.io/kubesphere/pkg/simple/client/monitoring"
|
||||
"kubesphere.io/kubesphere/pkg/simple/client/openpitrix"
|
||||
"kubesphere.io/kubesphere/pkg/simple/client/s3"
|
||||
"net"
|
||||
"net/http"
|
||||
rt "runtime"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -26,56 +46,295 @@ const (
|
||||
MimeJsonPatchJson = "application/json-patch+json"
|
||||
)
|
||||
|
||||
// Dependencies is objects constructed at runtime that are necessary for running apiserver.
|
||||
type Dependencies struct {
|
||||
|
||||
// Injected Dependencies
|
||||
KubeClient k8s.Client
|
||||
S3 s3.Interface
|
||||
OpenPitrix openpitrix.Client
|
||||
Monitoring monitoring.Interface
|
||||
Logging logging.Interface
|
||||
Devops devops.Interface
|
||||
DB db.Interface
|
||||
}
|
||||
|
||||
type APIServer struct {
|
||||
|
||||
// number of kubesphere apiserver
|
||||
apiserverCount int
|
||||
ServerCount int
|
||||
|
||||
//
|
||||
genericServerOptions *options.ServerRunOptions
|
||||
Server *http.Server
|
||||
|
||||
AuthenticateOptions *iam.AuthenticationOptions
|
||||
|
||||
// webservice container, where all webservice defines
|
||||
container *restful.Container
|
||||
|
||||
// kubeClient is a collection of all kubernetes(include CRDs) objects clientset
|
||||
kubeClient k8s.Client
|
||||
KubernetesClient k8s.Client
|
||||
|
||||
// informerFactory is a collection of all kubernetes(include CRDs) objects informers,
|
||||
// mainly for fast query
|
||||
informerFactory informers.InformerFactory
|
||||
InformerFactory informers.InformerFactory
|
||||
|
||||
// cache is used for short lived objects, like session
|
||||
cache cache.Interface
|
||||
CacheClient cache.Interface
|
||||
|
||||
// monitoring client set
|
||||
MonitoringClient monitoring.Interface
|
||||
|
||||
//
|
||||
OpenpitrixClient openpitrix.Client
|
||||
|
||||
//
|
||||
LoggingClient logging.Interface
|
||||
|
||||
//
|
||||
DevopsClient devops.Interface
|
||||
|
||||
//
|
||||
S3Client s3.Interface
|
||||
|
||||
//
|
||||
DBClient db.Interface
|
||||
|
||||
//
|
||||
LdapClient ldap.Interface
|
||||
|
||||
//
|
||||
}
|
||||
|
||||
func (s *APIServer) PrepareRun() error {
|
||||
|
||||
s.container = restful.NewContainer()
|
||||
s.container.Filter(logRequestAndResponse)
|
||||
s.container.Router(restful.CurlyRouter{})
|
||||
s.container.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) {
|
||||
logStackOnRecover(panicReason, httpWriter)
|
||||
})
|
||||
|
||||
s.installKubeSphereAPIs()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *APIServer) installKubeSphereAPIs() {
|
||||
|
||||
urlruntime.Must(resourcev1alpha3.AddToContainer(s.container, s.InformerFactory))
|
||||
urlruntime.Must(servicemeshv1alpha2.AddToContainer(s.container))
|
||||
|
||||
// Need to refactor devops api registration, too much dependencies
|
||||
//urlruntime.Must(devopsv1alpha2.AddToContainer(s.container, s.DevopsClient))
|
||||
|
||||
urlruntime.Must(loggingv1alpha2.AddToContainer(s.container, s.KubernetesClient, s.LoggingClient))
|
||||
urlruntime.Must(monitoringv1alpha2.AddToContainer(s.container, s.KubernetesClient, s.MonitoringClient))
|
||||
urlruntime.Must(openpitrixv1.AddToContainer(s.container, s.InformerFactory, s.OpenpitrixClient))
|
||||
urlruntime.Must(operationsv1alpha2.AddToContainer(s.container, s.KubernetesClient.Kubernetes()))
|
||||
urlruntime.Must(resourcesv1alpha2.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), s.InformerFactory))
|
||||
//urlruntime.Must(tenantv1alpha2.AddToContainer(s.container, s.KubernetesClient, s.InformerFactory, s.DBClient))
|
||||
urlruntime.Must(terminalv1alpha2.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), s.KubernetesClient.Config()))
|
||||
urlruntime.Must(iamv1alpha2.AddToContainer(s.container, s.KubernetesClient, s.InformerFactory, s.LdapClient, s.CacheClient, s.AuthenticateOptions))
|
||||
|
||||
}
|
||||
|
||||
func New(deps *Dependencies) *APIServer {
|
||||
func (s *APIServer) Run(stopCh <-chan struct{}) error {
|
||||
|
||||
server := &APIServer{}
|
||||
err := s.waitForResourceSync(stopCh)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return server
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
go func() {
|
||||
<-stopCh
|
||||
_ = s.Server.Shutdown(ctx)
|
||||
}()
|
||||
|
||||
if s.Server.TLSConfig != nil {
|
||||
return s.Server.ListenAndServeTLS("", "")
|
||||
} else {
|
||||
return s.Server.ListenAndServe()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *APIServer) InstallKubeSphereAPIs() {
|
||||
func (s *APIServer) waitForResourceSync(stopCh <-chan struct{}) error {
|
||||
klog.V(0).Info("Start cache objects")
|
||||
|
||||
discoveryClient := s.KubernetesClient.Kubernetes().Discovery()
|
||||
apiResourcesList, err := discoveryClient.ServerResources()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
isResourceExists := func(resource schema.GroupVersionResource) bool {
|
||||
for _, apiResource := range apiResourcesList {
|
||||
if apiResource.GroupVersion == resource.GroupVersion().String() {
|
||||
for _, rsc := range apiResource.APIResources {
|
||||
if rsc.Name == resource.Resource {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// resources we have to create informer first
|
||||
k8sGVRs := []schema.GroupVersionResource{
|
||||
{Group: "", Version: "v1", Resource: "namespaces"},
|
||||
{Group: "", Version: "v1", Resource: "nodes"},
|
||||
{Group: "", Version: "v1", Resource: "resourcequotas"},
|
||||
{Group: "", Version: "v1", Resource: "pods"},
|
||||
{Group: "", Version: "v1", Resource: "services"},
|
||||
{Group: "", Version: "v1", Resource: "persistentvolumeclaims"},
|
||||
{Group: "", Version: "v1", Resource: "secrets"},
|
||||
{Group: "", Version: "v1", Resource: "configmaps"},
|
||||
|
||||
{Group: "rbac.authorization.k8s.io", Version: "v1", Resource: "roles"},
|
||||
{Group: "rbac.authorization.k8s.io", Version: "v1", Resource: "rolebindings"},
|
||||
{Group: "rbac.authorization.k8s.io", Version: "v1", Resource: "clusterroles"},
|
||||
{Group: "rbac.authorization.k8s.io", Version: "v1", Resource: "clusterrolebindings"},
|
||||
|
||||
{Group: "apps", Version: "v1", Resource: "deployments"},
|
||||
{Group: "apps", Version: "v1", Resource: "daemonsets"},
|
||||
{Group: "apps", Version: "v1", Resource: "replicasets"},
|
||||
{Group: "apps", Version: "v1", Resource: "statefulsets"},
|
||||
{Group: "apps", Version: "v1", Resource: "controllerrevisions"},
|
||||
|
||||
{Group: "storage.k8s.io", Version: "v1", Resource: "storageclasses"},
|
||||
|
||||
{Group: "batch", Version: "v1", Resource: "jobs"},
|
||||
{Group: "batch", Version: "v1beta1", Resource: "cronjobs"},
|
||||
|
||||
{Group: "extensions", Version: "v1beta1", Resource: "ingresses"},
|
||||
|
||||
{Group: "autoscaling", Version: "v2beta2", Resource: "horizontalpodautoscalers"},
|
||||
}
|
||||
|
||||
for _, gvr := range k8sGVRs {
|
||||
if !isResourceExists(gvr) {
|
||||
klog.Warningf("resource %s not exists in the cluster", gvr)
|
||||
} else {
|
||||
_, err := s.InformerFactory.KubernetesSharedInformerFactory().ForResource(gvr)
|
||||
if err != nil {
|
||||
klog.Errorf("cannot create informer for %s", gvr)
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
s.InformerFactory.KubernetesSharedInformerFactory().Start(stopCh)
|
||||
s.InformerFactory.KubernetesSharedInformerFactory().WaitForCacheSync(stopCh)
|
||||
|
||||
ksInformerFactory := s.InformerFactory.KubeSphereSharedInformerFactory()
|
||||
|
||||
ksGVRs := []schema.GroupVersionResource{
|
||||
{Group: "tenant.kubesphere.io", Version: "v1alpha1", Resource: "workspaces"},
|
||||
}
|
||||
|
||||
devopsGVRs := []schema.GroupVersionResource{
|
||||
{Group: "devops.kubesphere.io", Version: "v1alpha1", Resource: "s2ibinaries"},
|
||||
{Group: "devops.kubesphere.io", Version: "v1alpha1", Resource: "s2ibuildertemplates"},
|
||||
{Group: "devops.kubesphere.io", Version: "v1alpha1", Resource: "s2iruns"},
|
||||
{Group: "devops.kubesphere.io", Version: "v1alpha1", Resource: "s2ibuilders"},
|
||||
}
|
||||
|
||||
servicemeshGVRs := []schema.GroupVersionResource{
|
||||
{Group: "servicemesh.kubesphere.io", Version: "v1alpha2", Resource: "strategies"},
|
||||
{Group: "servicemesh.kubesphere.io", Version: "v1alpha2", Resource: "servicepolicies"},
|
||||
}
|
||||
|
||||
// skip caching devops resources if devops not enabled
|
||||
if s.DevopsClient != nil {
|
||||
ksGVRs = append(ksGVRs, devopsGVRs...)
|
||||
}
|
||||
|
||||
// skip caching servicemesh resources if servicemesh not enabled
|
||||
if s.KubernetesClient.Istio() != nil {
|
||||
ksGVRs = append(ksGVRs, servicemeshGVRs...)
|
||||
}
|
||||
|
||||
for _, gvr := range ksGVRs {
|
||||
if !isResourceExists(gvr) {
|
||||
klog.Warningf("resource %s not exists in the cluster", gvr)
|
||||
} else {
|
||||
_, err := ksInformerFactory.ForResource(gvr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ksInformerFactory.Start(stopCh)
|
||||
ksInformerFactory.WaitForCacheSync(stopCh)
|
||||
|
||||
appInformerFactory := s.InformerFactory.ApplicationSharedInformerFactory()
|
||||
|
||||
appGVRs := []schema.GroupVersionResource{
|
||||
{Group: "app.k8s.io", Version: "v1beta1", Resource: "applications"},
|
||||
}
|
||||
|
||||
for _, gvr := range appGVRs {
|
||||
if !isResourceExists(gvr) {
|
||||
klog.Warningf("resource %s not exists in the cluster", gvr)
|
||||
} else {
|
||||
_, err := appInformerFactory.ForResource(gvr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
appInformerFactory.Start(stopCh)
|
||||
appInformerFactory.WaitForCacheSync(stopCh)
|
||||
|
||||
klog.V(0).Info("Finished caching objects")
|
||||
|
||||
return nil
|
||||
|
||||
resourcev1alpha3.AddWebService(s.container, s.kubeClient)
|
||||
}
|
||||
|
||||
func (s *APIServer) Serve() error {
|
||||
panic("implement me")
|
||||
func logStackOnRecover(panicReason interface{}, w http.ResponseWriter) {
|
||||
var buffer bytes.Buffer
|
||||
buffer.WriteString(fmt.Sprintf("recover from panic situation: - %v\r\n", panicReason))
|
||||
for i := 2; ; i += 1 {
|
||||
_, file, line, ok := rt.Caller(i)
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
buffer.WriteString(fmt.Sprintf(" %s:%d\r\n", file, line))
|
||||
}
|
||||
klog.Errorln(buffer.String())
|
||||
|
||||
headers := http.Header{}
|
||||
if ct := w.Header().Get("Content-Type"); len(ct) > 0 {
|
||||
headers.Set("Accept", ct)
|
||||
}
|
||||
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
w.Write([]byte("Internal server error"))
|
||||
}
|
||||
|
||||
func logRequestAndResponse(req *restful.Request, resp *restful.Response, chain *restful.FilterChain) {
|
||||
start := time.Now()
|
||||
chain.ProcessFilter(req, resp)
|
||||
klog.V(4).Infof("%s - \"%s %s %s\" %d %d %dms",
|
||||
getRequestIP(req),
|
||||
req.Request.Method,
|
||||
req.Request.RequestURI,
|
||||
req.Request.Proto,
|
||||
resp.StatusCode(),
|
||||
resp.ContentLength(),
|
||||
time.Since(start)/time.Millisecond,
|
||||
)
|
||||
}
|
||||
|
||||
func getRequestIP(req *restful.Request) string {
|
||||
address := strings.Trim(req.Request.Header.Get("X-Real-Ip"), " ")
|
||||
if address != "" {
|
||||
return address
|
||||
}
|
||||
|
||||
address = strings.Trim(req.Request.Header.Get("X-Forwarded-For"), " ")
|
||||
if address != "" {
|
||||
return address
|
||||
}
|
||||
|
||||
address, _, err := net.SplitHostPort(req.Request.RemoteAddr)
|
||||
if err != nil {
|
||||
return req.Request.RemoteAddr
|
||||
}
|
||||
|
||||
return address
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user