This is a huge commit, it does following things:
1. refactor kubesphere dependency service client creation, we can disable dependency by config 2. dependencies can be configured by configuration file 3. refactor cmd package using cobra.Command, so we can use hypersphere to invoke command sepearately. Later we only need to build one image to contains all kubesphere core components. One command to rule them all! 4. live reloading configuration currently not implemented
This commit is contained in:
@@ -18,113 +18,110 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
goflag "flag"
|
||||
"fmt"
|
||||
"github.com/golang/glog"
|
||||
"github.com/json-iterator/go"
|
||||
kconfig "github.com/kiali/kiali/config"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/spf13/pflag"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
utilerrors "k8s.io/apimachinery/pkg/util/errors"
|
||||
cliflag "k8s.io/component-base/cli/flag"
|
||||
"k8s.io/klog"
|
||||
"kubesphere.io/kubesphere/cmd/ks-apiserver/app/options"
|
||||
"kubesphere.io/kubesphere/pkg/apiserver/runtime"
|
||||
"kubesphere.io/kubesphere/pkg/apiserver/servicemesh/tracing"
|
||||
"kubesphere.io/kubesphere/pkg/filter"
|
||||
"kubesphere.io/kubesphere/pkg/informers"
|
||||
"kubesphere.io/kubesphere/pkg/models/devops"
|
||||
logging "kubesphere.io/kubesphere/pkg/models/log"
|
||||
"kubesphere.io/kubesphere/pkg/server"
|
||||
"kubesphere.io/kubesphere/pkg/signals"
|
||||
"kubesphere.io/kubesphere/pkg/simple/client/admin_jenkins"
|
||||
"kubesphere.io/kubesphere/pkg/simple/client/devops_mysql"
|
||||
"log"
|
||||
apiserverconfig "kubesphere.io/kubesphere/pkg/server/config"
|
||||
"kubesphere.io/kubesphere/pkg/simple/client"
|
||||
"kubesphere.io/kubesphere/pkg/utils/signals"
|
||||
"kubesphere.io/kubesphere/pkg/utils/term"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
var jsonIter = jsoniter.ConfigCompatibleWithStandardLibrary
|
||||
|
||||
func NewAPIServerCommand() *cobra.Command {
|
||||
s := options.NewServerRunOptions()
|
||||
|
||||
cmd := &cobra.Command{
|
||||
Use: "ks-apiserver",
|
||||
Long: `The KubeSphere API server validates and configures data
|
||||
for the api objects. The API Server services REST operations and provides the frontend to the
|
||||
Long: `The KubeSphere API server validates and configures data for the api objects.
|
||||
The API Server services REST operations and provides the frontend to the
|
||||
cluster's shared state through which all other components interact.`,
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
return Run(s)
|
||||
err := apiserverconfig.Load()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = Complete(s)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if errs := s.Validate(); len(errs) != 0 {
|
||||
return utilerrors.NewAggregate(errs)
|
||||
}
|
||||
|
||||
return Run(s, signals.SetupSignalHandler())
|
||||
},
|
||||
}
|
||||
|
||||
s.AddFlags(cmd.Flags())
|
||||
cmd.Flags().AddGoFlagSet(goflag.CommandLine)
|
||||
glog.CopyStandardLogTo("INFO")
|
||||
fs := cmd.Flags()
|
||||
namedFlagSets := s.Flags()
|
||||
|
||||
for _, f := range namedFlagSets.FlagSets {
|
||||
fs.AddFlagSet(f)
|
||||
}
|
||||
|
||||
usageFmt := "Usage:\n %s\n"
|
||||
cols, _, _ := term.TerminalSize(cmd.OutOrStdout())
|
||||
cmd.SetHelpFunc(func(cmd *cobra.Command, args []string) {
|
||||
fmt.Fprintf(cmd.OutOrStdout(), "%s\n\n"+usageFmt, cmd.Long, cmd.UseLine())
|
||||
cliflag.PrintSections(cmd.OutOrStdout(), namedFlagSets, cols)
|
||||
})
|
||||
return cmd
|
||||
}
|
||||
|
||||
func Run(s *options.ServerRunOptions) error {
|
||||
func Run(s *options.ServerRunOptions, stopCh <-chan struct{}) error {
|
||||
|
||||
pflag.VisitAll(func(flag *pflag.Flag) {
|
||||
log.Printf("FLAG: --%s=%q", flag.Name, flag.Value)
|
||||
})
|
||||
|
||||
var err error
|
||||
|
||||
waitForResourceSync()
|
||||
|
||||
container := runtime.Container
|
||||
container.DoNotRecover(false)
|
||||
container.Filter(filter.Logging)
|
||||
container.RecoverHandler(server.LogStackOnRecover)
|
||||
for _, webservice := range container.RegisteredWebServices() {
|
||||
for _, route := range webservice.Routes() {
|
||||
log.Println(route.Method, route.Path)
|
||||
}
|
||||
err := CreateClientSet(apiserverconfig.Get(), stopCh)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = WaitForResourceSync(stopCh)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
initializeAdminJenkins()
|
||||
initializeDevOpsDatabase()
|
||||
initializeESClientConfig()
|
||||
initializeServicemeshConfig(s)
|
||||
|
||||
if s.GenericServerRunOptions.InsecurePort != 0 {
|
||||
log.Printf("Server listening on %d.", s.GenericServerRunOptions.InsecurePort)
|
||||
err = http.ListenAndServe(fmt.Sprintf("%s:%d", s.GenericServerRunOptions.BindAddress, s.GenericServerRunOptions.InsecurePort), container)
|
||||
initializeESClientConfig()
|
||||
|
||||
err = CreateAPIServer(s)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if s.GenericServerRunOptions.SecurePort != 0 && len(s.GenericServerRunOptions.TlsCertFile) > 0 && len(s.GenericServerRunOptions.TlsPrivateKey) > 0 {
|
||||
log.Printf("Server listening on %d.", s.GenericServerRunOptions.SecurePort)
|
||||
err = http.ListenAndServeTLS(fmt.Sprintf("%s:%d", s.GenericServerRunOptions.BindAddress, s.GenericServerRunOptions.SecurePort), s.GenericServerRunOptions.TlsCertFile, s.GenericServerRunOptions.TlsPrivateKey, container)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func initializeAdminJenkins() {
|
||||
devops.JenkinsInit()
|
||||
admin_jenkins.Client()
|
||||
}
|
||||
|
||||
func initializeDevOpsDatabase() {
|
||||
devops_mysql.OpenDatabase()
|
||||
return nil
|
||||
}
|
||||
|
||||
func initializeServicemeshConfig(s *options.ServerRunOptions) {
|
||||
// Initialize kiali config
|
||||
config := kconfig.NewConfig()
|
||||
|
||||
tracing.JaegerQueryUrl = s.JaegerQueryServiceUrl
|
||||
tracing.JaegerQueryUrl = s.ServiceMeshOptions.JaegerQueryHost
|
||||
|
||||
// Exclude system namespaces
|
||||
config.API.Namespaces.Exclude = []string{"istio-system", "kubesphere*", "kube*"}
|
||||
config.InCluster = true
|
||||
|
||||
// Set default prometheus service url
|
||||
config.ExternalServices.PrometheusServiceURL = s.ServicemeshPrometheusServiceUrl
|
||||
config.ExternalServices.PrometheusServiceURL = s.ServiceMeshOptions.ServicemeshPrometheusHost
|
||||
config.ExternalServices.PrometheusCustomMetricsURL = config.ExternalServices.PrometheusServiceURL
|
||||
|
||||
// Set istio pilot discovery service url
|
||||
config.ExternalServices.Istio.UrlServiceVersion = s.IstioPilotServiceURL
|
||||
config.ExternalServices.Istio.UrlServiceVersion = s.ServiceMeshOptions.IstioPilotHost
|
||||
|
||||
kconfig.Set(config)
|
||||
}
|
||||
@@ -134,7 +131,7 @@ func initializeESClientConfig() {
|
||||
// List all outputs
|
||||
outputs, err := logging.GetFluentbitOutputFromConfigMap()
|
||||
if err != nil {
|
||||
glog.Errorln(err)
|
||||
klog.Errorln(err)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -147,54 +144,164 @@ func initializeESClientConfig() {
|
||||
}
|
||||
}
|
||||
|
||||
func waitForResourceSync() {
|
||||
stopChan := signals.SetupSignalHandler()
|
||||
//
|
||||
func CreateAPIServer(s *options.ServerRunOptions) error {
|
||||
var err error
|
||||
|
||||
container := runtime.Container
|
||||
container.DoNotRecover(false)
|
||||
container.Filter(filter.Logging)
|
||||
container.RecoverHandler(server.LogStackOnRecover)
|
||||
|
||||
// install config api
|
||||
apiserverconfig.InstallAPI(container)
|
||||
|
||||
for _, webservice := range container.RegisteredWebServices() {
|
||||
for _, route := range webservice.Routes() {
|
||||
klog.V(0).Info(route.Method, route.Path)
|
||||
}
|
||||
}
|
||||
|
||||
if s.GenericServerRunOptions.InsecurePort != 0 {
|
||||
err = http.ListenAndServe(fmt.Sprintf("%s:%d", s.GenericServerRunOptions.BindAddress, s.GenericServerRunOptions.InsecurePort), container)
|
||||
if err != nil {
|
||||
klog.Infof("Server listening on %d.", s.GenericServerRunOptions.InsecurePort)
|
||||
}
|
||||
}
|
||||
|
||||
if s.GenericServerRunOptions.SecurePort != 0 && len(s.GenericServerRunOptions.TlsCertFile) > 0 && len(s.GenericServerRunOptions.TlsPrivateKey) > 0 {
|
||||
klog.Infof("Server listening on %d.", s.GenericServerRunOptions.SecurePort)
|
||||
err = http.ListenAndServeTLS(fmt.Sprintf("%s:%d", s.GenericServerRunOptions.BindAddress, s.GenericServerRunOptions.SecurePort), s.GenericServerRunOptions.TlsCertFile, s.GenericServerRunOptions.TlsPrivateKey, container)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func CreateClientSet(conf *apiserverconfig.Config, stopCh <-chan struct{}) error {
|
||||
csop := &client.ClientSetOptions{}
|
||||
|
||||
csop.SetDevopsOptions(conf.DevopsOptions).
|
||||
SetKubernetesOptions(conf.KubernetesOptions).
|
||||
SetMySQLOptions(conf.MySQLOptions)
|
||||
|
||||
client.NewClientSetFactory(csop, stopCh)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func WaitForResourceSync(stopCh <-chan struct{}) error {
|
||||
|
||||
//apis.AddToScheme(scheme.Scheme)
|
||||
|
||||
informerFactory := informers.SharedInformerFactory()
|
||||
informerFactory.Rbac().V1().Roles().Lister()
|
||||
informerFactory.Rbac().V1().RoleBindings().Lister()
|
||||
informerFactory.Rbac().V1().ClusterRoles().Lister()
|
||||
informerFactory.Rbac().V1().ClusterRoleBindings().Lister()
|
||||
|
||||
informerFactory.Storage().V1().StorageClasses().Lister()
|
||||
// resources we have to create informer first
|
||||
k8sGVRs := []schema.GroupVersionResource{
|
||||
{Group: "", Version: "v1", Resource: "namespaces"},
|
||||
{Group: "", Version: "v1", Resource: "nodes"},
|
||||
{Group: "", Version: "v1", Resource: "resourcequotas"},
|
||||
{Group: "", Version: "v1", Resource: "pods"},
|
||||
{Group: "", Version: "v1", Resource: "services"},
|
||||
{Group: "", Version: "v1", Resource: "persistentvolumeclaims"},
|
||||
{Group: "", Version: "v1", Resource: "secrets"},
|
||||
{Group: "", Version: "v1", Resource: "configmaps"},
|
||||
|
||||
informerFactory.Core().V1().Namespaces().Lister()
|
||||
informerFactory.Core().V1().Nodes().Lister()
|
||||
informerFactory.Core().V1().ResourceQuotas().Lister()
|
||||
informerFactory.Core().V1().Pods().Lister()
|
||||
informerFactory.Core().V1().Services().Lister()
|
||||
informerFactory.Core().V1().PersistentVolumeClaims().Lister()
|
||||
informerFactory.Core().V1().Secrets().Lister()
|
||||
informerFactory.Core().V1().ConfigMaps().Lister()
|
||||
{Group: "rbac.authorization.k8s.io", Version: "v1", Resource: "roles"},
|
||||
{Group: "rbac.authorization.k8s.io", Version: "v1", Resource: "rolebindings"},
|
||||
{Group: "rbac.authorization.k8s.io", Version: "v1", Resource: "clusterroles"},
|
||||
{Group: "rbac.authorization.k8s.io", Version: "v1", Resource: "clusterrolebindings"},
|
||||
|
||||
informerFactory.Apps().V1().ControllerRevisions().Lister()
|
||||
informerFactory.Apps().V1().StatefulSets().Lister()
|
||||
informerFactory.Apps().V1().Deployments().Lister()
|
||||
informerFactory.Apps().V1().DaemonSets().Lister()
|
||||
informerFactory.Apps().V1().ReplicaSets().Lister()
|
||||
{Group: "apps", Version: "v1", Resource: "deployments"},
|
||||
{Group: "apps", Version: "v1", Resource: "daemonsets"},
|
||||
{Group: "apps", Version: "v1", Resource: "replicasets"},
|
||||
{Group: "apps", Version: "v1", Resource: "statefulsets"},
|
||||
{Group: "apps", Version: "v1", Resource: "controllerrevisions"},
|
||||
|
||||
informerFactory.Batch().V1().Jobs().Lister()
|
||||
informerFactory.Batch().V1beta1().CronJobs().Lister()
|
||||
informerFactory.Extensions().V1beta1().Ingresses().Lister()
|
||||
informerFactory.Autoscaling().V2beta2().HorizontalPodAutoscalers().Lister()
|
||||
{Group: "batch", Version: "v1", Resource: "jobs"},
|
||||
{Group: "batch", Version: "v1beta1", Resource: "cronjobs"},
|
||||
|
||||
informerFactory.Start(stopChan)
|
||||
informerFactory.WaitForCacheSync(stopChan)
|
||||
{Group: "extensions", Version: "v1beta1", Resource: "ingresses"},
|
||||
|
||||
{Group: "autoscaling", Version: "v2beta2", Resource: "horizontalpodautoscalers"},
|
||||
}
|
||||
|
||||
for _, gvr := range k8sGVRs {
|
||||
_, err := informerFactory.ForResource(gvr)
|
||||
if err != nil {
|
||||
klog.Errorf("cannot create informer for %s", gvr)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
informerFactory.Start(stopCh)
|
||||
informerFactory.WaitForCacheSync(stopCh)
|
||||
|
||||
s2iInformerFactory := informers.S2iSharedInformerFactory()
|
||||
s2iInformerFactory.Devops().V1alpha1().S2iBuilderTemplates().Lister()
|
||||
s2iInformerFactory.Devops().V1alpha1().S2iRuns().Lister()
|
||||
s2iInformerFactory.Devops().V1alpha1().S2iBuilders().Lister()
|
||||
|
||||
s2iInformerFactory.Start(stopChan)
|
||||
s2iInformerFactory.WaitForCacheSync(stopChan)
|
||||
s2iGVRs := []schema.GroupVersionResource{
|
||||
{Group: "devops.kubesphere.io", Version: "v1alpha1", Resource: "s2ibuildertemplates"},
|
||||
{Group: "devops.kubesphere.io", Version: "v1alpha1", Resource: "s2iruns"},
|
||||
{Group: "devops.kubesphere.io", Version: "v1alpha1", Resource: "s2ibuilders"},
|
||||
}
|
||||
|
||||
for _, gvr := range s2iGVRs {
|
||||
_, err := s2iInformerFactory.ForResource(gvr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
s2iInformerFactory.Start(stopCh)
|
||||
s2iInformerFactory.WaitForCacheSync(stopCh)
|
||||
|
||||
ksInformerFactory := informers.KsSharedInformerFactory()
|
||||
ksInformerFactory.Tenant().V1alpha1().Workspaces().Lister()
|
||||
ksInformerFactory.Devops().V1alpha1().S2iBinaries().Lister()
|
||||
|
||||
ksInformerFactory.Start(stopChan)
|
||||
ksInformerFactory.WaitForCacheSync(stopChan)
|
||||
ksGVRs := []schema.GroupVersionResource{
|
||||
{Group: "tenant.kubesphere.io", Version: "v1alpha1", Resource: "workspaces"},
|
||||
{Group: "devops.kubesphere.io", Version: "v1alpha1", Resource: "s2ibinaries"},
|
||||
|
||||
{Group: "servicemesh.kubesphere.io", Version: "v1alpha2", Resource: "strategies"},
|
||||
{Group: "servicemesh.kubesphere.io", Version: "v1alpha2", Resource: "servicepolicies"},
|
||||
}
|
||||
|
||||
for _, gvr := range ksGVRs {
|
||||
_, err := ksInformerFactory.ForResource(gvr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
ksInformerFactory.Start(stopCh)
|
||||
ksInformerFactory.WaitForCacheSync(stopCh)
|
||||
|
||||
return nil
|
||||
|
||||
log.Println("resources sync success")
|
||||
}
|
||||
|
||||
// apply server run options to configuration
|
||||
func Complete(s *options.ServerRunOptions) error {
|
||||
|
||||
// loading configuration file
|
||||
conf := apiserverconfig.Get()
|
||||
|
||||
conf.Apply(&apiserverconfig.Config{
|
||||
MySQLOptions: s.MySQLOptions,
|
||||
DevopsOptions: s.DevopsOptions,
|
||||
SonarQubeOptions: s.SonarQubeOptions,
|
||||
KubernetesOptions: s.KubernetesOptions,
|
||||
ServiceMeshOptions: s.ServiceMeshOptions,
|
||||
MonitoringOptions: s.MonitoringOptions,
|
||||
})
|
||||
|
||||
s = &options.ServerRunOptions{
|
||||
GenericServerRunOptions: s.GenericServerRunOptions,
|
||||
KubernetesOptions: conf.KubernetesOptions,
|
||||
DevopsOptions: conf.DevopsOptions,
|
||||
SonarQubeOptions: conf.SonarQubeOptions,
|
||||
ServiceMeshOptions: conf.ServiceMeshOptions,
|
||||
MySQLOptions: conf.MySQLOptions,
|
||||
MonitoringOptions: conf.MonitoringOptions,
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user