@@ -220,6 +220,8 @@ func (s *KubeSphereControllerManagerOptions) bindLeaderElectionFlags(l *leaderel
|
|||||||
"of a leadership. This is only applicable if leader election is enabled.")
|
"of a leadership. This is only applicable if leader election is enabled.")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// MergeConfig merge new config without validation
|
||||||
|
// When misconfigured, the app should just crash directly
|
||||||
func (s *KubeSphereControllerManagerOptions) MergeConfig(cfg *controllerconfig.Config) {
|
func (s *KubeSphereControllerManagerOptions) MergeConfig(cfg *controllerconfig.Config) {
|
||||||
s.KubernetesOptions = cfg.KubernetesOptions
|
s.KubernetesOptions = cfg.KubernetesOptions
|
||||||
s.DevopsOptions = cfg.DevopsOptions
|
s.DevopsOptions = cfg.DevopsOptions
|
||||||
|
|||||||
@@ -48,7 +48,7 @@ import (
|
|||||||
|
|
||||||
func NewControllerManagerCommand() *cobra.Command {
|
func NewControllerManagerCommand() *cobra.Command {
|
||||||
s := options.NewKubeSphereControllerManagerOptions()
|
s := options.NewKubeSphereControllerManagerOptions()
|
||||||
conf, configCh, err := controllerconfig.TryLoadFromDisk()
|
conf, err := controllerconfig.TryLoadFromDisk()
|
||||||
if err == nil {
|
if err == nil {
|
||||||
// make sure LeaderElection is not nil
|
// make sure LeaderElection is not nil
|
||||||
s = &options.KubeSphereControllerManagerOptions{
|
s = &options.KubeSphereControllerManagerOptions{
|
||||||
@@ -78,8 +78,7 @@ func NewControllerManagerCommand() *cobra.Command {
|
|||||||
klog.Error(utilerrors.NewAggregate(errs))
|
klog.Error(utilerrors.NewAggregate(errs))
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
if err = Run(s, controllerconfig.WatchConfigChange(), signals.SetupSignalHandler()); err != nil {
|
||||||
if err = Run(s, configCh, signals.SetupSignalHandler()); err != nil {
|
|
||||||
klog.Error(err)
|
klog.Error(err)
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
@@ -115,31 +114,34 @@ func NewControllerManagerCommand() *cobra.Command {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func Run(s *options.KubeSphereControllerManagerOptions, configCh <-chan controllerconfig.Config, ctx context.Context) error {
|
func Run(s *options.KubeSphereControllerManagerOptions, configCh <-chan controllerconfig.Config, ctx context.Context) error {
|
||||||
ictx := controllerconfig.Context{}
|
ictx, cancelFunc := context.WithCancel(context.TODO())
|
||||||
ictx.RenewContext(context.TODO())
|
|
||||||
errCh := make(chan error)
|
errCh := make(chan error)
|
||||||
defer close(errCh)
|
defer close(errCh)
|
||||||
go func() {
|
go func() {
|
||||||
if err := run(s, ictx.GetContext()); err != nil {
|
if err := run(s, ictx); err != nil {
|
||||||
errCh <- err
|
errCh <- err
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
// The ctx (signals.SetupSignalHandler()) is to control the entire program life cycle,
|
||||||
|
// The ictx(internal context) is created here to control the life cycle of the controller-manager(all controllers, sharedInformer, webhook etc.)
|
||||||
|
// when config changed, stop server and renew context, start new server
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
ictx.CancelContext()
|
cancelFunc()
|
||||||
return nil
|
return nil
|
||||||
case cfg := <-configCh:
|
case cfg := <-configCh:
|
||||||
ictx.CancelContext()
|
cancelFunc()
|
||||||
s.MergeConfig(&cfg)
|
s.MergeConfig(&cfg)
|
||||||
ictx.RenewContext(context.TODO())
|
ictx, cancelFunc = context.WithCancel(context.TODO())
|
||||||
go func() {
|
go func() {
|
||||||
if err := run(s, ictx.GetContext()); err != nil {
|
if err := run(s, ictx); err != nil {
|
||||||
errCh <- err
|
errCh <- err
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
case err := <-errCh:
|
case err := <-errCh:
|
||||||
|
cancelFunc()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -38,7 +38,7 @@ func NewAPIServerCommand() *cobra.Command {
|
|||||||
s := options.NewServerRunOptions()
|
s := options.NewServerRunOptions()
|
||||||
|
|
||||||
// Load configuration from file
|
// Load configuration from file
|
||||||
conf, configCh, err := apiserverconfig.TryLoadFromDisk()
|
conf, err := apiserverconfig.TryLoadFromDisk()
|
||||||
if err == nil {
|
if err == nil {
|
||||||
s = &options.ServerRunOptions{
|
s = &options.ServerRunOptions{
|
||||||
GenericServerRunOptions: s.GenericServerRunOptions,
|
GenericServerRunOptions: s.GenericServerRunOptions,
|
||||||
@@ -57,7 +57,7 @@ cluster's shared state through which all other components interact.`,
|
|||||||
if errs := s.Validate(); len(errs) != 0 {
|
if errs := s.Validate(); len(errs) != 0 {
|
||||||
return utilerrors.NewAggregate(errs)
|
return utilerrors.NewAggregate(errs)
|
||||||
}
|
}
|
||||||
return Run(s, configCh, signals.SetupSignalHandler())
|
return Run(s, apiserverconfig.WatchConfigChange(), signals.SetupSignalHandler())
|
||||||
},
|
},
|
||||||
SilenceUsage: true,
|
SilenceUsage: true,
|
||||||
}
|
}
|
||||||
@@ -89,31 +89,34 @@ cluster's shared state through which all other components interact.`,
|
|||||||
}
|
}
|
||||||
|
|
||||||
func Run(s *options.ServerRunOptions, configCh <-chan apiserverconfig.Config, ctx context.Context) error {
|
func Run(s *options.ServerRunOptions, configCh <-chan apiserverconfig.Config, ctx context.Context) error {
|
||||||
ictx := apiserverconfig.Context{}
|
ictx, cancelFunc := context.WithCancel(context.TODO())
|
||||||
ictx.RenewContext(context.TODO())
|
|
||||||
errCh := make(chan error)
|
errCh := make(chan error)
|
||||||
defer close(errCh)
|
defer close(errCh)
|
||||||
go func() {
|
go func() {
|
||||||
if err := run(s, ictx.GetContext()); err != nil {
|
if err := run(s, ictx); err != nil {
|
||||||
errCh <- err
|
errCh <- err
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
// The ctx (signals.SetupSignalHandler()) is to control the entire program life cycle,
|
||||||
|
// The ictx(internal context) is created here to control the life cycle of the ks-apiserver(http server, sharedInformer etc.)
|
||||||
|
// when config change, stop server and renew context, start new server
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
ictx.CancelContext()
|
cancelFunc()
|
||||||
return nil
|
return nil
|
||||||
case cfg := <-configCh:
|
case cfg := <-configCh:
|
||||||
ictx.CancelContext()
|
cancelFunc()
|
||||||
s.Config = &cfg
|
s.Config = &cfg
|
||||||
ictx.RenewContext(context.TODO())
|
ictx, cancelFunc = context.WithCancel(context.TODO())
|
||||||
go func() {
|
go func() {
|
||||||
if err := run(s, ictx.GetContext()); err != nil {
|
if err := run(s, ictx); err != nil {
|
||||||
errCh <- err
|
errCh <- err
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
case err := <-errCh:
|
case err := <-errCh:
|
||||||
|
cancelFunc()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -17,7 +17,6 @@ limitations under the License.
|
|||||||
package config
|
package config
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"reflect"
|
"reflect"
|
||||||
"strings"
|
"strings"
|
||||||
@@ -86,8 +85,8 @@ import (
|
|||||||
// will be ignored.
|
// will be ignored.
|
||||||
|
|
||||||
var (
|
var (
|
||||||
once sync.Once
|
// singleton instance of config package
|
||||||
cfgChangeCh = make(chan Config)
|
_config = defaultConfig()
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@@ -98,6 +97,61 @@ const (
|
|||||||
defaultConfigurationPath = "/etc/kubesphere"
|
defaultConfigurationPath = "/etc/kubesphere"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type config struct {
|
||||||
|
cfg *Config
|
||||||
|
cfgChangeCh chan Config
|
||||||
|
watchOnce sync.Once
|
||||||
|
loadOnce sync.Once
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *config) watchConfig() <-chan Config {
|
||||||
|
c.watchOnce.Do(func() {
|
||||||
|
viper.WatchConfig()
|
||||||
|
viper.OnConfigChange(func(in fsnotify.Event) {
|
||||||
|
cfg := New()
|
||||||
|
if err := viper.Unmarshal(cfg); err != nil {
|
||||||
|
klog.Warning("config reload error", err)
|
||||||
|
} else {
|
||||||
|
c.cfgChangeCh <- *cfg
|
||||||
|
}
|
||||||
|
})
|
||||||
|
})
|
||||||
|
return c.cfgChangeCh
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *config) loadFromDisk() (*Config, error) {
|
||||||
|
var err error
|
||||||
|
c.loadOnce.Do(func() {
|
||||||
|
if err = viper.ReadInConfig(); err != nil {
|
||||||
|
if _, ok := err.(viper.ConfigFileNotFoundError); !ok {
|
||||||
|
err = fmt.Errorf("error parsing configuration file %s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
err = viper.Unmarshal(c.cfg)
|
||||||
|
})
|
||||||
|
return c.cfg, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func defaultConfig() *config {
|
||||||
|
viper.SetConfigName(defaultConfigurationName)
|
||||||
|
viper.AddConfigPath(defaultConfigurationPath)
|
||||||
|
|
||||||
|
// Load from current working directory, only used for debugging
|
||||||
|
viper.AddConfigPath(".")
|
||||||
|
|
||||||
|
// Load from Environment variables
|
||||||
|
viper.SetEnvPrefix("kubesphere")
|
||||||
|
viper.AutomaticEnv()
|
||||||
|
viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
|
||||||
|
|
||||||
|
return &config{
|
||||||
|
cfg: New(),
|
||||||
|
cfgChangeCh: make(chan Config),
|
||||||
|
watchOnce: sync.Once{},
|
||||||
|
loadOnce: sync.Once{},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Config defines everything needed for apiserver to deal with external services
|
// Config defines everything needed for apiserver to deal with external services
|
||||||
type Config struct {
|
type Config struct {
|
||||||
DevopsOptions *jenkins.Options `json:"devops,omitempty" yaml:"devops,omitempty" mapstructure:"devops"`
|
DevopsOptions *jenkins.Options `json:"devops,omitempty" yaml:"devops,omitempty" mapstructure:"devops"`
|
||||||
@@ -158,44 +212,13 @@ func New() *Config {
|
|||||||
|
|
||||||
// TryLoadFromDisk loads configuration from default location after server startup
|
// TryLoadFromDisk loads configuration from default location after server startup
|
||||||
// return nil error if configuration file not exists
|
// return nil error if configuration file not exists
|
||||||
func TryLoadFromDisk() (*Config, <-chan Config, error) {
|
func TryLoadFromDisk() (*Config, error) {
|
||||||
once.Do(func() {
|
return _config.loadFromDisk()
|
||||||
viper.SetConfigName(defaultConfigurationName)
|
}
|
||||||
viper.AddConfigPath(defaultConfigurationPath)
|
|
||||||
|
|
||||||
// Load from current working directory, only used for debugging
|
// WatchConfigChange return config change channel
|
||||||
viper.AddConfigPath(".")
|
func WatchConfigChange() <-chan Config {
|
||||||
|
return _config.watchConfig()
|
||||||
// Load from Environment variables
|
|
||||||
viper.SetEnvPrefix("kubesphere")
|
|
||||||
viper.AutomaticEnv()
|
|
||||||
viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
|
|
||||||
|
|
||||||
viper.WatchConfig()
|
|
||||||
viper.OnConfigChange(func(in fsnotify.Event) {
|
|
||||||
cfg := New()
|
|
||||||
if err := viper.Unmarshal(cfg); err != nil {
|
|
||||||
klog.Warning("config reload error", err)
|
|
||||||
} else {
|
|
||||||
cfgChangeCh <- *cfg
|
|
||||||
}
|
|
||||||
})
|
|
||||||
})
|
|
||||||
|
|
||||||
conf := New()
|
|
||||||
if err := viper.ReadInConfig(); err != nil {
|
|
||||||
if _, ok := err.(viper.ConfigFileNotFoundError); ok {
|
|
||||||
return nil, cfgChangeCh, err
|
|
||||||
} else {
|
|
||||||
return nil, cfgChangeCh, fmt.Errorf("error parsing configuration file %s", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := viper.Unmarshal(conf); err != nil {
|
|
||||||
return nil, cfgChangeCh, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return conf, cfgChangeCh, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// convertToMap simply converts config to map[string]bool
|
// convertToMap simply converts config to map[string]bool
|
||||||
@@ -340,25 +363,3 @@ func (conf *Config) stripEmptyOptions() {
|
|||||||
conf.GPUOptions = nil
|
conf.GPUOptions = nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type Context struct {
|
|
||||||
internalCtx context.Context
|
|
||||||
internalCtxCancel context.CancelFunc
|
|
||||||
mu sync.RWMutex
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Context) GetContext() context.Context {
|
|
||||||
c.mu.RLock()
|
|
||||||
defer c.mu.RUnlock()
|
|
||||||
return c.internalCtx
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Context) RenewContext(ctx context.Context) {
|
|
||||||
c.mu.Lock()
|
|
||||||
defer c.mu.Unlock()
|
|
||||||
c.internalCtx, c.internalCtxCancel = context.WithCancel(ctx)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Context) CancelContext() {
|
|
||||||
c.internalCtxCancel()
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -240,7 +240,7 @@ func TestGet(t *testing.T) {
|
|||||||
conf.RedisOptions.Password = "P@88w0rd"
|
conf.RedisOptions.Password = "P@88w0rd"
|
||||||
os.Setenv("KUBESPHERE_REDIS_PASSWORD", "P@88w0rd")
|
os.Setenv("KUBESPHERE_REDIS_PASSWORD", "P@88w0rd")
|
||||||
|
|
||||||
conf2, _, err := TryLoadFromDisk()
|
conf2, err := TryLoadFromDisk()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user