fix: cluster list granted to users is incorrect

This commit is contained in:
hongming
2022-05-20 17:12:55 +08:00
parent 32ac94a7e5
commit 382be8b16b
11 changed files with 176 additions and 142 deletions

View File

@@ -25,6 +25,7 @@ import (
"fmt"
"net/http"
"reflect"
"strings"
"time"
"gopkg.in/yaml.v2"
@@ -48,11 +49,13 @@ import (
fedv1b1 "sigs.k8s.io/kubefed/pkg/apis/core/v1beta1"
clusterv1alpha1 "kubesphere.io/api/cluster/v1alpha1"
iamv1alpha2 "kubesphere.io/api/iam/v1alpha2"
"kubesphere.io/kubesphere/pkg/apiserver/config"
clusterclient "kubesphere.io/kubesphere/pkg/client/clientset/versioned/typed/cluster/v1alpha1"
kubesphere "kubesphere.io/kubesphere/pkg/client/clientset/versioned"
clusterinformer "kubesphere.io/kubesphere/pkg/client/informers/externalversions/cluster/v1alpha1"
clusterlister "kubesphere.io/kubesphere/pkg/client/listers/cluster/v1alpha1"
iamv1alpha2listers "kubesphere.io/kubesphere/pkg/client/listers/iam/v1alpha2"
"kubesphere.io/kubesphere/pkg/constants"
"kubesphere.io/kubesphere/pkg/simple/client/multicluster"
"kubesphere.io/kubesphere/pkg/utils/k8sutil"
@@ -126,12 +129,13 @@ type clusterController struct {
eventRecorder record.EventRecorder
// build this only for host cluster
client kubernetes.Interface
k8sClient kubernetes.Interface
hostConfig *rest.Config
clusterClient clusterclient.ClusterInterface
ksClient kubesphere.Interface
clusterLister clusterlister.ClusterLister
userLister iamv1alpha2listers.UserLister
clusterHasSynced cache.InformerSynced
queue workqueue.RateLimitingInterface
@@ -144,10 +148,11 @@ type clusterController struct {
}
func NewClusterController(
client kubernetes.Interface,
k8sClient kubernetes.Interface,
ksClient kubesphere.Interface,
config *rest.Config,
clusterInformer clusterinformer.ClusterInformer,
clusterClient clusterclient.ClusterInterface,
userLister iamv1alpha2listers.UserLister,
resyncPeriod time.Duration,
hostClusterName string,
) *clusterController {
@@ -156,19 +161,20 @@ func NewClusterController(
broadcaster.StartLogging(func(format string, args ...interface{}) {
klog.Info(fmt.Sprintf(format, args))
})
broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: client.CoreV1().Events("")})
broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: k8sClient.CoreV1().Events("")})
recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cluster-controller"})
c := &clusterController{
eventBroadcaster: broadcaster,
eventRecorder: recorder,
client: client,
k8sClient: k8sClient,
ksClient: ksClient,
hostConfig: config,
clusterClient: clusterClient,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "cluster"),
workerLoopPeriod: time.Second,
resyncPeriod: resyncPeriod,
hostClusterName: hostClusterName,
userLister: userLister,
}
c.clusterLister = clusterInformer.Lister()
c.clusterHasSynced = clusterInformer.Informer().HasSynced
@@ -213,7 +219,7 @@ func (c *clusterController) Run(workers int, stopCh <-chan struct{}) error {
klog.Errorf("Error create host cluster, error %v", err)
}
if err := c.probeClusters(); err != nil {
if err := c.resyncClusters(); err != nil {
klog.Errorf("failed to reconcile cluster ready status, err: %v", err)
}
}, c.resyncPeriod, stopCh)
@@ -256,7 +262,7 @@ func (c *clusterController) reconcileHostCluster() error {
if len(clusters) == 0 {
hostCluster.Spec.Connection.KubeConfig = hostKubeConfig
hostCluster.Name = c.hostClusterName
_, err = c.clusterClient.Create(context.TODO(), hostCluster, metav1.CreateOptions{})
_, err = c.ksClient.ClusterV1alpha1().Clusters().Create(context.TODO(), hostCluster, metav1.CreateOptions{})
return err
} else if len(clusters) > 1 {
return fmt.Errorf("there MUST not be more than one host clusters, while there are %d", len(clusters))
@@ -280,18 +286,20 @@ func (c *clusterController) reconcileHostCluster() error {
}
// update host cluster config
_, err = c.clusterClient.Update(context.TODO(), cluster, metav1.UpdateOptions{})
_, err = c.ksClient.ClusterV1alpha1().Clusters().Update(context.TODO(), cluster, metav1.UpdateOptions{})
return err
}
func (c *clusterController) probeClusters() error {
func (c *clusterController) resyncClusters() error {
clusters, err := c.clusterLister.List(labels.Everything())
if err != nil {
return err
}
for _, cluster := range clusters {
c.syncCluster(cluster.Name)
if err = c.syncCluster(cluster.Name); err != nil {
klog.Warningf("failed to sync cluster %s: %s", cluster.Name, err)
}
}
return nil
}
@@ -328,7 +336,7 @@ func (c *clusterController) syncCluster(key string) error {
// registering our finalizer.
if !sets.NewString(cluster.ObjectMeta.Finalizers...).Has(clusterv1alpha1.Finalizer) {
cluster.ObjectMeta.Finalizers = append(cluster.ObjectMeta.Finalizers, clusterv1alpha1.Finalizer)
if cluster, err = c.clusterClient.Update(context.TODO(), cluster, metav1.UpdateOptions{}); err != nil {
if cluster, err = c.ksClient.ClusterV1alpha1().Clusters().Update(context.TODO(), cluster, metav1.UpdateOptions{}); err != nil {
return err
}
}
@@ -338,17 +346,21 @@ func (c *clusterController) syncCluster(key string) error {
// need to unJoin federation first, before there are
// some cleanup work to do in member cluster which depends
// agent to proxy traffic
err = c.unJoinFederation(nil, name)
if err != nil {
if err = c.unJoinFederation(nil, name); err != nil {
klog.Errorf("Failed to unjoin federation for cluster %s, error %v", name, err)
return err
}
// cleanup after cluster has been deleted
if err := c.syncClusterMembers(nil, cluster); err != nil {
klog.Errorf("Failed to sync cluster members for %s: %v", name, err)
return err
}
// remove our cluster finalizer
finalizers := sets.NewString(cluster.ObjectMeta.Finalizers...)
finalizers.Delete(clusterv1alpha1.Finalizer)
cluster.ObjectMeta.Finalizers = finalizers.List()
if _, err = c.clusterClient.Update(context.TODO(), cluster, metav1.UpdateOptions{}); err != nil {
if _, err = c.ksClient.ClusterV1alpha1().Clusters().Update(context.TODO(), cluster, metav1.UpdateOptions{}); err != nil {
return err
}
}
@@ -407,7 +419,7 @@ func (c *clusterController) syncCluster(key string) error {
}
c.updateClusterCondition(cluster, federationNotReadyCondition)
_, err = c.clusterClient.Update(context.TODO(), cluster, metav1.UpdateOptions{})
_, err = c.ksClient.ClusterV1alpha1().Clusters().Update(context.TODO(), cluster, metav1.UpdateOptions{})
if err != nil {
klog.Errorf("Failed to update cluster status, %#v", err)
}
@@ -496,8 +508,8 @@ func (c *clusterController) syncCluster(key string) error {
return err
}
if !reflect.DeepEqual(oldCluster, cluster) {
_, err = c.clusterClient.Update(context.TODO(), cluster, metav1.UpdateOptions{})
if !reflect.DeepEqual(oldCluster.Status, cluster.Status) {
_, err = c.ksClient.ClusterV1alpha1().Clusters().Update(context.TODO(), cluster, metav1.UpdateOptions{})
if err != nil {
klog.Errorf("Failed to update cluster status, %#v", err)
return err
@@ -508,6 +520,10 @@ func (c *clusterController) syncCluster(key string) error {
return err
}
if err = c.syncClusterMembers(clusterClient, cluster); err != nil {
return fmt.Errorf("failed to sync cluster membership for %s: %s", cluster.Name, err)
}
return nil
}
@@ -541,7 +557,7 @@ func (c *clusterController) setClusterNameInConfigMap(client kubernetes.Interfac
}
func (c *clusterController) checkIfClusterIsHostCluster(memberClusterNodes *v1.NodeList) bool {
hostNodes, err := c.client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
hostNodes, err := c.k8sClient.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
if err != nil {
return false
}
@@ -786,3 +802,55 @@ func (c *clusterController) updateKubeConfigExpirationDateCondition(cluster *clu
})
return nil
}
// syncClusterMembers Sync granted clusters for users periodically
func (c *clusterController) syncClusterMembers(clusterClient *kubernetes.Clientset, cluster *clusterv1alpha1.Cluster) error {
users, err := c.userLister.List(labels.Everything())
if err != nil {
return fmt.Errorf("failed to list users: %s", err)
}
grantedUsers := sets.NewString()
clusterName := cluster.Name
if cluster.DeletionTimestamp.IsZero() {
list, err := clusterClient.RbacV1().ClusterRoleBindings().List(context.Background(),
metav1.ListOptions{LabelSelector: iamv1alpha2.UserReferenceLabel})
if err != nil {
return fmt.Errorf("failed to list clusterrolebindings: %s", err)
}
for _, clusterRoleBinding := range list.Items {
for _, sub := range clusterRoleBinding.Subjects {
if sub.Kind == iamv1alpha2.ResourceKindUser {
grantedUsers.Insert(sub.Name)
}
}
}
}
for _, user := range users {
user = user.DeepCopy()
grantedClustersAnnotation := user.Annotations[iamv1alpha2.GrantedClustersAnnotation]
var grantedClusters sets.String
if len(grantedClustersAnnotation) > 0 {
grantedClusters = sets.NewString(strings.Split(grantedClustersAnnotation, ",")...)
} else {
grantedClusters = sets.NewString()
}
if grantedUsers.Has(user.Name) && !grantedClusters.Has(clusterName) {
grantedClusters.Insert(clusterName)
} else if !grantedUsers.Has(user.Name) && grantedClusters.Has(clusterName) {
grantedClusters.Delete(clusterName)
}
grantedClustersAnnotation = strings.Join(grantedClusters.List(), ",")
if user.Annotations[iamv1alpha2.GrantedClustersAnnotation] != grantedClustersAnnotation {
if user.Annotations == nil {
user.Annotations = make(map[string]string, 0)
}
user.Annotations[iamv1alpha2.GrantedClustersAnnotation] = grantedClustersAnnotation
if _, err := c.ksClient.IamV1alpha2().Users().Update(context.Background(), user, metav1.UpdateOptions{}); err != nil {
return fmt.Errorf("failed to update user %s: %s", user.Name, err)
}
}
}
return nil
}