From f5bcbda0c9161490f698bafbd5ca5e2fb920fb63 Mon Sep 17 00:00:00 2001 From: x893675 Date: Tue, 22 Feb 2022 10:20:11 +0800 Subject: [PATCH] optimize config package Signed-off-by: x893675 --- cmd/controller-manager/app/options/options.go | 2 + cmd/controller-manager/app/server.go | 22 +-- cmd/ks-apiserver/app/server.go | 21 +-- pkg/apiserver/config/config.go | 125 +++++++++--------- pkg/apiserver/config/config_test.go | 2 +- 5 files changed, 90 insertions(+), 82 deletions(-) diff --git a/cmd/controller-manager/app/options/options.go b/cmd/controller-manager/app/options/options.go index eacaa3506..a22b151ac 100644 --- a/cmd/controller-manager/app/options/options.go +++ b/cmd/controller-manager/app/options/options.go @@ -220,6 +220,8 @@ func (s *KubeSphereControllerManagerOptions) bindLeaderElectionFlags(l *leaderel "of a leadership. This is only applicable if leader election is enabled.") } +// MergeConfig merge new config without validation +// When misconfigured, the app should just crash directly func (s *KubeSphereControllerManagerOptions) MergeConfig(cfg *controllerconfig.Config) { s.KubernetesOptions = cfg.KubernetesOptions s.DevopsOptions = cfg.DevopsOptions diff --git a/cmd/controller-manager/app/server.go b/cmd/controller-manager/app/server.go index 530bf9851..717bd3365 100644 --- a/cmd/controller-manager/app/server.go +++ b/cmd/controller-manager/app/server.go @@ -48,7 +48,7 @@ import ( func NewControllerManagerCommand() *cobra.Command { s := options.NewKubeSphereControllerManagerOptions() - conf, configCh, err := controllerconfig.TryLoadFromDisk() + conf, err := controllerconfig.TryLoadFromDisk() if err == nil { // make sure LeaderElection is not nil s = &options.KubeSphereControllerManagerOptions{ @@ -78,8 +78,7 @@ func NewControllerManagerCommand() *cobra.Command { klog.Error(utilerrors.NewAggregate(errs)) os.Exit(1) } - - if err = Run(s, configCh, signals.SetupSignalHandler()); err != nil { + if err = Run(s, controllerconfig.WatchConfigChange(), signals.SetupSignalHandler()); err != nil { klog.Error(err) os.Exit(1) } @@ -115,31 +114,34 @@ func NewControllerManagerCommand() *cobra.Command { } func Run(s *options.KubeSphereControllerManagerOptions, configCh <-chan controllerconfig.Config, ctx context.Context) error { - ictx := controllerconfig.Context{} - ictx.RenewContext(context.TODO()) + ictx, cancelFunc := context.WithCancel(context.TODO()) errCh := make(chan error) defer close(errCh) go func() { - if err := run(s, ictx.GetContext()); err != nil { + if err := run(s, ictx); err != nil { errCh <- err } }() + // The ctx (signals.SetupSignalHandler()) is to control the entire program life cycle, + // The ictx(internal context) is created here to control the life cycle of the controller-manager(all controllers, sharedInformer, webhook etc.) + // when config changed, stop server and renew context, start new server for { select { case <-ctx.Done(): - ictx.CancelContext() + cancelFunc() return nil case cfg := <-configCh: - ictx.CancelContext() + cancelFunc() s.MergeConfig(&cfg) - ictx.RenewContext(context.TODO()) + ictx, cancelFunc = context.WithCancel(context.TODO()) go func() { - if err := run(s, ictx.GetContext()); err != nil { + if err := run(s, ictx); err != nil { errCh <- err } }() case err := <-errCh: + cancelFunc() return err } } diff --git a/cmd/ks-apiserver/app/server.go b/cmd/ks-apiserver/app/server.go index 6449f3943..05d6b30da 100644 --- a/cmd/ks-apiserver/app/server.go +++ b/cmd/ks-apiserver/app/server.go @@ -38,7 +38,7 @@ func NewAPIServerCommand() *cobra.Command { s := options.NewServerRunOptions() // Load configuration from file - conf, configCh, err := apiserverconfig.TryLoadFromDisk() + conf, err := apiserverconfig.TryLoadFromDisk() if err == nil { s = &options.ServerRunOptions{ GenericServerRunOptions: s.GenericServerRunOptions, @@ -57,7 +57,7 @@ cluster's shared state through which all other components interact.`, if errs := s.Validate(); len(errs) != 0 { return utilerrors.NewAggregate(errs) } - return Run(s, configCh, signals.SetupSignalHandler()) + return Run(s, apiserverconfig.WatchConfigChange(), signals.SetupSignalHandler()) }, SilenceUsage: true, } @@ -89,31 +89,34 @@ cluster's shared state through which all other components interact.`, } func Run(s *options.ServerRunOptions, configCh <-chan apiserverconfig.Config, ctx context.Context) error { - ictx := apiserverconfig.Context{} - ictx.RenewContext(context.TODO()) + ictx, cancelFunc := context.WithCancel(context.TODO()) errCh := make(chan error) defer close(errCh) go func() { - if err := run(s, ictx.GetContext()); err != nil { + if err := run(s, ictx); err != nil { errCh <- err } }() + // The ctx (signals.SetupSignalHandler()) is to control the entire program life cycle, + // The ictx(internal context) is created here to control the life cycle of the ks-apiserver(http server, sharedInformer etc.) + // when config change, stop server and renew context, start new server for { select { case <-ctx.Done(): - ictx.CancelContext() + cancelFunc() return nil case cfg := <-configCh: - ictx.CancelContext() + cancelFunc() s.Config = &cfg - ictx.RenewContext(context.TODO()) + ictx, cancelFunc = context.WithCancel(context.TODO()) go func() { - if err := run(s, ictx.GetContext()); err != nil { + if err := run(s, ictx); err != nil { errCh <- err } }() case err := <-errCh: + cancelFunc() return err } } diff --git a/pkg/apiserver/config/config.go b/pkg/apiserver/config/config.go index 5cd61ef0d..780f153c9 100644 --- a/pkg/apiserver/config/config.go +++ b/pkg/apiserver/config/config.go @@ -17,7 +17,6 @@ limitations under the License. package config import ( - "context" "fmt" "reflect" "strings" @@ -86,8 +85,8 @@ import ( // will be ignored. var ( - once sync.Once - cfgChangeCh = make(chan Config) + // singleton instance of config package + _config = defaultConfig() ) const ( @@ -98,6 +97,61 @@ const ( defaultConfigurationPath = "/etc/kubesphere" ) +type config struct { + cfg *Config + cfgChangeCh chan Config + watchOnce sync.Once + loadOnce sync.Once +} + +func (c *config) watchConfig() <-chan Config { + c.watchOnce.Do(func() { + viper.WatchConfig() + viper.OnConfigChange(func(in fsnotify.Event) { + cfg := New() + if err := viper.Unmarshal(cfg); err != nil { + klog.Warning("config reload error", err) + } else { + c.cfgChangeCh <- *cfg + } + }) + }) + return c.cfgChangeCh +} + +func (c *config) loadFromDisk() (*Config, error) { + var err error + c.loadOnce.Do(func() { + if err = viper.ReadInConfig(); err != nil { + if _, ok := err.(viper.ConfigFileNotFoundError); !ok { + err = fmt.Errorf("error parsing configuration file %s", err) + } + } + err = viper.Unmarshal(c.cfg) + }) + return c.cfg, err +} + +func defaultConfig() *config { + viper.SetConfigName(defaultConfigurationName) + viper.AddConfigPath(defaultConfigurationPath) + + // Load from current working directory, only used for debugging + viper.AddConfigPath(".") + + // Load from Environment variables + viper.SetEnvPrefix("kubesphere") + viper.AutomaticEnv() + viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) + + return &config{ + cfg: New(), + cfgChangeCh: make(chan Config), + watchOnce: sync.Once{}, + loadOnce: sync.Once{}, + } +} + // Config defines everything needed for apiserver to deal with external services type Config struct { DevopsOptions *jenkins.Options `json:"devops,omitempty" yaml:"devops,omitempty" mapstructure:"devops"` @@ -158,44 +212,13 @@ func New() *Config { // TryLoadFromDisk loads configuration from default location after server startup // return nil error if configuration file not exists -func TryLoadFromDisk() (*Config, <-chan Config, error) { - once.Do(func() { - viper.SetConfigName(defaultConfigurationName) - viper.AddConfigPath(defaultConfigurationPath) +func TryLoadFromDisk() (*Config, error) { + return _config.loadFromDisk() +} - // Load from current working directory, only used for debugging - viper.AddConfigPath(".") - - // Load from Environment variables - viper.SetEnvPrefix("kubesphere") - viper.AutomaticEnv() - viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) - - viper.WatchConfig() - viper.OnConfigChange(func(in fsnotify.Event) { - cfg := New() - if err := viper.Unmarshal(cfg); err != nil { - klog.Warning("config reload error", err) - } else { - cfgChangeCh <- *cfg - } - }) - }) - - conf := New() - if err := viper.ReadInConfig(); err != nil { - if _, ok := err.(viper.ConfigFileNotFoundError); ok { - return nil, cfgChangeCh, err - } else { - return nil, cfgChangeCh, fmt.Errorf("error parsing configuration file %s", err) - } - } - - if err := viper.Unmarshal(conf); err != nil { - return nil, cfgChangeCh, err - } - - return conf, cfgChangeCh, nil +// WatchConfigChange return config change channel +func WatchConfigChange() <-chan Config { + return _config.watchConfig() } // convertToMap simply converts config to map[string]bool @@ -340,25 +363,3 @@ func (conf *Config) stripEmptyOptions() { conf.GPUOptions = nil } } - -type Context struct { - internalCtx context.Context - internalCtxCancel context.CancelFunc - mu sync.RWMutex -} - -func (c *Context) GetContext() context.Context { - c.mu.RLock() - defer c.mu.RUnlock() - return c.internalCtx -} - -func (c *Context) RenewContext(ctx context.Context) { - c.mu.Lock() - defer c.mu.Unlock() - c.internalCtx, c.internalCtxCancel = context.WithCancel(ctx) -} - -func (c *Context) CancelContext() { - c.internalCtxCancel() -} diff --git a/pkg/apiserver/config/config_test.go b/pkg/apiserver/config/config_test.go index 67206d5f0..24d61039c 100644 --- a/pkg/apiserver/config/config_test.go +++ b/pkg/apiserver/config/config_test.go @@ -240,7 +240,7 @@ func TestGet(t *testing.T) { conf.RedisOptions.Password = "P@88w0rd" os.Setenv("KUBESPHERE_REDIS_PASSWORD", "P@88w0rd") - conf2, _, err := TryLoadFromDisk() + conf2, err := TryLoadFromDisk() if err != nil { t.Fatal(err) }