support configurable cluster resync perioud
Signed-off-by: yuswift <yuswiftli@yunify.com>
This commit is contained in:
@@ -52,6 +52,7 @@ import (
|
|||||||
"kubesphere.io/kubesphere/pkg/simple/client/devops"
|
"kubesphere.io/kubesphere/pkg/simple/client/devops"
|
||||||
"kubesphere.io/kubesphere/pkg/simple/client/k8s"
|
"kubesphere.io/kubesphere/pkg/simple/client/k8s"
|
||||||
ldapclient "kubesphere.io/kubesphere/pkg/simple/client/ldap"
|
ldapclient "kubesphere.io/kubesphere/pkg/simple/client/ldap"
|
||||||
|
"kubesphere.io/kubesphere/pkg/simple/client/multicluster"
|
||||||
"kubesphere.io/kubesphere/pkg/simple/client/network"
|
"kubesphere.io/kubesphere/pkg/simple/client/network"
|
||||||
ippoolclient "kubesphere.io/kubesphere/pkg/simple/client/network/ippool"
|
ippoolclient "kubesphere.io/kubesphere/pkg/simple/client/network/ippool"
|
||||||
calicoclient "kubesphere.io/kubesphere/pkg/simple/client/network/ippool/calico"
|
calicoclient "kubesphere.io/kubesphere/pkg/simple/client/network/ippool/calico"
|
||||||
@@ -71,7 +72,7 @@ func addControllers(
|
|||||||
options *k8s.KubernetesOptions,
|
options *k8s.KubernetesOptions,
|
||||||
authenticationOptions *authoptions.AuthenticationOptions,
|
authenticationOptions *authoptions.AuthenticationOptions,
|
||||||
openpitrixClient openpitrix.Client,
|
openpitrixClient openpitrix.Client,
|
||||||
multiClusterEnabled bool,
|
multiClusterOptions *multicluster.Options,
|
||||||
networkOptions *network.Options,
|
networkOptions *network.Options,
|
||||||
serviceMeshEnabled bool,
|
serviceMeshEnabled bool,
|
||||||
kubectlImage string,
|
kubectlImage string,
|
||||||
@@ -82,6 +83,8 @@ func addControllers(
|
|||||||
kubesphereInformer := informerFactory.KubeSphereSharedInformerFactory()
|
kubesphereInformer := informerFactory.KubeSphereSharedInformerFactory()
|
||||||
applicationInformer := informerFactory.ApplicationSharedInformerFactory()
|
applicationInformer := informerFactory.ApplicationSharedInformerFactory()
|
||||||
|
|
||||||
|
multiClusterEnabled := multiClusterOptions.Enable
|
||||||
|
|
||||||
var vsController, drController manager.Runnable
|
var vsController, drController manager.Runnable
|
||||||
if serviceMeshEnabled {
|
if serviceMeshEnabled {
|
||||||
vsController = virtualservice.NewVirtualServiceController(kubernetesInformer.Core().V1().Services(),
|
vsController = virtualservice.NewVirtualServiceController(kubernetesInformer.Core().V1().Services(),
|
||||||
@@ -278,7 +281,8 @@ func addControllers(
|
|||||||
client.Config(),
|
client.Config(),
|
||||||
kubesphereInformer.Cluster().V1alpha1().Clusters(),
|
kubesphereInformer.Cluster().V1alpha1().Clusters(),
|
||||||
client.KubeSphere().ClusterV1alpha1().Clusters(),
|
client.KubeSphere().ClusterV1alpha1().Clusters(),
|
||||||
openpitrixClient)
|
openpitrixClient,
|
||||||
|
multiClusterOptions.ClusterControllerResyncSecond)
|
||||||
}
|
}
|
||||||
|
|
||||||
var nsnpController manager.Runnable
|
var nsnpController manager.Runnable
|
||||||
|
|||||||
@@ -217,7 +217,7 @@ func run(s *options.KubeSphereControllerManagerOptions, stopCh <-chan struct{})
|
|||||||
s.KubernetesOptions,
|
s.KubernetesOptions,
|
||||||
s.AuthenticationOptions,
|
s.AuthenticationOptions,
|
||||||
openpitrixClient,
|
openpitrixClient,
|
||||||
s.MultiClusterOptions.Enable,
|
s.MultiClusterOptions,
|
||||||
s.NetworkOptions,
|
s.NetworkOptions,
|
||||||
servicemeshEnabled,
|
servicemeshEnabled,
|
||||||
s.AuthenticationOptions.KubectlImage, stopCh); err != nil {
|
s.AuthenticationOptions.KubectlImage, stopCh); err != nil {
|
||||||
|
|||||||
@@ -157,6 +157,8 @@ type clusterController struct {
|
|||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
|
|
||||||
clusterMap map[string]*clusterData
|
clusterMap map[string]*clusterData
|
||||||
|
|
||||||
|
resyncPeriod time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewClusterController(
|
func NewClusterController(
|
||||||
@@ -165,6 +167,7 @@ func NewClusterController(
|
|||||||
clusterInformer clusterinformer.ClusterInformer,
|
clusterInformer clusterinformer.ClusterInformer,
|
||||||
clusterClient clusterclient.ClusterInterface,
|
clusterClient clusterclient.ClusterInterface,
|
||||||
openpitrixClient openpitrix.Client,
|
openpitrixClient openpitrix.Client,
|
||||||
|
resyncPeriod time.Duration,
|
||||||
) *clusterController {
|
) *clusterController {
|
||||||
|
|
||||||
broadcaster := record.NewBroadcaster()
|
broadcaster := record.NewBroadcaster()
|
||||||
@@ -184,23 +187,18 @@ func NewClusterController(
|
|||||||
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "cluster"),
|
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "cluster"),
|
||||||
workerLoopPeriod: time.Second,
|
workerLoopPeriod: time.Second,
|
||||||
clusterMap: make(map[string]*clusterData),
|
clusterMap: make(map[string]*clusterData),
|
||||||
|
resyncPeriod: resyncPeriod,
|
||||||
}
|
}
|
||||||
|
|
||||||
c.clusterLister = clusterInformer.Lister()
|
c.clusterLister = clusterInformer.Lister()
|
||||||
c.clusterHasSynced = clusterInformer.Informer().HasSynced
|
c.clusterHasSynced = clusterInformer.Informer().HasSynced
|
||||||
|
|
||||||
clusterInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
clusterInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
|
||||||
AddFunc: c.addCluster,
|
AddFunc: c.addCluster,
|
||||||
UpdateFunc: func(oldObj, newObj interface{}) {
|
UpdateFunc: func(oldObj, newObj interface{}) {
|
||||||
newCluster := newObj.(*clusterv1alpha1.Cluster)
|
|
||||||
oldCluster := oldObj.(*clusterv1alpha1.Cluster)
|
|
||||||
if newCluster.ResourceVersion == oldCluster.ResourceVersion {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
c.addCluster(newObj)
|
c.addCluster(newObj)
|
||||||
},
|
},
|
||||||
DeleteFunc: c.addCluster,
|
DeleteFunc: c.addCluster,
|
||||||
})
|
}, resyncPeriod)
|
||||||
|
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
@@ -226,15 +224,11 @@ func (c *clusterController) Run(workers int, stopCh <-chan struct{}) error {
|
|||||||
|
|
||||||
// refresh cluster configz every 2 minutes
|
// refresh cluster configz every 2 minutes
|
||||||
go wait.Until(func() {
|
go wait.Until(func() {
|
||||||
if err := c.syncStatus(); err != nil {
|
|
||||||
klog.Errorf("Error periodically sync cluster status, %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := c.reconcileHostCluster(); err != nil {
|
if err := c.reconcileHostCluster(); err != nil {
|
||||||
klog.Errorf("Error create host cluster, error %v", err)
|
klog.Errorf("Error create host cluster, error %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
}, 2*time.Minute, stopCh)
|
}, c.resyncPeriod, stopCh)
|
||||||
|
|
||||||
<-stopCh
|
<-stopCh
|
||||||
return nil
|
return nil
|
||||||
@@ -354,6 +348,7 @@ func (c *clusterController) reconcileHostCluster() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *clusterController) syncCluster(key string) error {
|
func (c *clusterController) syncCluster(key string) error {
|
||||||
|
klog.V(5).Infof("starting to sync cluster %s", key)
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
|
|
||||||
_, name, err := cache.SplitMetaNamespaceKey(key)
|
_, name, err := cache.SplitMetaNamespaceKey(key)
|
||||||
|
|||||||
@@ -16,7 +16,13 @@ limitations under the License.
|
|||||||
|
|
||||||
package multicluster
|
package multicluster
|
||||||
|
|
||||||
import "github.com/spf13/pflag"
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/spf13/pflag"
|
||||||
|
)
|
||||||
|
|
||||||
|
const DefaultResyncPeriod = time.Duration(120) * time.Second
|
||||||
|
|
||||||
type Options struct {
|
type Options struct {
|
||||||
// Enable
|
// Enable
|
||||||
@@ -36,16 +42,20 @@ type Options struct {
|
|||||||
|
|
||||||
// AgentImage is the image used when generating deployment for all cluster agents.
|
// AgentImage is the image used when generating deployment for all cluster agents.
|
||||||
AgentImage string `json:"agentImage,omitempty"`
|
AgentImage string `json:"agentImage,omitempty"`
|
||||||
|
|
||||||
|
// ClusterControllerResyncSecond is the resync period used by cluster controller.
|
||||||
|
ClusterControllerResyncSecond time.Duration `json:"clusterControllerResyncSecond,omitempty" yaml:"clusterControllerResyncSecond"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewOptions() returns a default nil options
|
// NewOptions() returns a default nil options
|
||||||
func NewOptions() *Options {
|
func NewOptions() *Options {
|
||||||
return &Options{
|
return &Options{
|
||||||
Enable: false,
|
Enable: false,
|
||||||
EnableFederation: false,
|
EnableFederation: false,
|
||||||
ProxyPublishAddress: "",
|
ProxyPublishAddress: "",
|
||||||
ProxyPublishService: "",
|
ProxyPublishService: "",
|
||||||
AgentImage: "kubesphere/tower:v1.0",
|
AgentImage: "kubesphere/tower:v1.0",
|
||||||
|
ClusterControllerResyncSecond: DefaultResyncPeriod,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -67,4 +77,7 @@ func (o *Options) AddFlags(fs *pflag.FlagSet, s *Options) {
|
|||||||
|
|
||||||
fs.StringVar(&o.AgentImage, "agent-image", s.AgentImage, ""+
|
fs.StringVar(&o.AgentImage, "agent-image", s.AgentImage, ""+
|
||||||
"This field is used when generating deployment yaml for agent.")
|
"This field is used when generating deployment yaml for agent.")
|
||||||
|
|
||||||
|
fs.DurationVar(&o.ClusterControllerResyncSecond, "cluster-controller-resync-second", s.ClusterControllerResyncSecond,
|
||||||
|
"Cluster controller resync second to sync cluster resource.")
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user