update notification manager to v2.0 (#5030)
Signed-off-by: wanjunlei <wanjunlei@kubesphere.io>
This commit is contained in:
@@ -36,14 +36,14 @@ import (
|
||||
"k8s.io/client-go/tools/record"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
"k8s.io/klog"
|
||||
"kubesphere.io/api/cluster/v1alpha1"
|
||||
"kubesphere.io/api/notification/v2beta2"
|
||||
"kubesphere.io/api/types/v1beta1"
|
||||
"kubesphere.io/api/types/v1beta2"
|
||||
"sigs.k8s.io/controller-runtime/pkg/cache"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
|
||||
|
||||
"kubesphere.io/api/cluster/v1alpha1"
|
||||
"kubesphere.io/api/notification/v2beta1"
|
||||
"kubesphere.io/api/types/v1beta1"
|
||||
|
||||
"kubesphere.io/kubesphere/pkg/constants"
|
||||
)
|
||||
|
||||
@@ -100,8 +100,10 @@ func (c *Controller) setEventHandlers() error {
|
||||
if c.reconciledObjs != nil && len(c.reconciledObjs) > 0 {
|
||||
c.reconciledObjs = c.reconciledObjs[:0]
|
||||
}
|
||||
c.reconciledObjs = append(c.reconciledObjs, &v2beta1.Config{})
|
||||
c.reconciledObjs = append(c.reconciledObjs, &v2beta1.Receiver{})
|
||||
c.reconciledObjs = append(c.reconciledObjs, &v2beta2.Config{})
|
||||
c.reconciledObjs = append(c.reconciledObjs, &v2beta2.Receiver{})
|
||||
c.reconciledObjs = append(c.reconciledObjs, &v2beta2.Router{})
|
||||
c.reconciledObjs = append(c.reconciledObjs, &v2beta2.Silence{})
|
||||
c.reconciledObjs = append(c.reconciledObjs, &corev1.Secret{})
|
||||
|
||||
if c.informerSynced != nil && len(c.informerSynced) > 0 {
|
||||
@@ -280,10 +282,14 @@ func (c *Controller) multiClusterSync(ctx context.Context, obj client.Object) er
|
||||
}
|
||||
|
||||
switch obj := obj.(type) {
|
||||
case *v2beta1.Config:
|
||||
case *v2beta2.Config:
|
||||
return c.syncFederatedConfig(obj)
|
||||
case *v2beta1.Receiver:
|
||||
case *v2beta2.Receiver:
|
||||
return c.syncFederatedReceiver(obj)
|
||||
case *v2beta2.Router:
|
||||
return c.syncFederatedRouter(obj)
|
||||
case *v2beta2.Silence:
|
||||
return c.syncFederatedSilence(obj)
|
||||
case *corev1.Secret:
|
||||
return c.syncFederatedSecret(obj)
|
||||
default:
|
||||
@@ -292,28 +298,28 @@ func (c *Controller) multiClusterSync(ctx context.Context, obj client.Object) er
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Controller) syncFederatedConfig(obj *v2beta1.Config) error {
|
||||
func (c *Controller) syncFederatedConfig(obj *v2beta2.Config) error {
|
||||
|
||||
fedObj := &v1beta1.FederatedNotificationConfig{}
|
||||
fedObj := &v1beta2.FederatedNotificationConfig{}
|
||||
err := c.Get(context.Background(), client.ObjectKey{Name: obj.Name}, fedObj)
|
||||
if err != nil {
|
||||
if errors.IsNotFound(err) {
|
||||
fedObj = &v1beta1.FederatedNotificationConfig{
|
||||
fedObj = &v1beta2.FederatedNotificationConfig{
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
Kind: v1beta1.FederatedNotificationConfigKind,
|
||||
APIVersion: v1beta1.SchemeGroupVersion.String(),
|
||||
Kind: v1beta2.FederatedNotificationConfigKind,
|
||||
APIVersion: v1beta2.SchemeGroupVersion.String(),
|
||||
},
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: obj.Name,
|
||||
},
|
||||
Spec: v1beta1.FederatedNotificationConfigSpec{
|
||||
Template: v1beta1.NotificationConfigTemplate{
|
||||
Spec: v1beta2.FederatedNotificationConfigSpec{
|
||||
Template: v1beta2.NotificationConfigTemplate{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Labels: obj.Labels,
|
||||
},
|
||||
Spec: obj.Spec,
|
||||
},
|
||||
Placement: v1beta1.GenericPlacementFields{
|
||||
Placement: v1beta2.GenericPlacementFields{
|
||||
ClusterSelector: &metav1.LabelSelector{},
|
||||
},
|
||||
},
|
||||
@@ -350,28 +356,28 @@ func (c *Controller) syncFederatedConfig(obj *v2beta1.Config) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Controller) syncFederatedReceiver(obj *v2beta1.Receiver) error {
|
||||
func (c *Controller) syncFederatedReceiver(obj *v2beta2.Receiver) error {
|
||||
|
||||
fedObj := &v1beta1.FederatedNotificationReceiver{}
|
||||
fedObj := &v1beta2.FederatedNotificationReceiver{}
|
||||
err := c.Get(context.Background(), client.ObjectKey{Name: obj.Name}, fedObj)
|
||||
if err != nil {
|
||||
if errors.IsNotFound(err) {
|
||||
fedObj = &v1beta1.FederatedNotificationReceiver{
|
||||
fedObj = &v1beta2.FederatedNotificationReceiver{
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
Kind: v1beta1.FederatedNotificationReceiverKind,
|
||||
APIVersion: v1beta1.SchemeGroupVersion.String(),
|
||||
Kind: v1beta2.FederatedNotificationReceiverKind,
|
||||
APIVersion: v1beta2.SchemeGroupVersion.String(),
|
||||
},
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: obj.Name,
|
||||
},
|
||||
Spec: v1beta1.FederatedNotificationReceiverSpec{
|
||||
Template: v1beta1.NotificationReceiverTemplate{
|
||||
Spec: v1beta2.FederatedNotificationReceiverSpec{
|
||||
Template: v1beta2.NotificationReceiverTemplate{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Labels: obj.Labels,
|
||||
},
|
||||
Spec: obj.Spec,
|
||||
},
|
||||
Placement: v1beta1.GenericPlacementFields{
|
||||
Placement: v1beta2.GenericPlacementFields{
|
||||
ClusterSelector: &metav1.LabelSelector{},
|
||||
},
|
||||
},
|
||||
@@ -408,6 +414,122 @@ func (c *Controller) syncFederatedReceiver(obj *v2beta1.Receiver) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Controller) syncFederatedRouter(obj *v2beta2.Router) error {
|
||||
|
||||
fedObj := &v1beta2.FederatedNotificationRouter{}
|
||||
err := c.Get(context.Background(), client.ObjectKey{Name: obj.Name}, fedObj)
|
||||
if err != nil {
|
||||
if errors.IsNotFound(err) {
|
||||
fedObj = &v1beta2.FederatedNotificationRouter{
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
Kind: v1beta2.FederatedNotificationReceiverKind,
|
||||
APIVersion: v1beta2.SchemeGroupVersion.String(),
|
||||
},
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: obj.Name,
|
||||
},
|
||||
Spec: v1beta2.FederatedNotificationRouterSpec{
|
||||
Template: v1beta2.NotificationRouterTemplate{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Labels: obj.Labels,
|
||||
},
|
||||
Spec: obj.Spec,
|
||||
},
|
||||
Placement: v1beta2.GenericPlacementFields{
|
||||
ClusterSelector: &metav1.LabelSelector{},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
err = controllerutil.SetControllerReference(obj, fedObj, scheme.Scheme)
|
||||
if err != nil {
|
||||
klog.Errorf("FederatedNotificationRouter '%s' SetControllerReference failed, %s", obj.Name, err)
|
||||
return err
|
||||
}
|
||||
|
||||
if err = c.Create(context.Background(), fedObj); err != nil {
|
||||
klog.Errorf("create FederatedNotificationRouter '%s' failed, %s", obj.Name, err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
klog.Errorf("get FederatedNotificationRouter '%s' failed, %s", obj.Name, err)
|
||||
return err
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(fedObj.Spec.Template.Labels, obj.Labels) || !reflect.DeepEqual(fedObj.Spec.Template.Spec, obj.Spec) {
|
||||
|
||||
fedObj.Spec.Template.Spec = obj.Spec
|
||||
fedObj.Spec.Template.Labels = obj.Labels
|
||||
|
||||
if err := c.Update(context.Background(), fedObj); err != nil {
|
||||
klog.Errorf("update FederatedNotificationRouter '%s' failed, %s", obj.Name, err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Controller) syncFederatedSilence(obj *v2beta2.Silence) error {
|
||||
|
||||
fedObj := &v1beta2.FederatedNotificationSilence{}
|
||||
err := c.Get(context.Background(), client.ObjectKey{Name: obj.Name}, fedObj)
|
||||
if err != nil {
|
||||
if errors.IsNotFound(err) {
|
||||
fedObj = &v1beta2.FederatedNotificationSilence{
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
Kind: v1beta2.FederatedNotificationReceiverKind,
|
||||
APIVersion: v1beta2.SchemeGroupVersion.String(),
|
||||
},
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: obj.Name,
|
||||
},
|
||||
Spec: v1beta2.FederatedNotificationSilenceSpec{
|
||||
Template: v1beta2.NotificationSilenceTemplate{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Labels: obj.Labels,
|
||||
},
|
||||
Spec: obj.Spec,
|
||||
},
|
||||
Placement: v1beta2.GenericPlacementFields{
|
||||
ClusterSelector: &metav1.LabelSelector{},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
err = controllerutil.SetControllerReference(obj, fedObj, scheme.Scheme)
|
||||
if err != nil {
|
||||
klog.Errorf("FederatedNotificationSilence '%s' SetControllerReference failed, %s", obj.Name, err)
|
||||
return err
|
||||
}
|
||||
|
||||
if err = c.Create(context.Background(), fedObj); err != nil {
|
||||
klog.Errorf("create FederatedNotificationSilence '%s' failed, %s", obj.Name, err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
klog.Errorf("get FederatedNotificationSilence '%s' failed, %s", obj.Name, err)
|
||||
return err
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(fedObj.Spec.Template.Labels, obj.Labels) || !reflect.DeepEqual(fedObj.Spec.Template.Spec, obj.Spec) {
|
||||
|
||||
fedObj.Spec.Template.Spec = obj.Spec
|
||||
fedObj.Spec.Template.Labels = obj.Labels
|
||||
|
||||
if err := c.Update(context.Background(), fedObj); err != nil {
|
||||
klog.Errorf("update FederatedNotificationSilence '%s' failed, %s", obj.Name, err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Controller) syncFederatedSecret(obj *corev1.Secret) error {
|
||||
|
||||
fedObj := &v1beta1.FederatedSecret{}
|
||||
|
||||
@@ -26,20 +26,19 @@ import (
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
k8sinformers "k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
fakek8s "k8s.io/client-go/kubernetes/fake"
|
||||
"k8s.io/client-go/kubernetes/scheme"
|
||||
"kubesphere.io/api/cluster/v1alpha1"
|
||||
"kubesphere.io/api/notification/v2beta2"
|
||||
"kubesphere.io/api/types/v1beta1"
|
||||
"kubesphere.io/api/types/v1beta2"
|
||||
"sigs.k8s.io/controller-runtime/pkg/cache"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client/fake"
|
||||
"sigs.k8s.io/controller-runtime/pkg/envtest/printer"
|
||||
|
||||
"kubesphere.io/api/cluster/v1alpha1"
|
||||
"kubesphere.io/api/notification/v2beta1"
|
||||
"kubesphere.io/api/types/v1beta1"
|
||||
|
||||
k8sinformers "k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/kubernetes/scheme"
|
||||
|
||||
"kubesphere.io/kubesphere/pkg/apis"
|
||||
"kubesphere.io/kubesphere/pkg/constants"
|
||||
)
|
||||
@@ -52,12 +51,13 @@ func TestSource(t *testing.T) {
|
||||
|
||||
var (
|
||||
_ = Describe("Secret", func() {
|
||||
v2beta1.AddToScheme(scheme.Scheme)
|
||||
apis.AddToScheme(scheme.Scheme)
|
||||
_ = v2beta2.AddToScheme(scheme.Scheme)
|
||||
_ = apis.AddToScheme(scheme.Scheme)
|
||||
_ = v1beta2.AddToScheme(scheme.Scheme)
|
||||
|
||||
secret := &corev1.Secret{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "foo",
|
||||
Name: "secret-foo",
|
||||
Namespace: constants.NotificationSecretNamespace,
|
||||
Labels: map[string]string{
|
||||
constants.NotificationManagedLabel: "true",
|
||||
@@ -65,24 +65,39 @@ var (
|
||||
},
|
||||
}
|
||||
|
||||
config := &v2beta1.Config{
|
||||
config := &v2beta2.Config{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "foo",
|
||||
Name: "config-foo",
|
||||
Labels: map[string]string{
|
||||
"type": "global",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
receiver := &v2beta1.Receiver{
|
||||
receiver := &v2beta2.Receiver{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "foo",
|
||||
Name: "receiver-foo",
|
||||
Labels: map[string]string{
|
||||
"type": "default",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
router := &v2beta2.Router{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "router-foo",
|
||||
},
|
||||
}
|
||||
|
||||
silence := &v2beta2.Silence{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "silence-foo",
|
||||
Labels: map[string]string{
|
||||
"type": "global",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
host := &v1alpha1.Cluster{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "host",
|
||||
@@ -144,7 +159,7 @@ var (
|
||||
Expect(cl.Create(context.Background(), config)).Should(Succeed())
|
||||
Expect(r.reconcile(config)).Should(Succeed())
|
||||
|
||||
fedConfig := &v1beta1.FederatedNotificationConfig{}
|
||||
fedConfig := &v1beta2.FederatedNotificationConfig{}
|
||||
By("Expecting to create federated object successfully")
|
||||
err = ksCache.Get(context.Background(), client.ObjectKey{Name: config.Name}, fedConfig)
|
||||
Expect(err).Should(Succeed())
|
||||
@@ -166,7 +181,7 @@ var (
|
||||
Expect(cl.Create(context.Background(), receiver)).Should(Succeed())
|
||||
Expect(r.reconcile(receiver)).Should(Succeed())
|
||||
|
||||
fedReceiver := &v1beta1.FederatedNotificationReceiver{}
|
||||
fedReceiver := &v1beta2.FederatedNotificationReceiver{}
|
||||
By("Expecting to create federated object successfully")
|
||||
err = ksCache.Get(context.Background(), client.ObjectKey{Name: receiver.Name}, fedReceiver)
|
||||
Expect(err).Should(Succeed())
|
||||
@@ -185,6 +200,52 @@ var (
|
||||
Expect(err).Should(Succeed())
|
||||
Expect(fedReceiver.Spec.Template.Labels["foo"]).Should(Equal("bar"))
|
||||
|
||||
// Create a router
|
||||
Expect(cl.Create(context.Background(), router)).Should(Succeed())
|
||||
Expect(r.reconcile(router)).Should(Succeed())
|
||||
|
||||
fedRouter := &v1beta2.FederatedNotificationRouter{}
|
||||
By("Expecting to create federated object successfully")
|
||||
err = ksCache.Get(context.Background(), client.ObjectKey{Name: router.Name}, fedRouter)
|
||||
Expect(err).Should(Succeed())
|
||||
Expect(fedRouter.Name).Should(Equal(router.Name))
|
||||
|
||||
// Update a receiver
|
||||
err = ksCache.Get(context.Background(), client.ObjectKey{Name: router.Name}, router)
|
||||
Expect(err).Should(Succeed())
|
||||
router.Labels = map[string]string{"foo": "bar"}
|
||||
Expect(cl.Update(context.Background(), router)).Should(Succeed())
|
||||
Expect(r.reconcile(router)).Should(Succeed())
|
||||
|
||||
By("Expecting to update federated object successfully")
|
||||
|
||||
err = ksCache.Get(context.Background(), client.ObjectKey{Name: router.Name}, fedRouter)
|
||||
Expect(err).Should(Succeed())
|
||||
Expect(fedRouter.Spec.Template.Labels["foo"]).Should(Equal("bar"))
|
||||
|
||||
// Create a receiver
|
||||
Expect(cl.Create(context.Background(), silence)).Should(Succeed())
|
||||
Expect(r.reconcile(silence)).Should(Succeed())
|
||||
|
||||
fedSilence := &v1beta2.FederatedNotificationSilence{}
|
||||
By("Expecting to create federated object successfully")
|
||||
err = ksCache.Get(context.Background(), client.ObjectKey{Name: silence.Name}, fedSilence)
|
||||
Expect(err).Should(Succeed())
|
||||
Expect(fedSilence.Name).Should(Equal(silence.Name))
|
||||
|
||||
// Update a receiver
|
||||
err = ksCache.Get(context.Background(), client.ObjectKey{Name: silence.Name}, silence)
|
||||
Expect(err).Should(Succeed())
|
||||
silence.Labels = map[string]string{"foo": "bar"}
|
||||
Expect(cl.Update(context.Background(), silence)).Should(Succeed())
|
||||
Expect(r.reconcile(silence)).Should(Succeed())
|
||||
|
||||
By("Expecting to update federated object successfully")
|
||||
|
||||
err = ksCache.Get(context.Background(), client.ObjectKey{Name: silence.Name}, fedSilence)
|
||||
Expect(err).Should(Succeed())
|
||||
Expect(fedSilence.Spec.Template.Labels["foo"]).Should(Equal("bar"))
|
||||
|
||||
// Add a cluster
|
||||
Expect(cl.Create(informerCacheCtx, host)).Should(Succeed())
|
||||
Expect(r.reconcile(secret)).Should(Succeed())
|
||||
@@ -220,24 +281,24 @@ type fakeCache struct {
|
||||
}
|
||||
|
||||
// GetInformerForKind returns the informer for the GroupVersionKind
|
||||
func (f *fakeCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (cache.Informer, error) {
|
||||
func (f *fakeCache) GetInformerForKind(_ context.Context, _ schema.GroupVersionKind) (cache.Informer, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// GetInformer returns the informer for the obj
|
||||
func (f *fakeCache) GetInformer(ctx context.Context, obj client.Object) (cache.Informer, error) {
|
||||
func (f *fakeCache) GetInformer(_ context.Context, _ client.Object) (cache.Informer, error) {
|
||||
fakeInformerFactory := k8sinformers.NewSharedInformerFactory(f.K8sClient, defaultResync)
|
||||
return fakeInformerFactory.Core().V1().Namespaces().Informer(), nil
|
||||
}
|
||||
|
||||
func (f *fakeCache) IndexField(ctx context.Context, obj client.Object, field string, extractValue client.IndexerFunc) error {
|
||||
func (f *fakeCache) IndexField(_ context.Context, _ client.Object, _ string, _ client.IndexerFunc) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *fakeCache) Start(ctx context.Context) error {
|
||||
func (f *fakeCache) Start(_ context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *fakeCache) WaitForCacheSync(ctx context.Context) bool {
|
||||
func (f *fakeCache) WaitForCacheSync(_ context.Context) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user