login record CRD (#2565)

* Signed-off-by: hongming <talonwan@yunify.com>

support ldap identity provider

Signed-off-by: hongming <talonwan@yunify.com>

* add login record

Signed-off-by: Jeff <zw0948@gmail.com>

Co-authored-by: hongming <talonwan@yunify.com>
This commit is contained in:
zryfish
2020-07-23 22:10:39 +08:00
committed by GitHub
parent 50a6c7b2b5
commit 3d74bb0589
51 changed files with 2163 additions and 548 deletions

View File

@@ -38,6 +38,7 @@ import (
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
iamv1alpha2 "kubesphere.io/kubesphere/pkg/apis/iam/v1alpha2"
authoptions "kubesphere.io/kubesphere/pkg/apiserver/authentication/options"
kubesphere "kubesphere.io/kubesphere/pkg/client/clientset/versioned"
kubespherescheme "kubesphere.io/kubesphere/pkg/client/clientset/versioned/scheme"
iamv1alpha2informers "kubesphere.io/kubesphere/pkg/client/informers/externalversions/iam/v1alpha2"
@@ -58,19 +59,25 @@ const (
// is synced successfully
messageResourceSynced = "User synced successfully"
controllerName = "user-controller"
// user finalizer
finalizer = "finalizers.kubesphere.io/users"
)
type Controller struct {
k8sClient kubernetes.Interface
ksClient kubesphere.Interface
kubeconfig kubeconfig.Interface
userInformer iamv1alpha2informers.UserInformer
userLister iamv1alpha2listers.UserLister
userSynced cache.InformerSynced
cmSynced cache.InformerSynced
fedUserCache cache.Store
fedUserController cache.Controller
ldapClient ldapclient.Interface
k8sClient kubernetes.Interface
ksClient kubesphere.Interface
kubeconfig kubeconfig.Interface
userInformer iamv1alpha2informers.UserInformer
userLister iamv1alpha2listers.UserLister
userSynced cache.InformerSynced
loginRecordInformer iamv1alpha2informers.LoginRecordInformer
loginRecordLister iamv1alpha2listers.LoginRecordLister
loginRecordSynced cache.InformerSynced
cmSynced cache.InformerSynced
fedUserCache cache.Store
fedUserController cache.Controller
ldapClient ldapclient.Interface
// workqueue is a rate limited work queue. This is used to queue work to be
// processed instead of performing it as soon as a change happens. This
// means we can ensure we only process a fixed amount of resources at a
@@ -79,15 +86,19 @@ type Controller struct {
workqueue workqueue.RateLimitingInterface
// recorder is an event recorder for recording Event resources to the
// Kubernetes API.
recorder record.EventRecorder
multiClusterEnabled bool
recorder record.EventRecorder
authenticationOptions *authoptions.AuthenticationOptions
multiClusterEnabled bool
}
func NewController(k8sClient kubernetes.Interface, ksClient kubesphere.Interface,
func NewUserController(k8sClient kubernetes.Interface, ksClient kubesphere.Interface,
config *rest.Config, userInformer iamv1alpha2informers.UserInformer,
fedUserCache cache.Store, fedUserController cache.Controller,
loginRecordInformer iamv1alpha2informers.LoginRecordInformer,
configMapInformer corev1informers.ConfigMapInformer,
ldapClient ldapclient.Interface, multiClusterEnabled bool) *Controller {
ldapClient ldapclient.Interface,
authenticationOptions *authoptions.AuthenticationOptions,
multiClusterEnabled bool) *Controller {
// Create event broadcaster
// Add sample-controller types to the default Kubernetes Scheme so Events can be
// logged for sample-controller types.
@@ -103,19 +114,23 @@ func NewController(k8sClient kubernetes.Interface, ksClient kubesphere.Interface
kubeconfigOperator = kubeconfig.NewOperator(k8sClient, configMapInformer, config)
}
ctl := &Controller{
k8sClient: k8sClient,
ksClient: ksClient,
kubeconfig: kubeconfigOperator,
userInformer: userInformer,
userLister: userInformer.Lister(),
userSynced: userInformer.Informer().HasSynced,
cmSynced: configMapInformer.Informer().HasSynced,
fedUserCache: fedUserCache,
fedUserController: fedUserController,
ldapClient: ldapClient,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Users"),
recorder: recorder,
multiClusterEnabled: multiClusterEnabled,
k8sClient: k8sClient,
ksClient: ksClient,
kubeconfig: kubeconfigOperator,
userInformer: userInformer,
userLister: userInformer.Lister(),
userSynced: userInformer.Informer().HasSynced,
loginRecordInformer: loginRecordInformer,
loginRecordLister: loginRecordInformer.Lister(),
loginRecordSynced: loginRecordInformer.Informer().HasSynced,
cmSynced: configMapInformer.Informer().HasSynced,
fedUserCache: fedUserCache,
fedUserController: fedUserController,
ldapClient: ldapClient,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Users"),
recorder: recorder,
multiClusterEnabled: multiClusterEnabled,
authenticationOptions: authenticationOptions,
}
klog.Info("Setting up event handlers")
userInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -125,6 +140,18 @@ func NewController(k8sClient kubernetes.Interface, ksClient kubesphere.Interface
},
DeleteFunc: ctl.enqueueUser,
})
loginRecordInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(new interface{}) {
if username := new.(*iamv1alpha2.LoginRecord).Labels[iamv1alpha2.UserReferenceLabel]; username != "" {
ctl.workqueue.Add(username)
}
},
DeleteFunc: func(obj interface{}) {
if username := obj.(*iamv1alpha2.LoginRecord).Labels[iamv1alpha2.UserReferenceLabel]; username != "" {
ctl.workqueue.Add(username)
}
},
})
return ctl
}
@@ -139,7 +166,7 @@ func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
klog.Info("Waiting for informer caches to sync")
synced := make([]cache.InformerSynced, 0)
synced = append(synced, c.userSynced, c.cmSynced)
synced = append(synced, c.userSynced, c.loginRecordSynced, c.cmSynced)
if c.multiClusterEnabled {
synced = append(synced, c.fedUserController.HasSynced)
}
@@ -182,39 +209,19 @@ func (c *Controller) processNextWorkItem() bool {
return false
}
// We wrap this block in a func so we can defer c.workqueue.Done.
err := func(obj interface{}) error {
// We call Done here so the workqueue knows we have finished
// processing this item. We also must remember to call Forget if we
// do not want this work item being re-queued. For example, we do
// not call Forget if a transient error occurs, instead the item is
// put back on the workqueue and attempted again after a back-off
// period.
defer c.workqueue.Done(obj)
var key string
var ok bool
// We expect strings to come off the workqueue. These are of the
// form namespace/name. We do this as the delayed nature of the
// workqueue means the items in the informer cache may actually be
// more up to date that when the item was initially put onto the
// workqueue.
if key, ok = obj.(string); !ok {
// As the item in the workqueue is actually invalid, we call
// Forget here else we'd go into a loop of attempting to
// process a work item that is invalid.
c.workqueue.Forget(obj)
utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
return nil
}
// Run the reconcile, passing it the namespace/name string of the
// Foo resource to be synced.
if err := c.reconcile(key); err != nil {
// Put the item back on the workqueue to handle any transient errors.
c.workqueue.AddRateLimited(key)
return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
}
// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
c.workqueue.Forget(obj)
klog.Infof("Successfully synced %s:%s", "key", key)
return nil
@@ -246,9 +253,6 @@ func (c *Controller) reconcile(key string) error {
return err
}
// name of your custom finalizer
finalizer := "finalizers.kubesphere.io/users"
if user.ObjectMeta.DeletionTimestamp.IsZero() {
// The object is not being deleted, so if it does not have our finalizer,
// then lets add the finalizer and update the object.
@@ -274,12 +278,17 @@ func (c *Controller) reconcile(key string) error {
return err
}
if err = c.deleteLoginRecords(user); err != nil {
klog.Error(err)
return err
}
// remove our finalizer from the list and update it.
user.Finalizers = sliceutil.RemoveString(user.ObjectMeta.Finalizers, func(item string) bool {
return item == finalizer
})
if _, err := c.ksClient.IamV1alpha2().Users().Update(user); err != nil {
if user, err = c.ksClient.IamV1alpha2().Users().Update(user); err != nil {
return err
}
}
@@ -298,6 +307,11 @@ func (c *Controller) reconcile(key string) error {
return err
}
if user, err = c.syncUserStatus(user); err != nil {
klog.Error(err)
return err
}
if c.kubeconfig != nil {
// ensure user kubeconfig configmap is created
if err = c.kubeconfig.CreateKubeConfig(user); err != nil {
@@ -319,11 +333,11 @@ func (c *Controller) reconcile(key string) error {
}
func (c *Controller) Start(stopCh <-chan struct{}) error {
return c.Run(4, stopCh)
return c.Run(5, stopCh)
}
func (c *Controller) ensurePasswordIsEncrypted(user *iamv1alpha2.User) (*iamv1alpha2.User, error) {
encrypted, _ := strconv.ParseBool(user.Annotations[iamv1alpha2.PasswordEncryptedAnnotation])
encrypted := user.Annotations[iamv1alpha2.PasswordEncryptedAnnotation] == "true"
// password is not encrypted
if !encrypted {
password, err := encrypt(user.Spec.EncryptedPassword)
@@ -337,7 +351,10 @@ func (c *Controller) ensurePasswordIsEncrypted(user *iamv1alpha2.User) (*iamv1al
user.Annotations = make(map[string]string, 0)
}
user.Annotations[iamv1alpha2.PasswordEncryptedAnnotation] = "true"
user.Status.State = iamv1alpha2.UserActive
user.Status = iamv1alpha2.UserStatus{
State: iamv1alpha2.UserActive,
LastTransitionTime: &metav1.Time{Time: time.Now()},
}
return c.ksClient.IamV1alpha2().Users().Update(user)
}
@@ -382,15 +399,10 @@ func (c *Controller) multiClusterSync(user *iamv1alpha2.User) error {
}
if !reflect.DeepEqual(federatedUser.Spec.Template.Spec, user.Spec) ||
!reflect.DeepEqual(federatedUser.Spec.Template.Status, user.Status) ||
!reflect.DeepEqual(federatedUser.Labels, user.Labels) ||
!reflect.DeepEqual(federatedUser.Annotations, user.Annotations) {
!reflect.DeepEqual(federatedUser.Spec.Template.Status, user.Status) {
federatedUser.Labels = user.Labels
federatedUser.Spec.Template.Spec = user.Spec
federatedUser.Spec.Template.Status = user.Status
federatedUser.Spec.Template.Labels = user.Labels
federatedUser.Spec.Template.Annotations = user.Annotations
return c.updateFederatedUser(&federatedUser)
}
@@ -408,10 +420,6 @@ func (c *Controller) createFederatedUser(user *iamv1alpha2.User) error {
},
Spec: iamv1alpha2.FederatedUserSpec{
Template: iamv1alpha2.UserTemplate{
ObjectMeta: metav1.ObjectMeta{
Labels: user.Labels,
Annotations: user.Annotations,
},
Spec: user.Spec,
Status: user.Status,
},
@@ -531,6 +539,81 @@ func (c *Controller) deleteRoleBindings(user *iamv1alpha2.User) error {
return nil
}
func (c *Controller) deleteLoginRecords(user *iamv1alpha2.User) error {
listOptions := metav1.ListOptions{
LabelSelector: labels.SelectorFromSet(labels.Set{iamv1alpha2.UserReferenceLabel: user.Name}).String(),
}
deleteOptions := metav1.NewDeleteOptions(0)
if err := c.ksClient.IamV1alpha2().LoginRecords().
DeleteCollection(deleteOptions, listOptions); err != nil {
klog.Error(err)
return err
}
return nil
}
// syncUserStatus will reconcile user state based on user login records
func (c *Controller) syncUserStatus(user *iamv1alpha2.User) (*iamv1alpha2.User, error) {
// disabled user, nothing to do
if user == nil || (user.Status.State == iamv1alpha2.UserDisabled) {
return user, nil
}
// blocked user, check if need to unblock user
if user.Status.State == iamv1alpha2.UserAuthLimitExceeded {
if user.Status.LastTransitionTime != nil &&
user.Status.LastTransitionTime.Add(c.authenticationOptions.AuthenticateRateLimiterDuration).After(time.Now()) {
expected := user.DeepCopy()
// unblock user
if user.Annotations[iamv1alpha2.PasswordEncryptedAnnotation] == "true" {
expected.Status = iamv1alpha2.UserStatus{
State: iamv1alpha2.UserActive,
LastTransitionTime: &metav1.Time{Time: time.Now()},
}
}
if !reflect.DeepEqual(expected.Status, user.Status) {
return c.ksClient.IamV1alpha2().Users().Update(expected)
}
}
}
// normal user, check user's login records see if we need to block
records, err := c.loginRecordLister.List(labels.SelectorFromSet(labels.Set{iamv1alpha2.UserReferenceLabel: user.Name}))
if err != nil {
klog.Error(err)
return nil, err
}
// count failed login attempts during last AuthenticateRateLimiterDuration
now := time.Now()
failedLoginAttempts := 0
for _, loginRecord := range records {
if loginRecord.Spec.Type == iamv1alpha2.LoginFailure &&
loginRecord.CreationTimestamp.Add(c.authenticationOptions.AuthenticateRateLimiterDuration).After(now) {
failedLoginAttempts++
}
}
// block user if failed login attempts exceeds maximum tries setting
if failedLoginAttempts >= c.authenticationOptions.AuthenticateRateLimiterMaxTries {
expect := user.DeepCopy()
expect.Status = iamv1alpha2.UserStatus{
State: iamv1alpha2.UserAuthLimitExceeded,
Reason: fmt.Sprintf("Failed login attempts exceed %d in last %s", failedLoginAttempts, c.authenticationOptions.AuthenticateRateLimiterDuration),
LastTransitionTime: &metav1.Time{Time: time.Now()},
}
// block user for AuthenticateRateLimiterDuration duration, after that put it back to the queue to unblock
c.workqueue.AddAfter(user.Name, c.authenticationOptions.AuthenticateRateLimiterDuration)
return c.ksClient.IamV1alpha2().Users().Update(expect)
}
return user, nil
}
func encrypt(password string) (string, error) {
// when user is already mapped to another identity, password is empty by default
// unable to log in directly until password reset