Merge pull request #4659 from x893675/master
feat: live-reload when configuration changed
This commit is contained in:
@@ -22,6 +22,8 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
controllerconfig "kubesphere.io/kubesphere/pkg/apiserver/config"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
|
||||
"kubesphere.io/kubesphere/pkg/apiserver/authentication"
|
||||
@@ -217,3 +219,18 @@ func (s *KubeSphereControllerManagerOptions) bindLeaderElectionFlags(l *leaderel
|
||||
"The duration the clients should wait between attempting acquisition and renewal "+
|
||||
"of a leadership. This is only applicable if leader election is enabled.")
|
||||
}
|
||||
|
||||
// MergeConfig merge new config without validation
|
||||
// When misconfigured, the app should just crash directly
|
||||
func (s *KubeSphereControllerManagerOptions) MergeConfig(cfg *controllerconfig.Config) {
|
||||
s.KubernetesOptions = cfg.KubernetesOptions
|
||||
s.DevopsOptions = cfg.DevopsOptions
|
||||
s.S3Options = cfg.S3Options
|
||||
s.AuthenticationOptions = cfg.AuthenticationOptions
|
||||
s.LdapOptions = cfg.LdapOptions
|
||||
s.OpenPitrixOptions = cfg.OpenPitrixOptions
|
||||
s.NetworkOptions = cfg.NetworkOptions
|
||||
s.MultiClusterOptions = cfg.MultiClusterOptions
|
||||
s.ServiceMeshOptions = cfg.ServiceMeshOptions
|
||||
s.GatewayOptions = cfg.GatewayOptions
|
||||
}
|
||||
|
||||
@@ -78,8 +78,7 @@ func NewControllerManagerCommand() *cobra.Command {
|
||||
klog.Error(utilerrors.NewAggregate(errs))
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
if err = run(s, signals.SetupSignalHandler()); err != nil {
|
||||
if err = Run(s, controllerconfig.WatchConfigChange(), signals.SetupSignalHandler()); err != nil {
|
||||
klog.Error(err)
|
||||
os.Exit(1)
|
||||
}
|
||||
@@ -114,6 +113,40 @@ func NewControllerManagerCommand() *cobra.Command {
|
||||
return cmd
|
||||
}
|
||||
|
||||
func Run(s *options.KubeSphereControllerManagerOptions, configCh <-chan controllerconfig.Config, ctx context.Context) error {
|
||||
ictx, cancelFunc := context.WithCancel(context.TODO())
|
||||
errCh := make(chan error)
|
||||
defer close(errCh)
|
||||
go func() {
|
||||
if err := run(s, ictx); err != nil {
|
||||
errCh <- err
|
||||
}
|
||||
}()
|
||||
|
||||
// The ctx (signals.SetupSignalHandler()) is to control the entire program life cycle,
|
||||
// The ictx(internal context) is created here to control the life cycle of the controller-manager(all controllers, sharedInformer, webhook etc.)
|
||||
// when config changed, stop server and renew context, start new server
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
cancelFunc()
|
||||
return nil
|
||||
case cfg := <-configCh:
|
||||
cancelFunc()
|
||||
s.MergeConfig(&cfg)
|
||||
ictx, cancelFunc = context.WithCancel(context.TODO())
|
||||
go func() {
|
||||
if err := run(s, ictx); err != nil {
|
||||
errCh <- err
|
||||
}
|
||||
}()
|
||||
case err := <-errCh:
|
||||
cancelFunc()
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func run(s *options.KubeSphereControllerManagerOptions, ctx context.Context) error {
|
||||
|
||||
kubernetesClient, err := k8s.NewKubernetesClient(s.KubernetesOptions)
|
||||
|
||||
@@ -19,6 +19,7 @@ package app
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
utilerrors "k8s.io/apimachinery/pkg/util/errors"
|
||||
@@ -56,8 +57,7 @@ cluster's shared state through which all other components interact.`,
|
||||
if errs := s.Validate(); len(errs) != 0 {
|
||||
return utilerrors.NewAggregate(errs)
|
||||
}
|
||||
|
||||
return Run(s, signals.SetupSignalHandler())
|
||||
return Run(s, apiserverconfig.WatchConfigChange(), signals.SetupSignalHandler())
|
||||
},
|
||||
SilenceUsage: true,
|
||||
}
|
||||
@@ -88,8 +88,41 @@ cluster's shared state through which all other components interact.`,
|
||||
return cmd
|
||||
}
|
||||
|
||||
func Run(s *options.ServerRunOptions, ctx context.Context) error {
|
||||
func Run(s *options.ServerRunOptions, configCh <-chan apiserverconfig.Config, ctx context.Context) error {
|
||||
ictx, cancelFunc := context.WithCancel(context.TODO())
|
||||
errCh := make(chan error)
|
||||
defer close(errCh)
|
||||
go func() {
|
||||
if err := run(s, ictx); err != nil {
|
||||
errCh <- err
|
||||
}
|
||||
}()
|
||||
|
||||
// The ctx (signals.SetupSignalHandler()) is to control the entire program life cycle,
|
||||
// The ictx(internal context) is created here to control the life cycle of the ks-apiserver(http server, sharedInformer etc.)
|
||||
// when config change, stop server and renew context, start new server
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
cancelFunc()
|
||||
return nil
|
||||
case cfg := <-configCh:
|
||||
cancelFunc()
|
||||
s.Config = &cfg
|
||||
ictx, cancelFunc = context.WithCancel(context.TODO())
|
||||
go func() {
|
||||
if err := run(s, ictx); err != nil {
|
||||
errCh <- err
|
||||
}
|
||||
}()
|
||||
case err := <-errCh:
|
||||
cancelFunc()
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func run(s *options.ServerRunOptions, ctx context.Context) error {
|
||||
apiserver, err := s.NewAPIServer(ctx.Done())
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -100,5 +133,9 @@ func Run(s *options.ServerRunOptions, ctx context.Context) error {
|
||||
return err
|
||||
}
|
||||
|
||||
return apiserver.Run(ctx)
|
||||
err = apiserver.Run(ctx)
|
||||
if err == http.ErrServerClosed {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -23,6 +23,7 @@ import (
|
||||
"net/http"
|
||||
rt "runtime"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/emicklei/go-restful"
|
||||
@@ -107,6 +108,8 @@ import (
|
||||
"kubesphere.io/kubesphere/pkg/utils/metrics"
|
||||
)
|
||||
|
||||
var initMetrics sync.Once
|
||||
|
||||
type APIServer struct {
|
||||
// number of kubesphere apiserver
|
||||
ServerCount int
|
||||
@@ -165,7 +168,7 @@ func (s *APIServer) PrepareRun(stopCh <-chan struct{}) error {
|
||||
logStackOnRecover(panicReason, httpWriter)
|
||||
})
|
||||
|
||||
s.installKubeSphereAPIs()
|
||||
s.installKubeSphereAPIs(stopCh)
|
||||
s.installCRDAPIs()
|
||||
s.installMetricsAPI()
|
||||
s.container.Filter(monitorRequest)
|
||||
@@ -193,14 +196,14 @@ func monitorRequest(r *restful.Request, response *restful.Response, chain *restf
|
||||
}
|
||||
|
||||
func (s *APIServer) installMetricsAPI() {
|
||||
registerMetrics()
|
||||
initMetrics.Do(registerMetrics)
|
||||
metrics.Defaults.Install(s.container)
|
||||
}
|
||||
|
||||
// Install all kubesphere api groups
|
||||
// Installation happens before all informers start to cache objects, so
|
||||
// any attempt to list objects using listers will get empty results.
|
||||
func (s *APIServer) installKubeSphereAPIs() {
|
||||
func (s *APIServer) installKubeSphereAPIs(stopCh <-chan struct{}) {
|
||||
imOperator := im.NewOperator(s.KubernetesClient.KubeSphere(),
|
||||
user.New(s.InformerFactory.KubeSphereSharedInformerFactory(),
|
||||
s.InformerFactory.KubernetesSharedInformerFactory()),
|
||||
@@ -214,15 +217,15 @@ func (s *APIServer) installKubeSphereAPIs() {
|
||||
|
||||
urlruntime.Must(configv1alpha2.AddToContainer(s.container, s.Config))
|
||||
urlruntime.Must(resourcev1alpha3.AddToContainer(s.container, s.InformerFactory, s.RuntimeCache))
|
||||
urlruntime.Must(monitoringv1alpha3.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), s.MonitoringClient, s.MetricsClient, s.InformerFactory, s.KubernetesClient.KubeSphere(), s.Config.OpenPitrixOptions, s.RuntimeClient))
|
||||
urlruntime.Must(meteringv1alpha1.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), s.MonitoringClient, s.InformerFactory, s.KubernetesClient.KubeSphere(), s.RuntimeCache, s.Config.MeteringOptions, nil, s.RuntimeClient))
|
||||
urlruntime.Must(openpitrixv1.AddToContainer(s.container, s.InformerFactory, s.KubernetesClient.KubeSphere(), s.Config.OpenPitrixOptions))
|
||||
urlruntime.Must(monitoringv1alpha3.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), s.MonitoringClient, s.MetricsClient, s.InformerFactory, s.KubernetesClient.KubeSphere(), s.Config.OpenPitrixOptions, s.RuntimeClient, stopCh))
|
||||
urlruntime.Must(meteringv1alpha1.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), s.MonitoringClient, s.InformerFactory, s.KubernetesClient.KubeSphere(), s.RuntimeCache, s.Config.MeteringOptions, nil, s.RuntimeClient, stopCh))
|
||||
urlruntime.Must(openpitrixv1.AddToContainer(s.container, s.InformerFactory, s.KubernetesClient.KubeSphere(), s.Config.OpenPitrixOptions, stopCh))
|
||||
urlruntime.Must(openpitrixv2alpha1.AddToContainer(s.container, s.InformerFactory, s.KubernetesClient.KubeSphere(), s.Config.OpenPitrixOptions))
|
||||
urlruntime.Must(operationsv1alpha2.AddToContainer(s.container, s.KubernetesClient.Kubernetes()))
|
||||
urlruntime.Must(resourcesv1alpha2.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), s.InformerFactory,
|
||||
s.KubernetesClient.Master()))
|
||||
urlruntime.Must(tenantv1alpha2.AddToContainer(s.container, s.InformerFactory, s.KubernetesClient.Kubernetes(),
|
||||
s.KubernetesClient.KubeSphere(), s.EventsClient, s.LoggingClient, s.AuditingClient, amOperator, rbacAuthorizer, s.MonitoringClient, s.RuntimeCache, s.Config.MeteringOptions))
|
||||
s.KubernetesClient.KubeSphere(), s.EventsClient, s.LoggingClient, s.AuditingClient, amOperator, rbacAuthorizer, s.MonitoringClient, s.RuntimeCache, s.Config.MeteringOptions, stopCh))
|
||||
urlruntime.Must(terminalv1alpha2.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), rbacAuthorizer, s.KubernetesClient.Config(), s.Config.TerminalOptions))
|
||||
urlruntime.Must(clusterkapisv1alpha1.AddToContainer(s.container,
|
||||
s.KubernetesClient.KubeSphere(),
|
||||
|
||||
@@ -20,6 +20,11 @@ import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"k8s.io/klog"
|
||||
|
||||
"github.com/fsnotify/fsnotify"
|
||||
|
||||
"kubesphere.io/kubesphere/pkg/apiserver/authentication"
|
||||
"kubesphere.io/kubesphere/pkg/apiserver/authorization"
|
||||
@@ -79,6 +84,11 @@ import (
|
||||
// mysql-host is missing in command line flags, all other mysql command line flags
|
||||
// will be ignored.
|
||||
|
||||
var (
|
||||
// singleton instance of config package
|
||||
_config = defaultConfig()
|
||||
)
|
||||
|
||||
const (
|
||||
// DefaultConfigurationName is the default name of configuration
|
||||
defaultConfigurationName = "kubesphere"
|
||||
@@ -87,6 +97,61 @@ const (
|
||||
defaultConfigurationPath = "/etc/kubesphere"
|
||||
)
|
||||
|
||||
type config struct {
|
||||
cfg *Config
|
||||
cfgChangeCh chan Config
|
||||
watchOnce sync.Once
|
||||
loadOnce sync.Once
|
||||
}
|
||||
|
||||
func (c *config) watchConfig() <-chan Config {
|
||||
c.watchOnce.Do(func() {
|
||||
viper.WatchConfig()
|
||||
viper.OnConfigChange(func(in fsnotify.Event) {
|
||||
cfg := New()
|
||||
if err := viper.Unmarshal(cfg); err != nil {
|
||||
klog.Warning("config reload error", err)
|
||||
} else {
|
||||
c.cfgChangeCh <- *cfg
|
||||
}
|
||||
})
|
||||
})
|
||||
return c.cfgChangeCh
|
||||
}
|
||||
|
||||
func (c *config) loadFromDisk() (*Config, error) {
|
||||
var err error
|
||||
c.loadOnce.Do(func() {
|
||||
if err = viper.ReadInConfig(); err != nil {
|
||||
if _, ok := err.(viper.ConfigFileNotFoundError); !ok {
|
||||
err = fmt.Errorf("error parsing configuration file %s", err)
|
||||
}
|
||||
}
|
||||
err = viper.Unmarshal(c.cfg)
|
||||
})
|
||||
return c.cfg, err
|
||||
}
|
||||
|
||||
func defaultConfig() *config {
|
||||
viper.SetConfigName(defaultConfigurationName)
|
||||
viper.AddConfigPath(defaultConfigurationPath)
|
||||
|
||||
// Load from current working directory, only used for debugging
|
||||
viper.AddConfigPath(".")
|
||||
|
||||
// Load from Environment variables
|
||||
viper.SetEnvPrefix("kubesphere")
|
||||
viper.AutomaticEnv()
|
||||
viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
|
||||
|
||||
return &config{
|
||||
cfg: New(),
|
||||
cfgChangeCh: make(chan Config),
|
||||
watchOnce: sync.Once{},
|
||||
loadOnce: sync.Once{},
|
||||
}
|
||||
}
|
||||
|
||||
// Config defines everything needed for apiserver to deal with external services
|
||||
type Config struct {
|
||||
DevopsOptions *jenkins.Options `json:"devops,omitempty" yaml:"devops,omitempty" mapstructure:"devops"`
|
||||
@@ -148,32 +213,12 @@ func New() *Config {
|
||||
// TryLoadFromDisk loads configuration from default location after server startup
|
||||
// return nil error if configuration file not exists
|
||||
func TryLoadFromDisk() (*Config, error) {
|
||||
viper.SetConfigName(defaultConfigurationName)
|
||||
viper.AddConfigPath(defaultConfigurationPath)
|
||||
return _config.loadFromDisk()
|
||||
}
|
||||
|
||||
// Load from current working directory, only used for debugging
|
||||
viper.AddConfigPath(".")
|
||||
|
||||
// Load from Environment variables
|
||||
viper.SetEnvPrefix("kubesphere")
|
||||
viper.AutomaticEnv()
|
||||
viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
|
||||
|
||||
if err := viper.ReadInConfig(); err != nil {
|
||||
if _, ok := err.(viper.ConfigFileNotFoundError); ok {
|
||||
return nil, err
|
||||
} else {
|
||||
return nil, fmt.Errorf("error parsing configuration file %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
conf := New()
|
||||
|
||||
if err := viper.Unmarshal(conf); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return conf, nil
|
||||
// WatchConfigChange return config change channel
|
||||
func WatchConfigChange() <-chan Config {
|
||||
return _config.watchConfig()
|
||||
}
|
||||
|
||||
// convertToMap simply converts config to map[string]bool
|
||||
|
||||
@@ -47,6 +47,6 @@ type meterHandler interface {
|
||||
HandlePVCMeterQuery(req *restful.Request, resp *restful.Response)
|
||||
}
|
||||
|
||||
func newHandler(k kubernetes.Interface, m monitoring.Interface, f informers.InformerFactory, ksClient versioned.Interface, resourceGetter *resourcev1alpha3.ResourceGetter, meteringOptions *meteringclient.Options, opOptions *openpitrixoptions.Options, rtClient runtimeclient.Client) meterHandler {
|
||||
return monitorhle.NewHandler(k, m, nil, f, ksClient, resourceGetter, meteringOptions, opOptions, rtClient)
|
||||
func newHandler(k kubernetes.Interface, m monitoring.Interface, f informers.InformerFactory, ksClient versioned.Interface, resourceGetter *resourcev1alpha3.ResourceGetter, meteringOptions *meteringclient.Options, opOptions *openpitrixoptions.Options, rtClient runtimeclient.Client, stopCh <-chan struct{}) meterHandler {
|
||||
return monitorhle.NewHandler(k, m, nil, f, ksClient, resourceGetter, meteringOptions, opOptions, rtClient, stopCh)
|
||||
}
|
||||
|
||||
@@ -49,10 +49,10 @@ const (
|
||||
|
||||
var GroupVersion = schema.GroupVersion{Group: groupName, Version: "v1alpha1"}
|
||||
|
||||
func AddToContainer(c *restful.Container, k8sClient kubernetes.Interface, meteringClient monitoring.Interface, factory informers.InformerFactory, ksClient versioned.Interface, cache cache.Cache, meteringOptions *meteringclient.Options, opOptions *openpitrixoptions.Options, rtClient runtimeclient.Client) error {
|
||||
func AddToContainer(c *restful.Container, k8sClient kubernetes.Interface, meteringClient monitoring.Interface, factory informers.InformerFactory, ksClient versioned.Interface, cache cache.Cache, meteringOptions *meteringclient.Options, opOptions *openpitrixoptions.Options, rtClient runtimeclient.Client, stopCh <-chan struct{}) error {
|
||||
ws := runtime.NewWebService(GroupVersion)
|
||||
|
||||
h := newHandler(k8sClient, meteringClient, factory, ksClient, resourcev1alpha3.NewResourceGetter(factory, cache), meteringOptions, opOptions, rtClient)
|
||||
h := newHandler(k8sClient, meteringClient, factory, ksClient, resourcev1alpha3.NewResourceGetter(factory, cache), meteringOptions, opOptions, rtClient, stopCh)
|
||||
|
||||
ws.Route(ws.GET("/cluster").
|
||||
To(h.HandleClusterMeterQuery).
|
||||
|
||||
@@ -60,7 +60,7 @@ type handler struct {
|
||||
rtClient runtimeclient.Client
|
||||
}
|
||||
|
||||
func NewHandler(k kubernetes.Interface, monitoringClient monitoring.Interface, metricsClient monitoring.Interface, f informers.InformerFactory, ksClient versioned.Interface, resourceGetter *resourcev1alpha3.ResourceGetter, meteringOptions *meteringclient.Options, opOptions *openpitrixoptions.Options, rtClient runtimeclient.Client) *handler {
|
||||
func NewHandler(k kubernetes.Interface, monitoringClient monitoring.Interface, metricsClient monitoring.Interface, f informers.InformerFactory, ksClient versioned.Interface, resourceGetter *resourcev1alpha3.ResourceGetter, meteringOptions *meteringclient.Options, opOptions *openpitrixoptions.Options, rtClient runtimeclient.Client, stopCh <-chan struct{}) *handler {
|
||||
var opRelease openpitrix.Interface
|
||||
var s3Client s3.Interface
|
||||
if opOptions != nil && opOptions.S3Options != nil && len(opOptions.S3Options.Endpoint) != 0 {
|
||||
@@ -71,7 +71,7 @@ func NewHandler(k kubernetes.Interface, monitoringClient monitoring.Interface, m
|
||||
}
|
||||
}
|
||||
if ksClient != nil {
|
||||
opRelease = openpitrix.NewOpenpitrixOperator(f, ksClient, s3Client)
|
||||
opRelease = openpitrix.NewOpenpitrixOperator(f, ksClient, s3Client, stopCh)
|
||||
}
|
||||
if meteringOptions == nil || meteringOptions.RetentionDay == "" {
|
||||
meteringOptions = &meteringclient.DefaultMeteringOption
|
||||
|
||||
@@ -373,7 +373,7 @@ func TestParseRequestParams(t *testing.T) {
|
||||
|
||||
fakeInformerFactory.KubeSphereSharedInformerFactory()
|
||||
|
||||
handler := NewHandler(client, nil, nil, fakeInformerFactory, ksClient, nil, nil, nil, nil)
|
||||
handler := NewHandler(client, nil, nil, fakeInformerFactory, ksClient, nil, nil, nil, nil, nil)
|
||||
|
||||
result, err := handler.makeQueryOptions(tt.params, tt.lvl)
|
||||
if err != nil {
|
||||
|
||||
@@ -47,10 +47,10 @@ const (
|
||||
|
||||
var GroupVersion = schema.GroupVersion{Group: groupName, Version: "v1alpha3"}
|
||||
|
||||
func AddToContainer(c *restful.Container, k8sClient kubernetes.Interface, monitoringClient monitoring.Interface, metricsClient monitoring.Interface, factory informers.InformerFactory, ksClient versioned.Interface, opOptions *openpitrixoptions.Options, rtClient runtimeclient.Client) error {
|
||||
func AddToContainer(c *restful.Container, k8sClient kubernetes.Interface, monitoringClient monitoring.Interface, metricsClient monitoring.Interface, factory informers.InformerFactory, ksClient versioned.Interface, opOptions *openpitrixoptions.Options, rtClient runtimeclient.Client, stopCh <-chan struct{}) error {
|
||||
ws := runtime.NewWebService(GroupVersion)
|
||||
|
||||
h := NewHandler(k8sClient, monitoringClient, metricsClient, factory, ksClient, nil, nil, opOptions, rtClient)
|
||||
h := NewHandler(k8sClient, monitoringClient, metricsClient, factory, ksClient, nil, nil, opOptions, rtClient, stopCh)
|
||||
|
||||
ws.Route(ws.GET("/kubesphere").
|
||||
To(h.handleKubeSphereMetricsQuery).
|
||||
|
||||
@@ -50,7 +50,7 @@ type openpitrixHandler struct {
|
||||
openpitrix openpitrix.Interface
|
||||
}
|
||||
|
||||
func newOpenpitrixHandler(ksInformers informers.InformerFactory, ksClient versioned.Interface, option *openpitrixoptions.Options) *openpitrixHandler {
|
||||
func newOpenpitrixHandler(ksInformers informers.InformerFactory, ksClient versioned.Interface, option *openpitrixoptions.Options, stopCh <-chan struct{}) *openpitrixHandler {
|
||||
var s3Client s3.Interface
|
||||
if option != nil && option.S3Options != nil && len(option.S3Options.Endpoint) != 0 {
|
||||
var err error
|
||||
@@ -61,7 +61,7 @@ func newOpenpitrixHandler(ksInformers informers.InformerFactory, ksClient versio
|
||||
}
|
||||
|
||||
return &openpitrixHandler{
|
||||
openpitrix.NewOpenpitrixOperator(ksInformers, ksClient, s3Client),
|
||||
openpitrix.NewOpenpitrixOperator(ksInformers, ksClient, s3Client, stopCh),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -38,11 +38,11 @@ const (
|
||||
|
||||
var GroupVersion = schema.GroupVersion{Group: GroupName, Version: "v1"}
|
||||
|
||||
func AddToContainer(c *restful.Container, ksInfomrers informers.InformerFactory, ksClient versioned.Interface, options *openpitrixoptions.Options) error {
|
||||
func AddToContainer(c *restful.Container, ksInfomrers informers.InformerFactory, ksClient versioned.Interface, options *openpitrixoptions.Options, stopCh <-chan struct{}) error {
|
||||
mimePatch := []string{restful.MIME_JSON, runtime.MimeJsonPatchJson, runtime.MimeMergePatchJson}
|
||||
webservice := runtime.NewWebService(GroupVersion)
|
||||
|
||||
handler := newOpenpitrixHandler(ksInfomrers, ksClient, options)
|
||||
handler := newOpenpitrixHandler(ksInfomrers, ksClient, options, stopCh)
|
||||
|
||||
webservice.Route(webservice.POST("/repos").
|
||||
To(handler.CreateRepo).
|
||||
|
||||
@@ -60,14 +60,14 @@ func newTenantHandler(factory informers.InformerFactory, k8sclient kubernetes.In
|
||||
evtsClient events.Client, loggingClient logging.Client, auditingclient auditing.Client,
|
||||
am am.AccessManagementInterface, authorizer authorizer.Authorizer,
|
||||
monitoringclient monitoringclient.Interface, resourceGetter *resourcev1alpha3.ResourceGetter,
|
||||
meteringOptions *meteringclient.Options) *tenantHandler {
|
||||
meteringOptions *meteringclient.Options, stopCh <-chan struct{}) *tenantHandler {
|
||||
|
||||
if meteringOptions == nil || meteringOptions.RetentionDay == "" {
|
||||
meteringOptions = &meteringclient.DefaultMeteringOption
|
||||
}
|
||||
|
||||
return &tenantHandler{
|
||||
tenant: tenant.New(factory, k8sclient, ksclient, evtsClient, loggingClient, auditingclient, am, authorizer, monitoringclient, resourceGetter),
|
||||
tenant: tenant.New(factory, k8sclient, ksclient, evtsClient, loggingClient, auditingclient, am, authorizer, monitoringclient, resourceGetter, stopCh),
|
||||
meteringOptions: meteringOptions,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -66,11 +66,11 @@ func Resource(resource string) schema.GroupResource {
|
||||
func AddToContainer(c *restful.Container, factory informers.InformerFactory, k8sclient kubernetes.Interface,
|
||||
ksclient kubesphere.Interface, evtsClient events.Client, loggingClient logging.Client,
|
||||
auditingclient auditing.Client, am am.AccessManagementInterface, authorizer authorizer.Authorizer,
|
||||
monitoringclient monitoringclient.Interface, cache cache.Cache, meteringOptions *meteringclient.Options) error {
|
||||
monitoringclient monitoringclient.Interface, cache cache.Cache, meteringOptions *meteringclient.Options, stopCh <-chan struct{}) error {
|
||||
mimePatch := []string{restful.MIME_JSON, runtime.MimeMergePatchJson, runtime.MimeJsonPatchJson}
|
||||
|
||||
ws := runtime.NewWebService(GroupVersion)
|
||||
handler := newTenantHandler(factory, k8sclient, ksclient, evtsClient, loggingClient, auditingclient, am, authorizer, monitoringclient, resourcev1alpha3.NewResourceGetter(factory, cache), meteringOptions)
|
||||
handler := newTenantHandler(factory, k8sclient, ksclient, evtsClient, loggingClient, auditingclient, am, authorizer, monitoringclient, resourcev1alpha3.NewResourceGetter(factory, cache), meteringOptions, stopCh)
|
||||
|
||||
ws.Route(ws.GET("/clusters").
|
||||
To(handler.ListClusters).
|
||||
|
||||
@@ -19,7 +19,6 @@ package openpitrix
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/klog"
|
||||
|
||||
@@ -55,7 +54,7 @@ func init() {
|
||||
cachedReposData = reposcache.NewReposCache()
|
||||
}
|
||||
|
||||
func NewOpenpitrixOperator(ksInformers ks_informers.InformerFactory, ksClient versioned.Interface, s3Client s3.Interface) Interface {
|
||||
func NewOpenpitrixOperator(ksInformers ks_informers.InformerFactory, ksClient versioned.Interface, s3Client s3.Interface, stopCh <-chan struct{}) Interface {
|
||||
once.Do(func() {
|
||||
klog.Infof("start helm repo informer")
|
||||
helmReposInformer = ksInformers.KubeSphereSharedInformerFactory().Application().V1alpha1().HelmRepos().Informer()
|
||||
@@ -85,9 +84,6 @@ func NewOpenpitrixOperator(ksInformers ks_informers.InformerFactory, ksClient ve
|
||||
indexer := ctgInformer.GetIndexer()
|
||||
|
||||
cachedReposData.SetCategoryIndexer(indexer)
|
||||
|
||||
go ctgInformer.Run(wait.NeverStop)
|
||||
go helmReposInformer.Run(wait.NeverStop)
|
||||
})
|
||||
|
||||
return &openpitrixOperator{
|
||||
|
||||
@@ -113,10 +113,10 @@ type tenantOperator struct {
|
||||
opRelease openpitrix.ReleaseInterface
|
||||
}
|
||||
|
||||
func New(informers informers.InformerFactory, k8sclient kubernetes.Interface, ksclient kubesphere.Interface, evtsClient eventsclient.Client, loggingClient loggingclient.Client, auditingclient auditingclient.Client, am am.AccessManagementInterface, authorizer authorizer.Authorizer, monitoringclient monitoringclient.Interface, resourceGetter *resourcev1alpha3.ResourceGetter) Interface {
|
||||
func New(informers informers.InformerFactory, k8sclient kubernetes.Interface, ksclient kubesphere.Interface, evtsClient eventsclient.Client, loggingClient loggingclient.Client, auditingclient auditingclient.Client, am am.AccessManagementInterface, authorizer authorizer.Authorizer, monitoringclient monitoringclient.Interface, resourceGetter *resourcev1alpha3.ResourceGetter, stopCh <-chan struct{}) Interface {
|
||||
var openpitrixRelease openpitrix.ReleaseInterface
|
||||
if ksclient != nil {
|
||||
openpitrixRelease = openpitrix.NewOpenpitrixOperator(informers, ksclient, nil)
|
||||
openpitrixRelease = openpitrix.NewOpenpitrixOperator(informers, ksclient, nil, stopCh)
|
||||
}
|
||||
|
||||
return &tenantOperator{
|
||||
|
||||
@@ -544,5 +544,5 @@ func prepare() Interface {
|
||||
amOperator := am.NewOperator(ksClient, k8sClient, fakeInformerFactory, nil)
|
||||
authorizer := rbac.NewRBACAuthorizer(amOperator)
|
||||
|
||||
return New(fakeInformerFactory, k8sClient, ksClient, nil, nil, nil, amOperator, authorizer, nil, nil)
|
||||
return New(fakeInformerFactory, k8sClient, ksClient, nil, nil, nil, amOperator, authorizer, nil, nil, nil)
|
||||
}
|
||||
|
||||
@@ -19,6 +19,7 @@ package metrics
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"sync"
|
||||
|
||||
"github.com/emicklei/go-restful"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
@@ -30,6 +31,8 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
registerOnce sync.Once
|
||||
|
||||
Defaults DefaultMetrics
|
||||
defaultRegistry compbasemetrics.KubeRegistry
|
||||
// MustRegister registers registerable metrics but uses the defaultRegistry, panic upon the first registration that causes an error
|
||||
@@ -54,10 +57,13 @@ type DefaultMetrics struct{}
|
||||
|
||||
// Install adds the DefaultMetrics handler
|
||||
func (m DefaultMetrics) Install(c *restful.Container) {
|
||||
registerOnce.Do(m.registerMetrics)
|
||||
c.Handle("/kapis/metrics", Handler())
|
||||
}
|
||||
|
||||
func (m DefaultMetrics) registerMetrics() {
|
||||
RawMustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}))
|
||||
RawMustRegister(prometheus.NewGoCollector())
|
||||
|
||||
c.Handle("/kapis/metrics", Handler())
|
||||
}
|
||||
|
||||
//Overwrite version.Get
|
||||
|
||||
@@ -123,13 +123,13 @@ func generateSwaggerJson() []byte {
|
||||
urlruntime.Must(devopsv1alpha2.AddToContainer(container, ""))
|
||||
urlruntime.Must(devopsv1alpha3.AddToContainer(container, ""))
|
||||
urlruntime.Must(iamv1alpha2.AddToContainer(container, nil, nil, group.New(informerFactory, clientsets.KubeSphere(), clientsets.Kubernetes()), nil))
|
||||
urlruntime.Must(monitoringv1alpha3.AddToContainer(container, clientsets.Kubernetes(), nil, nil, informerFactory, nil, nil, nil))
|
||||
urlruntime.Must(openpitrixv1.AddToContainer(container, informerFactory, fake.NewSimpleClientset(), nil))
|
||||
urlruntime.Must(monitoringv1alpha3.AddToContainer(container, clientsets.Kubernetes(), nil, nil, informerFactory, nil, nil, nil, nil))
|
||||
urlruntime.Must(openpitrixv1.AddToContainer(container, informerFactory, fake.NewSimpleClientset(), nil, nil))
|
||||
urlruntime.Must(openpitrixv2.AddToContainer(container, informerFactory, fake.NewSimpleClientset(), nil))
|
||||
urlruntime.Must(operationsv1alpha2.AddToContainer(container, clientsets.Kubernetes()))
|
||||
urlruntime.Must(resourcesv1alpha2.AddToContainer(container, clientsets.Kubernetes(), informerFactory, ""))
|
||||
urlruntime.Must(resourcesv1alpha3.AddToContainer(container, informerFactory, nil))
|
||||
urlruntime.Must(tenantv1alpha2.AddToContainer(container, informerFactory, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil))
|
||||
urlruntime.Must(tenantv1alpha2.AddToContainer(container, informerFactory, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil))
|
||||
urlruntime.Must(terminalv1alpha2.AddToContainer(container, clientsets.Kubernetes(), nil, nil, nil))
|
||||
urlruntime.Must(metricsv1alpha2.AddToContainer(nil, container, clientsets.Kubernetes(), nil))
|
||||
urlruntime.Must(networkv1alpha2.AddToContainer(container, ""))
|
||||
|
||||
Reference in New Issue
Block a user