From 2a521bb15731c108594b80660dc15793b61a9cda Mon Sep 17 00:00:00 2001 From: x893675 Date: Mon, 7 Feb 2022 16:47:43 +0800 Subject: [PATCH] add feature for live-reload when configuration changed Signed-off-by: x893675 --- cmd/controller-manager/app/options/options.go | 15 ++++ cmd/controller-manager/app/server.go | 35 ++++++++- cmd/ks-apiserver/app/server.go | 44 +++++++++-- pkg/apiserver/apiserver.go | 17 +++-- pkg/apiserver/config/config.go | 74 +++++++++++++++---- pkg/apiserver/config/config_test.go | 2 +- pkg/kapis/metering/v1alpha1/handler.go | 4 +- pkg/kapis/metering/v1alpha1/register.go | 4 +- pkg/kapis/monitoring/v1alpha3/handler.go | 4 +- pkg/kapis/monitoring/v1alpha3/helper_test.go | 2 +- pkg/kapis/monitoring/v1alpha3/register.go | 4 +- pkg/kapis/openpitrix/v1/handler.go | 4 +- pkg/kapis/openpitrix/v1/register.go | 4 +- pkg/kapis/tenant/v1alpha2/handler.go | 4 +- pkg/kapis/tenant/v1alpha2/register.go | 4 +- pkg/models/openpitrix/interface.go | 6 +- pkg/models/tenant/tenant.go | 4 +- pkg/models/tenant/tenent_test.go | 2 +- pkg/utils/metrics/metrics.go | 10 ++- tools/cmd/doc-gen/main.go | 6 +- 20 files changed, 189 insertions(+), 60 deletions(-) diff --git a/cmd/controller-manager/app/options/options.go b/cmd/controller-manager/app/options/options.go index be25b3ab9..eacaa3506 100644 --- a/cmd/controller-manager/app/options/options.go +++ b/cmd/controller-manager/app/options/options.go @@ -22,6 +22,8 @@ import ( "strings" "time" + controllerconfig "kubesphere.io/kubesphere/pkg/apiserver/config" + "k8s.io/apimachinery/pkg/util/sets" "kubesphere.io/kubesphere/pkg/apiserver/authentication" @@ -217,3 +219,16 @@ func (s *KubeSphereControllerManagerOptions) bindLeaderElectionFlags(l *leaderel "The duration the clients should wait between attempting acquisition and renewal "+ "of a leadership. This is only applicable if leader election is enabled.") } + +func (s *KubeSphereControllerManagerOptions) MergeConfig(cfg *controllerconfig.Config) { + s.KubernetesOptions = cfg.KubernetesOptions + s.DevopsOptions = cfg.DevopsOptions + s.S3Options = cfg.S3Options + s.AuthenticationOptions = cfg.AuthenticationOptions + s.LdapOptions = cfg.LdapOptions + s.OpenPitrixOptions = cfg.OpenPitrixOptions + s.NetworkOptions = cfg.NetworkOptions + s.MultiClusterOptions = cfg.MultiClusterOptions + s.ServiceMeshOptions = cfg.ServiceMeshOptions + s.GatewayOptions = cfg.GatewayOptions +} diff --git a/cmd/controller-manager/app/server.go b/cmd/controller-manager/app/server.go index 28cb04722..530bf9851 100644 --- a/cmd/controller-manager/app/server.go +++ b/cmd/controller-manager/app/server.go @@ -48,7 +48,7 @@ import ( func NewControllerManagerCommand() *cobra.Command { s := options.NewKubeSphereControllerManagerOptions() - conf, err := controllerconfig.TryLoadFromDisk() + conf, configCh, err := controllerconfig.TryLoadFromDisk() if err == nil { // make sure LeaderElection is not nil s = &options.KubeSphereControllerManagerOptions{ @@ -79,7 +79,7 @@ func NewControllerManagerCommand() *cobra.Command { os.Exit(1) } - if err = run(s, signals.SetupSignalHandler()); err != nil { + if err = Run(s, configCh, signals.SetupSignalHandler()); err != nil { klog.Error(err) os.Exit(1) } @@ -114,6 +114,37 @@ func NewControllerManagerCommand() *cobra.Command { return cmd } +func Run(s *options.KubeSphereControllerManagerOptions, configCh <-chan controllerconfig.Config, ctx context.Context) error { + ictx := controllerconfig.Context{} + ictx.RenewContext(context.TODO()) + errCh := make(chan error) + defer close(errCh) + go func() { + if err := run(s, ictx.GetContext()); err != nil { + errCh <- err + } + }() + + for { + select { + case <-ctx.Done(): + ictx.CancelContext() + return nil + case cfg := <-configCh: + ictx.CancelContext() + s.MergeConfig(&cfg) + ictx.RenewContext(context.TODO()) + go func() { + if err := run(s, ictx.GetContext()); err != nil { + errCh <- err + } + }() + case err := <-errCh: + return err + } + } +} + func run(s *options.KubeSphereControllerManagerOptions, ctx context.Context) error { kubernetesClient, err := k8s.NewKubernetesClient(s.KubernetesOptions) diff --git a/cmd/ks-apiserver/app/server.go b/cmd/ks-apiserver/app/server.go index 864f88e2e..6449f3943 100644 --- a/cmd/ks-apiserver/app/server.go +++ b/cmd/ks-apiserver/app/server.go @@ -19,6 +19,7 @@ package app import ( "context" "fmt" + "net/http" "github.com/spf13/cobra" utilerrors "k8s.io/apimachinery/pkg/util/errors" @@ -37,7 +38,7 @@ func NewAPIServerCommand() *cobra.Command { s := options.NewServerRunOptions() // Load configuration from file - conf, err := apiserverconfig.TryLoadFromDisk() + conf, configCh, err := apiserverconfig.TryLoadFromDisk() if err == nil { s = &options.ServerRunOptions{ GenericServerRunOptions: s.GenericServerRunOptions, @@ -56,8 +57,7 @@ cluster's shared state through which all other components interact.`, if errs := s.Validate(); len(errs) != 0 { return utilerrors.NewAggregate(errs) } - - return Run(s, signals.SetupSignalHandler()) + return Run(s, configCh, signals.SetupSignalHandler()) }, SilenceUsage: true, } @@ -88,8 +88,38 @@ cluster's shared state through which all other components interact.`, return cmd } -func Run(s *options.ServerRunOptions, ctx context.Context) error { +func Run(s *options.ServerRunOptions, configCh <-chan apiserverconfig.Config, ctx context.Context) error { + ictx := apiserverconfig.Context{} + ictx.RenewContext(context.TODO()) + errCh := make(chan error) + defer close(errCh) + go func() { + if err := run(s, ictx.GetContext()); err != nil { + errCh <- err + } + }() + for { + select { + case <-ctx.Done(): + ictx.CancelContext() + return nil + case cfg := <-configCh: + ictx.CancelContext() + s.Config = &cfg + ictx.RenewContext(context.TODO()) + go func() { + if err := run(s, ictx.GetContext()); err != nil { + errCh <- err + } + }() + case err := <-errCh: + return err + } + } +} + +func run(s *options.ServerRunOptions, ctx context.Context) error { apiserver, err := s.NewAPIServer(ctx.Done()) if err != nil { return err @@ -100,5 +130,9 @@ func Run(s *options.ServerRunOptions, ctx context.Context) error { return err } - return apiserver.Run(ctx) + err = apiserver.Run(ctx) + if err == http.ErrServerClosed { + return nil + } + return err } diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index a75a662ae..3157eaff7 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -23,6 +23,7 @@ import ( "net/http" rt "runtime" "strconv" + "sync" "time" "github.com/emicklei/go-restful" @@ -107,6 +108,8 @@ import ( "kubesphere.io/kubesphere/pkg/utils/metrics" ) +var initMetrics sync.Once + type APIServer struct { // number of kubesphere apiserver ServerCount int @@ -165,7 +168,7 @@ func (s *APIServer) PrepareRun(stopCh <-chan struct{}) error { logStackOnRecover(panicReason, httpWriter) }) - s.installKubeSphereAPIs() + s.installKubeSphereAPIs(stopCh) s.installCRDAPIs() s.installMetricsAPI() s.container.Filter(monitorRequest) @@ -193,14 +196,14 @@ func monitorRequest(r *restful.Request, response *restful.Response, chain *restf } func (s *APIServer) installMetricsAPI() { - registerMetrics() + initMetrics.Do(registerMetrics) metrics.Defaults.Install(s.container) } // Install all kubesphere api groups // Installation happens before all informers start to cache objects, so // any attempt to list objects using listers will get empty results. -func (s *APIServer) installKubeSphereAPIs() { +func (s *APIServer) installKubeSphereAPIs(stopCh <-chan struct{}) { imOperator := im.NewOperator(s.KubernetesClient.KubeSphere(), user.New(s.InformerFactory.KubeSphereSharedInformerFactory(), s.InformerFactory.KubernetesSharedInformerFactory()), @@ -214,15 +217,15 @@ func (s *APIServer) installKubeSphereAPIs() { urlruntime.Must(configv1alpha2.AddToContainer(s.container, s.Config)) urlruntime.Must(resourcev1alpha3.AddToContainer(s.container, s.InformerFactory, s.RuntimeCache)) - urlruntime.Must(monitoringv1alpha3.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), s.MonitoringClient, s.MetricsClient, s.InformerFactory, s.KubernetesClient.KubeSphere(), s.Config.OpenPitrixOptions, s.RuntimeClient)) - urlruntime.Must(meteringv1alpha1.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), s.MonitoringClient, s.InformerFactory, s.KubernetesClient.KubeSphere(), s.RuntimeCache, s.Config.MeteringOptions, nil, s.RuntimeClient)) - urlruntime.Must(openpitrixv1.AddToContainer(s.container, s.InformerFactory, s.KubernetesClient.KubeSphere(), s.Config.OpenPitrixOptions)) + urlruntime.Must(monitoringv1alpha3.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), s.MonitoringClient, s.MetricsClient, s.InformerFactory, s.KubernetesClient.KubeSphere(), s.Config.OpenPitrixOptions, s.RuntimeClient, stopCh)) + urlruntime.Must(meteringv1alpha1.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), s.MonitoringClient, s.InformerFactory, s.KubernetesClient.KubeSphere(), s.RuntimeCache, s.Config.MeteringOptions, nil, s.RuntimeClient, stopCh)) + urlruntime.Must(openpitrixv1.AddToContainer(s.container, s.InformerFactory, s.KubernetesClient.KubeSphere(), s.Config.OpenPitrixOptions, stopCh)) urlruntime.Must(openpitrixv2alpha1.AddToContainer(s.container, s.InformerFactory, s.KubernetesClient.KubeSphere(), s.Config.OpenPitrixOptions)) urlruntime.Must(operationsv1alpha2.AddToContainer(s.container, s.KubernetesClient.Kubernetes())) urlruntime.Must(resourcesv1alpha2.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), s.InformerFactory, s.KubernetesClient.Master())) urlruntime.Must(tenantv1alpha2.AddToContainer(s.container, s.InformerFactory, s.KubernetesClient.Kubernetes(), - s.KubernetesClient.KubeSphere(), s.EventsClient, s.LoggingClient, s.AuditingClient, amOperator, rbacAuthorizer, s.MonitoringClient, s.RuntimeCache, s.Config.MeteringOptions)) + s.KubernetesClient.KubeSphere(), s.EventsClient, s.LoggingClient, s.AuditingClient, amOperator, rbacAuthorizer, s.MonitoringClient, s.RuntimeCache, s.Config.MeteringOptions, stopCh)) urlruntime.Must(terminalv1alpha2.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), rbacAuthorizer, s.KubernetesClient.Config(), s.Config.TerminalOptions)) urlruntime.Must(clusterkapisv1alpha1.AddToContainer(s.container, s.KubernetesClient.KubeSphere(), diff --git a/pkg/apiserver/config/config.go b/pkg/apiserver/config/config.go index 785f8f43c..5cd61ef0d 100644 --- a/pkg/apiserver/config/config.go +++ b/pkg/apiserver/config/config.go @@ -17,9 +17,15 @@ limitations under the License. package config import ( + "context" "fmt" "reflect" "strings" + "sync" + + "k8s.io/klog" + + "github.com/fsnotify/fsnotify" "kubesphere.io/kubesphere/pkg/apiserver/authentication" "kubesphere.io/kubesphere/pkg/apiserver/authorization" @@ -79,6 +85,11 @@ import ( // mysql-host is missing in command line flags, all other mysql command line flags // will be ignored. +var ( + once sync.Once + cfgChangeCh = make(chan Config) +) + const ( // DefaultConfigurationName is the default name of configuration defaultConfigurationName = "kubesphere" @@ -147,33 +158,44 @@ func New() *Config { // TryLoadFromDisk loads configuration from default location after server startup // return nil error if configuration file not exists -func TryLoadFromDisk() (*Config, error) { - viper.SetConfigName(defaultConfigurationName) - viper.AddConfigPath(defaultConfigurationPath) +func TryLoadFromDisk() (*Config, <-chan Config, error) { + once.Do(func() { + viper.SetConfigName(defaultConfigurationName) + viper.AddConfigPath(defaultConfigurationPath) - // Load from current working directory, only used for debugging - viper.AddConfigPath(".") + // Load from current working directory, only used for debugging + viper.AddConfigPath(".") - // Load from Environment variables - viper.SetEnvPrefix("kubesphere") - viper.AutomaticEnv() - viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) + // Load from Environment variables + viper.SetEnvPrefix("kubesphere") + viper.AutomaticEnv() + viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) + viper.WatchConfig() + viper.OnConfigChange(func(in fsnotify.Event) { + cfg := New() + if err := viper.Unmarshal(cfg); err != nil { + klog.Warning("config reload error", err) + } else { + cfgChangeCh <- *cfg + } + }) + }) + + conf := New() if err := viper.ReadInConfig(); err != nil { if _, ok := err.(viper.ConfigFileNotFoundError); ok { - return nil, err + return nil, cfgChangeCh, err } else { - return nil, fmt.Errorf("error parsing configuration file %s", err) + return nil, cfgChangeCh, fmt.Errorf("error parsing configuration file %s", err) } } - conf := New() - if err := viper.Unmarshal(conf); err != nil { - return nil, err + return nil, cfgChangeCh, err } - return conf, nil + return conf, cfgChangeCh, nil } // convertToMap simply converts config to map[string]bool @@ -318,3 +340,25 @@ func (conf *Config) stripEmptyOptions() { conf.GPUOptions = nil } } + +type Context struct { + internalCtx context.Context + internalCtxCancel context.CancelFunc + mu sync.RWMutex +} + +func (c *Context) GetContext() context.Context { + c.mu.RLock() + defer c.mu.RUnlock() + return c.internalCtx +} + +func (c *Context) RenewContext(ctx context.Context) { + c.mu.Lock() + defer c.mu.Unlock() + c.internalCtx, c.internalCtxCancel = context.WithCancel(ctx) +} + +func (c *Context) CancelContext() { + c.internalCtxCancel() +} diff --git a/pkg/apiserver/config/config_test.go b/pkg/apiserver/config/config_test.go index 24d61039c..67206d5f0 100644 --- a/pkg/apiserver/config/config_test.go +++ b/pkg/apiserver/config/config_test.go @@ -240,7 +240,7 @@ func TestGet(t *testing.T) { conf.RedisOptions.Password = "P@88w0rd" os.Setenv("KUBESPHERE_REDIS_PASSWORD", "P@88w0rd") - conf2, err := TryLoadFromDisk() + conf2, _, err := TryLoadFromDisk() if err != nil { t.Fatal(err) } diff --git a/pkg/kapis/metering/v1alpha1/handler.go b/pkg/kapis/metering/v1alpha1/handler.go index bc19ef670..4a5afb846 100644 --- a/pkg/kapis/metering/v1alpha1/handler.go +++ b/pkg/kapis/metering/v1alpha1/handler.go @@ -47,6 +47,6 @@ type meterHandler interface { HandlePVCMeterQuery(req *restful.Request, resp *restful.Response) } -func newHandler(k kubernetes.Interface, m monitoring.Interface, f informers.InformerFactory, ksClient versioned.Interface, resourceGetter *resourcev1alpha3.ResourceGetter, meteringOptions *meteringclient.Options, opOptions *openpitrixoptions.Options, rtClient runtimeclient.Client) meterHandler { - return monitorhle.NewHandler(k, m, nil, f, ksClient, resourceGetter, meteringOptions, opOptions, rtClient) +func newHandler(k kubernetes.Interface, m monitoring.Interface, f informers.InformerFactory, ksClient versioned.Interface, resourceGetter *resourcev1alpha3.ResourceGetter, meteringOptions *meteringclient.Options, opOptions *openpitrixoptions.Options, rtClient runtimeclient.Client, stopCh <-chan struct{}) meterHandler { + return monitorhle.NewHandler(k, m, nil, f, ksClient, resourceGetter, meteringOptions, opOptions, rtClient, stopCh) } diff --git a/pkg/kapis/metering/v1alpha1/register.go b/pkg/kapis/metering/v1alpha1/register.go index 145bf7b67..a589474c8 100644 --- a/pkg/kapis/metering/v1alpha1/register.go +++ b/pkg/kapis/metering/v1alpha1/register.go @@ -49,10 +49,10 @@ const ( var GroupVersion = schema.GroupVersion{Group: groupName, Version: "v1alpha1"} -func AddToContainer(c *restful.Container, k8sClient kubernetes.Interface, meteringClient monitoring.Interface, factory informers.InformerFactory, ksClient versioned.Interface, cache cache.Cache, meteringOptions *meteringclient.Options, opOptions *openpitrixoptions.Options, rtClient runtimeclient.Client) error { +func AddToContainer(c *restful.Container, k8sClient kubernetes.Interface, meteringClient monitoring.Interface, factory informers.InformerFactory, ksClient versioned.Interface, cache cache.Cache, meteringOptions *meteringclient.Options, opOptions *openpitrixoptions.Options, rtClient runtimeclient.Client, stopCh <-chan struct{}) error { ws := runtime.NewWebService(GroupVersion) - h := newHandler(k8sClient, meteringClient, factory, ksClient, resourcev1alpha3.NewResourceGetter(factory, cache), meteringOptions, opOptions, rtClient) + h := newHandler(k8sClient, meteringClient, factory, ksClient, resourcev1alpha3.NewResourceGetter(factory, cache), meteringOptions, opOptions, rtClient, stopCh) ws.Route(ws.GET("/cluster"). To(h.HandleClusterMeterQuery). diff --git a/pkg/kapis/monitoring/v1alpha3/handler.go b/pkg/kapis/monitoring/v1alpha3/handler.go index 5e468be6a..662dc0d22 100644 --- a/pkg/kapis/monitoring/v1alpha3/handler.go +++ b/pkg/kapis/monitoring/v1alpha3/handler.go @@ -60,7 +60,7 @@ type handler struct { rtClient runtimeclient.Client } -func NewHandler(k kubernetes.Interface, monitoringClient monitoring.Interface, metricsClient monitoring.Interface, f informers.InformerFactory, ksClient versioned.Interface, resourceGetter *resourcev1alpha3.ResourceGetter, meteringOptions *meteringclient.Options, opOptions *openpitrixoptions.Options, rtClient runtimeclient.Client) *handler { +func NewHandler(k kubernetes.Interface, monitoringClient monitoring.Interface, metricsClient monitoring.Interface, f informers.InformerFactory, ksClient versioned.Interface, resourceGetter *resourcev1alpha3.ResourceGetter, meteringOptions *meteringclient.Options, opOptions *openpitrixoptions.Options, rtClient runtimeclient.Client, stopCh <-chan struct{}) *handler { var opRelease openpitrix.Interface var s3Client s3.Interface if opOptions != nil && opOptions.S3Options != nil && len(opOptions.S3Options.Endpoint) != 0 { @@ -71,7 +71,7 @@ func NewHandler(k kubernetes.Interface, monitoringClient monitoring.Interface, m } } if ksClient != nil { - opRelease = openpitrix.NewOpenpitrixOperator(f, ksClient, s3Client) + opRelease = openpitrix.NewOpenpitrixOperator(f, ksClient, s3Client, stopCh) } if meteringOptions == nil || meteringOptions.RetentionDay == "" { meteringOptions = &meteringclient.DefaultMeteringOption diff --git a/pkg/kapis/monitoring/v1alpha3/helper_test.go b/pkg/kapis/monitoring/v1alpha3/helper_test.go index 8ed5dd8d6..7cadc261d 100644 --- a/pkg/kapis/monitoring/v1alpha3/helper_test.go +++ b/pkg/kapis/monitoring/v1alpha3/helper_test.go @@ -373,7 +373,7 @@ func TestParseRequestParams(t *testing.T) { fakeInformerFactory.KubeSphereSharedInformerFactory() - handler := NewHandler(client, nil, nil, fakeInformerFactory, ksClient, nil, nil, nil, nil) + handler := NewHandler(client, nil, nil, fakeInformerFactory, ksClient, nil, nil, nil, nil, nil) result, err := handler.makeQueryOptions(tt.params, tt.lvl) if err != nil { diff --git a/pkg/kapis/monitoring/v1alpha3/register.go b/pkg/kapis/monitoring/v1alpha3/register.go index 9394203d8..97fae3715 100644 --- a/pkg/kapis/monitoring/v1alpha3/register.go +++ b/pkg/kapis/monitoring/v1alpha3/register.go @@ -47,10 +47,10 @@ const ( var GroupVersion = schema.GroupVersion{Group: groupName, Version: "v1alpha3"} -func AddToContainer(c *restful.Container, k8sClient kubernetes.Interface, monitoringClient monitoring.Interface, metricsClient monitoring.Interface, factory informers.InformerFactory, ksClient versioned.Interface, opOptions *openpitrixoptions.Options, rtClient runtimeclient.Client) error { +func AddToContainer(c *restful.Container, k8sClient kubernetes.Interface, monitoringClient monitoring.Interface, metricsClient monitoring.Interface, factory informers.InformerFactory, ksClient versioned.Interface, opOptions *openpitrixoptions.Options, rtClient runtimeclient.Client, stopCh <-chan struct{}) error { ws := runtime.NewWebService(GroupVersion) - h := NewHandler(k8sClient, monitoringClient, metricsClient, factory, ksClient, nil, nil, opOptions, rtClient) + h := NewHandler(k8sClient, monitoringClient, metricsClient, factory, ksClient, nil, nil, opOptions, rtClient, stopCh) ws.Route(ws.GET("/kubesphere"). To(h.handleKubeSphereMetricsQuery). diff --git a/pkg/kapis/openpitrix/v1/handler.go b/pkg/kapis/openpitrix/v1/handler.go index 5a9ef099b..d84c6c67f 100644 --- a/pkg/kapis/openpitrix/v1/handler.go +++ b/pkg/kapis/openpitrix/v1/handler.go @@ -50,7 +50,7 @@ type openpitrixHandler struct { openpitrix openpitrix.Interface } -func newOpenpitrixHandler(ksInformers informers.InformerFactory, ksClient versioned.Interface, option *openpitrixoptions.Options) *openpitrixHandler { +func newOpenpitrixHandler(ksInformers informers.InformerFactory, ksClient versioned.Interface, option *openpitrixoptions.Options, stopCh <-chan struct{}) *openpitrixHandler { var s3Client s3.Interface if option != nil && option.S3Options != nil && len(option.S3Options.Endpoint) != 0 { var err error @@ -61,7 +61,7 @@ func newOpenpitrixHandler(ksInformers informers.InformerFactory, ksClient versio } return &openpitrixHandler{ - openpitrix.NewOpenpitrixOperator(ksInformers, ksClient, s3Client), + openpitrix.NewOpenpitrixOperator(ksInformers, ksClient, s3Client, stopCh), } } diff --git a/pkg/kapis/openpitrix/v1/register.go b/pkg/kapis/openpitrix/v1/register.go index 067862267..9515a19c5 100644 --- a/pkg/kapis/openpitrix/v1/register.go +++ b/pkg/kapis/openpitrix/v1/register.go @@ -38,11 +38,11 @@ const ( var GroupVersion = schema.GroupVersion{Group: GroupName, Version: "v1"} -func AddToContainer(c *restful.Container, ksInfomrers informers.InformerFactory, ksClient versioned.Interface, options *openpitrixoptions.Options) error { +func AddToContainer(c *restful.Container, ksInfomrers informers.InformerFactory, ksClient versioned.Interface, options *openpitrixoptions.Options, stopCh <-chan struct{}) error { mimePatch := []string{restful.MIME_JSON, runtime.MimeJsonPatchJson, runtime.MimeMergePatchJson} webservice := runtime.NewWebService(GroupVersion) - handler := newOpenpitrixHandler(ksInfomrers, ksClient, options) + handler := newOpenpitrixHandler(ksInfomrers, ksClient, options, stopCh) webservice.Route(webservice.POST("/repos"). To(handler.CreateRepo). diff --git a/pkg/kapis/tenant/v1alpha2/handler.go b/pkg/kapis/tenant/v1alpha2/handler.go index 1b4c5fbc9..06ba37206 100644 --- a/pkg/kapis/tenant/v1alpha2/handler.go +++ b/pkg/kapis/tenant/v1alpha2/handler.go @@ -60,14 +60,14 @@ func newTenantHandler(factory informers.InformerFactory, k8sclient kubernetes.In evtsClient events.Client, loggingClient logging.Client, auditingclient auditing.Client, am am.AccessManagementInterface, authorizer authorizer.Authorizer, monitoringclient monitoringclient.Interface, resourceGetter *resourcev1alpha3.ResourceGetter, - meteringOptions *meteringclient.Options) *tenantHandler { + meteringOptions *meteringclient.Options, stopCh <-chan struct{}) *tenantHandler { if meteringOptions == nil || meteringOptions.RetentionDay == "" { meteringOptions = &meteringclient.DefaultMeteringOption } return &tenantHandler{ - tenant: tenant.New(factory, k8sclient, ksclient, evtsClient, loggingClient, auditingclient, am, authorizer, monitoringclient, resourceGetter), + tenant: tenant.New(factory, k8sclient, ksclient, evtsClient, loggingClient, auditingclient, am, authorizer, monitoringclient, resourceGetter, stopCh), meteringOptions: meteringOptions, } } diff --git a/pkg/kapis/tenant/v1alpha2/register.go b/pkg/kapis/tenant/v1alpha2/register.go index 14415c44a..ae1228cd5 100644 --- a/pkg/kapis/tenant/v1alpha2/register.go +++ b/pkg/kapis/tenant/v1alpha2/register.go @@ -66,11 +66,11 @@ func Resource(resource string) schema.GroupResource { func AddToContainer(c *restful.Container, factory informers.InformerFactory, k8sclient kubernetes.Interface, ksclient kubesphere.Interface, evtsClient events.Client, loggingClient logging.Client, auditingclient auditing.Client, am am.AccessManagementInterface, authorizer authorizer.Authorizer, - monitoringclient monitoringclient.Interface, cache cache.Cache, meteringOptions *meteringclient.Options) error { + monitoringclient monitoringclient.Interface, cache cache.Cache, meteringOptions *meteringclient.Options, stopCh <-chan struct{}) error { mimePatch := []string{restful.MIME_JSON, runtime.MimeMergePatchJson, runtime.MimeJsonPatchJson} ws := runtime.NewWebService(GroupVersion) - handler := newTenantHandler(factory, k8sclient, ksclient, evtsClient, loggingClient, auditingclient, am, authorizer, monitoringclient, resourcev1alpha3.NewResourceGetter(factory, cache), meteringOptions) + handler := newTenantHandler(factory, k8sclient, ksclient, evtsClient, loggingClient, auditingclient, am, authorizer, monitoringclient, resourcev1alpha3.NewResourceGetter(factory, cache), meteringOptions, stopCh) ws.Route(ws.GET("/clusters"). To(handler.ListClusters). diff --git a/pkg/models/openpitrix/interface.go b/pkg/models/openpitrix/interface.go index ba2229d79..516b82b0d 100644 --- a/pkg/models/openpitrix/interface.go +++ b/pkg/models/openpitrix/interface.go @@ -19,7 +19,6 @@ package openpitrix import ( "sync" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" "k8s.io/klog" @@ -55,7 +54,7 @@ func init() { cachedReposData = reposcache.NewReposCache() } -func NewOpenpitrixOperator(ksInformers ks_informers.InformerFactory, ksClient versioned.Interface, s3Client s3.Interface) Interface { +func NewOpenpitrixOperator(ksInformers ks_informers.InformerFactory, ksClient versioned.Interface, s3Client s3.Interface, stopCh <-chan struct{}) Interface { once.Do(func() { klog.Infof("start helm repo informer") helmReposInformer = ksInformers.KubeSphereSharedInformerFactory().Application().V1alpha1().HelmRepos().Informer() @@ -85,9 +84,6 @@ func NewOpenpitrixOperator(ksInformers ks_informers.InformerFactory, ksClient ve indexer := ctgInformer.GetIndexer() cachedReposData.SetCategoryIndexer(indexer) - - go ctgInformer.Run(wait.NeverStop) - go helmReposInformer.Run(wait.NeverStop) }) return &openpitrixOperator{ diff --git a/pkg/models/tenant/tenant.go b/pkg/models/tenant/tenant.go index 7c9bbaea1..fc4906ea4 100644 --- a/pkg/models/tenant/tenant.go +++ b/pkg/models/tenant/tenant.go @@ -113,10 +113,10 @@ type tenantOperator struct { opRelease openpitrix.ReleaseInterface } -func New(informers informers.InformerFactory, k8sclient kubernetes.Interface, ksclient kubesphere.Interface, evtsClient eventsclient.Client, loggingClient loggingclient.Client, auditingclient auditingclient.Client, am am.AccessManagementInterface, authorizer authorizer.Authorizer, monitoringclient monitoringclient.Interface, resourceGetter *resourcev1alpha3.ResourceGetter) Interface { +func New(informers informers.InformerFactory, k8sclient kubernetes.Interface, ksclient kubesphere.Interface, evtsClient eventsclient.Client, loggingClient loggingclient.Client, auditingclient auditingclient.Client, am am.AccessManagementInterface, authorizer authorizer.Authorizer, monitoringclient monitoringclient.Interface, resourceGetter *resourcev1alpha3.ResourceGetter, stopCh <-chan struct{}) Interface { var openpitrixRelease openpitrix.ReleaseInterface if ksclient != nil { - openpitrixRelease = openpitrix.NewOpenpitrixOperator(informers, ksclient, nil) + openpitrixRelease = openpitrix.NewOpenpitrixOperator(informers, ksclient, nil, stopCh) } return &tenantOperator{ diff --git a/pkg/models/tenant/tenent_test.go b/pkg/models/tenant/tenent_test.go index cfb5163fc..92f26a018 100644 --- a/pkg/models/tenant/tenent_test.go +++ b/pkg/models/tenant/tenent_test.go @@ -544,5 +544,5 @@ func prepare() Interface { amOperator := am.NewOperator(ksClient, k8sClient, fakeInformerFactory, nil) authorizer := rbac.NewRBACAuthorizer(amOperator) - return New(fakeInformerFactory, k8sClient, ksClient, nil, nil, nil, amOperator, authorizer, nil, nil) + return New(fakeInformerFactory, k8sClient, ksClient, nil, nil, nil, amOperator, authorizer, nil, nil, nil) } diff --git a/pkg/utils/metrics/metrics.go b/pkg/utils/metrics/metrics.go index 016398e44..a2f9c669e 100644 --- a/pkg/utils/metrics/metrics.go +++ b/pkg/utils/metrics/metrics.go @@ -19,6 +19,7 @@ package metrics import ( "net/http" + "sync" "github.com/emicklei/go-restful" "github.com/prometheus/client_golang/prometheus" @@ -30,6 +31,8 @@ import ( ) var ( + registerOnce sync.Once + Defaults DefaultMetrics defaultRegistry compbasemetrics.KubeRegistry // MustRegister registers registerable metrics but uses the defaultRegistry, panic upon the first registration that causes an error @@ -54,10 +57,13 @@ type DefaultMetrics struct{} // Install adds the DefaultMetrics handler func (m DefaultMetrics) Install(c *restful.Container) { + registerOnce.Do(m.registerMetrics) + c.Handle("/kapis/metrics", Handler()) +} + +func (m DefaultMetrics) registerMetrics() { RawMustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{})) RawMustRegister(prometheus.NewGoCollector()) - - c.Handle("/kapis/metrics", Handler()) } //Overwrite version.Get diff --git a/tools/cmd/doc-gen/main.go b/tools/cmd/doc-gen/main.go index 3d1744bf8..55b0e3d43 100644 --- a/tools/cmd/doc-gen/main.go +++ b/tools/cmd/doc-gen/main.go @@ -123,13 +123,13 @@ func generateSwaggerJson() []byte { urlruntime.Must(devopsv1alpha2.AddToContainer(container, "")) urlruntime.Must(devopsv1alpha3.AddToContainer(container, "")) urlruntime.Must(iamv1alpha2.AddToContainer(container, nil, nil, group.New(informerFactory, clientsets.KubeSphere(), clientsets.Kubernetes()), nil)) - urlruntime.Must(monitoringv1alpha3.AddToContainer(container, clientsets.Kubernetes(), nil, nil, informerFactory, nil, nil, nil)) - urlruntime.Must(openpitrixv1.AddToContainer(container, informerFactory, fake.NewSimpleClientset(), nil)) + urlruntime.Must(monitoringv1alpha3.AddToContainer(container, clientsets.Kubernetes(), nil, nil, informerFactory, nil, nil, nil, nil)) + urlruntime.Must(openpitrixv1.AddToContainer(container, informerFactory, fake.NewSimpleClientset(), nil, nil)) urlruntime.Must(openpitrixv2.AddToContainer(container, informerFactory, fake.NewSimpleClientset(), nil)) urlruntime.Must(operationsv1alpha2.AddToContainer(container, clientsets.Kubernetes())) urlruntime.Must(resourcesv1alpha2.AddToContainer(container, clientsets.Kubernetes(), informerFactory, "")) urlruntime.Must(resourcesv1alpha3.AddToContainer(container, informerFactory, nil)) - urlruntime.Must(tenantv1alpha2.AddToContainer(container, informerFactory, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)) + urlruntime.Must(tenantv1alpha2.AddToContainer(container, informerFactory, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)) urlruntime.Must(terminalv1alpha2.AddToContainer(container, clientsets.Kubernetes(), nil, nil, nil)) urlruntime.Must(metricsv1alpha2.AddToContainer(nil, container, clientsets.Kubernetes(), nil)) urlruntime.Must(networkv1alpha2.AddToContainer(container, ""))