add feature for live-reload when configuration changed
Signed-off-by: x893675 <x893675@icloud.com>
This commit is contained in:
@@ -22,6 +22,8 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
controllerconfig "kubesphere.io/kubesphere/pkg/apiserver/config"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
|
||||
"kubesphere.io/kubesphere/pkg/apiserver/authentication"
|
||||
@@ -217,3 +219,16 @@ func (s *KubeSphereControllerManagerOptions) bindLeaderElectionFlags(l *leaderel
|
||||
"The duration the clients should wait between attempting acquisition and renewal "+
|
||||
"of a leadership. This is only applicable if leader election is enabled.")
|
||||
}
|
||||
|
||||
func (s *KubeSphereControllerManagerOptions) MergeConfig(cfg *controllerconfig.Config) {
|
||||
s.KubernetesOptions = cfg.KubernetesOptions
|
||||
s.DevopsOptions = cfg.DevopsOptions
|
||||
s.S3Options = cfg.S3Options
|
||||
s.AuthenticationOptions = cfg.AuthenticationOptions
|
||||
s.LdapOptions = cfg.LdapOptions
|
||||
s.OpenPitrixOptions = cfg.OpenPitrixOptions
|
||||
s.NetworkOptions = cfg.NetworkOptions
|
||||
s.MultiClusterOptions = cfg.MultiClusterOptions
|
||||
s.ServiceMeshOptions = cfg.ServiceMeshOptions
|
||||
s.GatewayOptions = cfg.GatewayOptions
|
||||
}
|
||||
|
||||
@@ -48,7 +48,7 @@ import (
|
||||
|
||||
func NewControllerManagerCommand() *cobra.Command {
|
||||
s := options.NewKubeSphereControllerManagerOptions()
|
||||
conf, err := controllerconfig.TryLoadFromDisk()
|
||||
conf, configCh, err := controllerconfig.TryLoadFromDisk()
|
||||
if err == nil {
|
||||
// make sure LeaderElection is not nil
|
||||
s = &options.KubeSphereControllerManagerOptions{
|
||||
@@ -79,7 +79,7 @@ func NewControllerManagerCommand() *cobra.Command {
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
if err = run(s, signals.SetupSignalHandler()); err != nil {
|
||||
if err = Run(s, configCh, signals.SetupSignalHandler()); err != nil {
|
||||
klog.Error(err)
|
||||
os.Exit(1)
|
||||
}
|
||||
@@ -114,6 +114,37 @@ func NewControllerManagerCommand() *cobra.Command {
|
||||
return cmd
|
||||
}
|
||||
|
||||
func Run(s *options.KubeSphereControllerManagerOptions, configCh <-chan controllerconfig.Config, ctx context.Context) error {
|
||||
ictx := controllerconfig.Context{}
|
||||
ictx.RenewContext(context.TODO())
|
||||
errCh := make(chan error)
|
||||
defer close(errCh)
|
||||
go func() {
|
||||
if err := run(s, ictx.GetContext()); err != nil {
|
||||
errCh <- err
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
ictx.CancelContext()
|
||||
return nil
|
||||
case cfg := <-configCh:
|
||||
ictx.CancelContext()
|
||||
s.MergeConfig(&cfg)
|
||||
ictx.RenewContext(context.TODO())
|
||||
go func() {
|
||||
if err := run(s, ictx.GetContext()); err != nil {
|
||||
errCh <- err
|
||||
}
|
||||
}()
|
||||
case err := <-errCh:
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func run(s *options.KubeSphereControllerManagerOptions, ctx context.Context) error {
|
||||
|
||||
kubernetesClient, err := k8s.NewKubernetesClient(s.KubernetesOptions)
|
||||
|
||||
@@ -19,6 +19,7 @@ package app
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
utilerrors "k8s.io/apimachinery/pkg/util/errors"
|
||||
@@ -37,7 +38,7 @@ func NewAPIServerCommand() *cobra.Command {
|
||||
s := options.NewServerRunOptions()
|
||||
|
||||
// Load configuration from file
|
||||
conf, err := apiserverconfig.TryLoadFromDisk()
|
||||
conf, configCh, err := apiserverconfig.TryLoadFromDisk()
|
||||
if err == nil {
|
||||
s = &options.ServerRunOptions{
|
||||
GenericServerRunOptions: s.GenericServerRunOptions,
|
||||
@@ -56,8 +57,7 @@ cluster's shared state through which all other components interact.`,
|
||||
if errs := s.Validate(); len(errs) != 0 {
|
||||
return utilerrors.NewAggregate(errs)
|
||||
}
|
||||
|
||||
return Run(s, signals.SetupSignalHandler())
|
||||
return Run(s, configCh, signals.SetupSignalHandler())
|
||||
},
|
||||
SilenceUsage: true,
|
||||
}
|
||||
@@ -88,8 +88,38 @@ cluster's shared state through which all other components interact.`,
|
||||
return cmd
|
||||
}
|
||||
|
||||
func Run(s *options.ServerRunOptions, ctx context.Context) error {
|
||||
func Run(s *options.ServerRunOptions, configCh <-chan apiserverconfig.Config, ctx context.Context) error {
|
||||
ictx := apiserverconfig.Context{}
|
||||
ictx.RenewContext(context.TODO())
|
||||
errCh := make(chan error)
|
||||
defer close(errCh)
|
||||
go func() {
|
||||
if err := run(s, ictx.GetContext()); err != nil {
|
||||
errCh <- err
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
ictx.CancelContext()
|
||||
return nil
|
||||
case cfg := <-configCh:
|
||||
ictx.CancelContext()
|
||||
s.Config = &cfg
|
||||
ictx.RenewContext(context.TODO())
|
||||
go func() {
|
||||
if err := run(s, ictx.GetContext()); err != nil {
|
||||
errCh <- err
|
||||
}
|
||||
}()
|
||||
case err := <-errCh:
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func run(s *options.ServerRunOptions, ctx context.Context) error {
|
||||
apiserver, err := s.NewAPIServer(ctx.Done())
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -100,5 +130,9 @@ func Run(s *options.ServerRunOptions, ctx context.Context) error {
|
||||
return err
|
||||
}
|
||||
|
||||
return apiserver.Run(ctx)
|
||||
err = apiserver.Run(ctx)
|
||||
if err == http.ErrServerClosed {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user