Add proxy devops APIs request to ks-devops

move devops controllers into ks-devops

Signed-off-by: rick <1450685+LinuxSuRen@users.noreply.github.com>
This commit is contained in:
rick
2021-08-05 19:03:46 +08:00
parent 418a2a09c7
commit 6d9cf166c6
27 changed files with 4340 additions and 12342 deletions

View File

@@ -1,333 +0,0 @@
/*
Copyright 2020 KubeSphere Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package devopscredential
import (
"context"
"fmt"
"net/http"
"reflect"
"strings"
"time"
"github.com/emicklei/go-restful"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
corev1informer "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
corev1lister "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
devopsv1alpha3 "kubesphere.io/api/devops/v1alpha3"
kubesphereclient "kubesphere.io/kubesphere/pkg/client/clientset/versioned"
"kubesphere.io/kubesphere/pkg/constants"
"kubesphere.io/kubesphere/pkg/controller/utils"
modelsdevops "kubesphere.io/kubesphere/pkg/models/devops"
devopsClient "kubesphere.io/kubesphere/pkg/simple/client/devops"
"kubesphere.io/kubesphere/pkg/utils/k8sutil"
"kubesphere.io/kubesphere/pkg/utils/sliceutil"
)
/**
DevOps project controller is used to maintain the state of the DevOps project.
*/

// Controller reconciles DevOps credential secrets: Kubernetes Secrets whose
// Type carries the DevOps credential prefix are mirrored into the backing
// DevOps service via devopsClient.
type Controller struct {
	client           clientset.Interface
	kubesphereClient kubesphereclient.Interface

	// event machinery used to surface reconcile events
	eventBroadcaster record.EventBroadcaster
	eventRecorder    record.EventRecorder

	// listers and cache-sync funcs for the watched resources
	secretLister    corev1lister.SecretLister
	secretSynced    cache.InformerSynced
	namespaceLister corev1lister.NamespaceLister
	namespaceSynced cache.InformerSynced

	// workqueue holds namespace/name keys of secrets to reconcile
	workqueue workqueue.RateLimitingInterface
	// workerLoopPeriod is the restart interval for each worker loop
	workerLoopPeriod time.Duration

	// devopsClient talks to the backing DevOps service (e.g. Jenkins)
	devopsClient devopsClient.Interface
}
// NewController creates a credential Controller that watches Secret events
// and enqueues only secrets whose Type starts with
// devopsv1alpha3.DevOpsCredentialPrefix.
//
// Parameters:
//   - client: Kubernetes client used for event recording and secret updates
//   - devopsClient: backing DevOps service client
//   - namespaceInformer / secretInformer: shared informers for the caches
func NewController(client clientset.Interface,
	devopsClient devopsClient.Interface,
	namespaceInformer corev1informer.NamespaceInformer,
	secretInformer corev1informer.SecretInformer) *Controller {
	broadcaster := record.NewBroadcaster()
	broadcaster.StartLogging(func(format string, args ...interface{}) {
		// Fix: expand args so each format verb gets its own operand;
		// the original passed the slice as a single value.
		klog.Info(fmt.Sprintf(format, args...))
	})
	broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})
	recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "devopscredential-controller"})

	v := &Controller{
		client:           client,
		devopsClient:     devopsClient,
		workqueue:        workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "devopscredential"),
		secretLister:     secretInformer.Lister(),
		secretSynced:     secretInformer.Informer().HasSynced,
		namespaceLister:  namespaceInformer.Lister(),
		namespaceSynced:  namespaceInformer.Informer().HasSynced,
		workerLoopPeriod: time.Second,
	}
	v.eventBroadcaster = broadcaster
	v.eventRecorder = recorder

	// isDevOpsCredential reports whether the secret's type carries the
	// DevOps credential prefix; only such secrets are reconciled.
	isDevOpsCredential := func(secret *v1.Secret) bool {
		return strings.HasPrefix(string(secret.Type), devopsv1alpha3.DevOpsCredentialPrefix)
	}

	secretInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			if secret, ok := obj.(*v1.Secret); ok && isDevOpsCredential(secret) {
				v.enqueueSecret(obj)
			}
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			oldSecret, oldOK := oldObj.(*v1.Secret)
			newSecret, newOK := newObj.(*v1.Secret)
			// Identical resource versions indicate an informer resync with
			// no actual change; skip it.
			if oldOK && newOK && oldSecret.ResourceVersion == newSecret.ResourceVersion {
				return
			}
			if oldOK && newOK && isDevOpsCredential(newSecret) {
				v.enqueueSecret(newObj)
			}
		},
		DeleteFunc: func(obj interface{}) {
			if secret, ok := obj.(*v1.Secret); ok && isDevOpsCredential(secret) {
				v.enqueueSecret(obj)
			}
		},
	})
	return v
}
// enqueueSecret converts obj into a namespace/name key and adds it to the
// work queue. It should only be passed secret objects; key-derivation
// failures are reported and the object is dropped.
func (c *Controller) enqueueSecret(obj interface{}) {
	key, err := cache.MetaNamespaceKeyFunc(obj)
	if err != nil {
		utilruntime.HandleError(err)
		return
	}
	c.workqueue.Add(key)
}
// processNextWorkItem pops one key off the queue and hands it to syncHandler.
// It returns false only when the queue has been shut down; errors are logged,
// the key is requeued with rate limiting, and processing continues.
func (c *Controller) processNextWorkItem() bool {
	item, shutdown := c.workqueue.Get()
	if shutdown {
		return false
	}
	defer c.workqueue.Done(item)

	key, ok := item.(string)
	if !ok {
		// Unexpected payload: forget it so it never comes back.
		c.workqueue.Forget(item)
		utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", item))
		return true
	}

	if err := c.syncHandler(key); err != nil {
		c.workqueue.AddRateLimited(key)
		wrapped := fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
		klog.Error(wrapped, "could not reconcile devopsProject")
		utilruntime.HandleError(wrapped)
		return true
	}

	c.workqueue.Forget(item)
	klog.V(5).Infof("Successfully synced '%s'", key)
	return true
}
// worker loops over the queue until processNextWorkItem reports shutdown.
func (c *Controller) worker() {
	for {
		if !c.processNextWorkItem() {
			return
		}
	}
}
// Start runs the controller with a single worker until stopCh is closed.
// It satisfies the manager's runnable interface by delegating to Run.
func (c *Controller) Start(stopCh <-chan struct{}) error {
	return c.Run(1, stopCh)
}
// Run starts the given number of credential workers once the informer caches
// have synced, then blocks until stopCh is closed. The queue is shut down on
// return so workers drain cleanly.
func (c *Controller) Run(workers int, stopCh <-chan struct{}) error {
	defer utilruntime.HandleCrash()
	defer c.workqueue.ShutDown()
	klog.Info("starting devopscredential controller")
	defer klog.Info("shutting down devopscredential controller")

	// Fix: syncHandler reads both the secret and the namespace listers, but
	// only the secret cache was awaited before; wait for both.
	if !cache.WaitForCacheSync(stopCh, c.secretSynced, c.namespaceSynced) {
		return fmt.Errorf("failed to wait for caches to sync")
	}
	for i := 0; i < workers; i++ {
		go wait.Until(c.worker, c.workerLoopPeriod, stopCh)
	}
	<-stopCh
	return nil
}
// syncHandler compares the actual state with the desired, and attempts to
// converge the two. It then updates the Status block of the secret resource
// with the current status of the resource.
//
// Flow: resolve the namespace, verify it is a DevOps project admin namespace,
// then either create/update the credential in the DevOps backend (live
// secret) or delete it and clear the finalizer (deleting secret). Finally the
// Kubernetes Secret is updated if the reconcile mutated it.
func (c *Controller) syncHandler(key string) error {
	nsName, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		// A malformed key can never succeed on retry; log and drop it.
		klog.Error(err, fmt.Sprintf("could not split copySecret meta %s ", key))
		return nil
	}
	namespace, err := c.namespaceLister.Get(nsName)
	if err != nil {
		if errors.IsNotFound(err) {
			// Namespace is gone; nothing left to reconcile.
			klog.Info(fmt.Sprintf("namespace '%s' in work queue no longer exists ", key))
			return nil
		}
		klog.Error(err, fmt.Sprintf("could not get namespace %s ", key))
		return err
	}
	// Credentials are only valid inside a DevOps project admin namespace.
	if !isDevOpsProjectAdminNamespace(namespace) {
		err := fmt.Errorf("cound not create or update credential '%s' in normal namespaces %s", name, namespace.Name)
		klog.Warning(err)
		return err
	}
	secret, err := c.secretLister.Secrets(nsName).Get(name)
	if err != nil {
		if errors.IsNotFound(err) {
			klog.Info(fmt.Sprintf("secret '%s' in work queue no longer exists ", key))
			return nil
		}
		klog.Error(err, fmt.Sprintf("could not get secret %s ", key))
		return err
	}
	// Work on a copy so the lister's cached object is never mutated.
	copySecret := secret.DeepCopy()
	// DeletionTimestamp.IsZero() means copySecret has not been deleted.
	if copySecret.ObjectMeta.DeletionTimestamp.IsZero() {
		// make sure Annotations is not nil
		if copySecret.Annotations == nil {
			copySecret.Annotations = map[string]string{}
		}
		//If the sync is successful, return handle
		if state, ok := copySecret.Annotations[devopsv1alpha3.CredentialSyncStatusAnnoKey]; ok && state == modelsdevops.StatusSuccessful {
			// Compare a hash of the data to detect changes since last sync.
			specHash := utils.ComputeHash(copySecret.Data)
			oldHash, _ := copySecret.Annotations[devopsv1alpha3.DevOpsCredentialDataHash] // don't need to check if it's nil, only compare if they're different
			if specHash == oldHash {
				// it was synced successfully, and there's any change with the Pipeline spec, skip this round
				return nil
			} else {
				copySecret.Annotations[devopsv1alpha3.DevOpsCredentialDataHash] = specHash
			}
		}
		// https://kubernetes.io/docs/tasks/access-kubernetes-api/custom-resources/custom-resource-definitions/#finalizers
		if !sliceutil.HasString(secret.ObjectMeta.Finalizers, devopsv1alpha3.CredentialFinalizerName) {
			copySecret.ObjectMeta.Finalizers = append(copySecret.ObjectMeta.Finalizers, devopsv1alpha3.CredentialFinalizerName)
		}
		// Check secret config exists, otherwise we will create it.
		// if secret exists, update config
		_, err := c.devopsClient.GetCredentialInProject(nsName, copySecret.Name)
		if err == nil {
			// Only push updates when auto-sync is explicitly enabled.
			if _, ok := copySecret.Annotations[devopsv1alpha3.CredentialAutoSyncAnnoKey]; ok {
				_, err := c.devopsClient.UpdateCredentialInProject(nsName, copySecret)
				if err != nil {
					klog.V(8).Info(err, fmt.Sprintf("failed to update secret %s ", key))
					return err
				}
			}
		} else {
			_, err = c.devopsClient.CreateCredentialInProject(nsName, copySecret)
			if err != nil {
				klog.V(8).Info(err, fmt.Sprintf("failed to create secret %s ", key))
				return err
			}
		}
		//If there is no early return, then the sync is successful.
		copySecret.Annotations[devopsv1alpha3.CredentialSyncStatusAnnoKey] = modelsdevops.StatusSuccessful
	} else {
		// Finalizers processing logic
		if sliceutil.HasString(copySecret.ObjectMeta.Finalizers, devopsv1alpha3.CredentialFinalizerName) {
			delSuccess := false
			if _, err := c.devopsClient.DeleteCredentialInProject(nsName, secret.Name); err != nil {
				// the status code should be 404 if the credential does not exists
				// A 404 from the backend is treated as a successful delete
				// (the credential is already gone).
				if srvErr, ok := err.(restful.ServiceError); ok {
					delSuccess = srvErr.Code == http.StatusNotFound
				} else if srvErr, ok := err.(*devopsClient.ErrorResponse); ok {
					delSuccess = srvErr.Response.StatusCode == http.StatusNotFound
				} else {
					klog.Error(fmt.Sprintf("unexpected error type: %v, should be *restful.ServiceError", err))
				}
				klog.V(8).Info(err, fmt.Sprintf("failed to delete secret %s in devops", key))
			} else {
				delSuccess = true
			}
			if delSuccess {
				copySecret.ObjectMeta.Finalizers = sliceutil.RemoveString(copySecret.ObjectMeta.Finalizers, func(item string) bool {
					return item == devopsv1alpha3.CredentialFinalizerName
				})
			} else {
				// make sure the corresponding Jenkins credentials can be clean
				// You can remove the finalizer via kubectl manually in a very special case that Jenkins might be not able to available anymore
				return fmt.Errorf("failed to remove devops credential finalizer due to bad communication with Jenkins")
			}
		}
	}
	// Persist only if the reconcile changed the secret.
	if !reflect.DeepEqual(secret, copySecret) {
		_, err = c.client.CoreV1().Secrets(nsName).Update(context.Background(), copySecret, metav1.UpdateOptions{})
		if err != nil {
			klog.V(8).Info(err, fmt.Sprintf("failed to update secret %s ", key))
			return err
		}
	}
	return nil
}
// isDevOpsProjectAdminNamespace reports whether the namespace is the admin
// namespace of a DevOps project: it must carry the DevOps project label and
// be controlled by a DevOpsProject owner reference.
func isDevOpsProjectAdminNamespace(namespace *v1.Namespace) bool {
	if _, hasLabel := namespace.Labels[constants.DevOpsProjectLabelKey]; !hasLabel {
		return false
	}
	return k8sutil.IsControlledBy(namespace.OwnerReferences,
		devopsv1alpha3.ResourceKindDevOpsProject, "")
}

View File

@@ -1,403 +0,0 @@
/*
Copyright 2020 KubeSphere Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package devopscredential
import (
"reflect"
"testing"
"time"
modelsdevops "kubesphere.io/kubesphere/pkg/models/devops"
v1 "k8s.io/api/core/v1"
"kubesphere.io/kubesphere/pkg/constants"
fakeDevOps "kubesphere.io/kubesphere/pkg/simple/client/devops/fake"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/diff"
kubeinformers "k8s.io/client-go/informers"
k8sfake "k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
devops "kubesphere.io/api/devops/v1alpha3"
)
var (
	// alwaysReady bypasses informer cache-sync waiting in tests.
	alwaysReady = func() bool { return true }
	// noResyncPeriodFunc disables informer resync in tests.
	noResyncPeriodFunc = func() time.Duration { return 0 }
)
// fixture bundles the fake clients, preloaded lister objects and expected
// outcomes for a single credential-controller test scenario.
type fixture struct {
	t *testing.T

	kubeclient *k8sfake.Clientset
	// objects preloaded into the informer indexers (what the listers see)
	namespaceLister []*v1.Namespace
	secretLister    []*v1.Secret
	// kubeactions are the Kubernetes API actions expected during the sync
	kubeactions []core.Action
	kubeobjects []runtime.Object
	// Objects from here preloaded into NewSimpleFake.
	objects []runtime.Object
	// Objects from here preloaded into devops
	initDevOpsProject string
	initCredential    []*v1.Secret
	// expectCredential is the credential state expected in the fake DevOps
	// backend after the sync completes
	expectCredential []*v1.Secret
}
// newFixture returns an empty fixture bound to the given test.
func newFixture(t *testing.T) *fixture {
	return &fixture{
		t:       t,
		objects: []runtime.Object{},
	}
}
// newNamespace builds a Namespace that is labeled with and controlled by the
// given DevOps project, matching what the project controller would create.
func newNamespace(name string, projectName string) *v1.Namespace {
	isController := true
	return &v1.Namespace{
		TypeMeta: metav1.TypeMeta{
			Kind:       "Namespace",
			APIVersion: v1.SchemeGroupVersion.String(),
		},
		ObjectMeta: metav1.ObjectMeta{
			Name:   name,
			Labels: map[string]string{constants.DevOpsProjectLabelKey: projectName},
			OwnerReferences: []metav1.OwnerReference{
				{
					APIVersion:         devops.SchemeGroupVersion.String(),
					Kind:               devops.ResourceKindDevOpsProject,
					Name:               projectName,
					BlockOwnerDeletion: &isController,
					Controller:         &isController,
				},
			},
		},
	}
}
// newSecret constructs a DevOps-typed credential Secret for tests. The flags
// control whether the credential finalizer, the auto-sync annotation and the
// successful-sync annotation are present.
func newSecret(namespace, name string, data map[string][]byte, withFinalizers bool, autoSync bool, syncOk bool) *v1.Secret {
	annotations := map[string]string{}
	if autoSync {
		annotations[devops.CredentialAutoSyncAnnoKey] = "true"
	}
	if syncOk {
		annotations[devops.CredentialSyncStatusAnnoKey] = modelsdevops.StatusSuccessful
	}
	result := &v1.Secret{
		TypeMeta: metav1.TypeMeta{
			Kind:       devops.ResourceKindPipeline,
			APIVersion: devops.SchemeGroupVersion.String(),
		},
		ObjectMeta: metav1.ObjectMeta{
			Namespace:   namespace,
			Name:        name,
			Annotations: annotations,
		},
		Data: data,
		Type: devops.DevOpsCredentialPrefix + "test",
	}
	if withFinalizers {
		result.Finalizers = append(result.Finalizers, devops.CredentialFinalizerName)
	}
	return result
}
// newDeletingSecret constructs a credential Secret that carries a deletion
// timestamp and the credential finalizer, i.e. one in the process of being
// deleted.
func newDeletingSecret(namespace, name string) *v1.Secret {
	deletedAt := metav1.Now()
	secret := &v1.Secret{
		TypeMeta: metav1.TypeMeta{
			Kind:       devops.ResourceKindPipeline,
			APIVersion: devops.SchemeGroupVersion.String(),
		},
		ObjectMeta: metav1.ObjectMeta{
			Namespace:         namespace,
			Name:              name,
			DeletionTimestamp: &deletedAt,
		},
		Type: devops.DevOpsCredentialPrefix + "test",
	}
	secret.Finalizers = append(secret.Finalizers, devops.CredentialFinalizerName)
	return secret
}
// newController wires the fixture's fake kube client, informer factory and
// fake DevOps backend into a Controller ready for syncHandler tests.
func (f *fixture) newController() (*Controller, kubeinformers.SharedInformerFactory, *fakeDevOps.Devops) {
	f.kubeclient = k8sfake.NewSimpleClientset(f.kubeobjects...)
	factory := kubeinformers.NewSharedInformerFactory(f.kubeclient, noResyncPeriodFunc())
	devopsFake := fakeDevOps.NewWithCredentials(f.initDevOpsProject, f.initCredential...)

	ctl := NewController(f.kubeclient, devopsFake, factory.Core().V1().Namespaces(),
		factory.Core().V1().Secrets())
	ctl.secretSynced = alwaysReady
	ctl.eventRecorder = &record.FakeRecorder{}

	// Preload the informer indexers so the listers see the fixture objects.
	for _, secret := range f.secretLister {
		factory.Core().V1().Secrets().Informer().GetIndexer().Add(secret)
	}
	for _, ns := range f.namespaceLister {
		factory.Core().V1().Namespaces().Informer().GetIndexer().Add(ns)
	}
	return ctl, factory, devopsFake
}
// run syncs the given key and fails the test if syncHandler returns an error.
func (f *fixture) run(fooName string) {
	f.runController(fooName, true, false)
}
// runExpectError syncs the given key and fails the test if syncHandler does
// NOT return an error.
func (f *fixture) runExpectError(fooName string) {
	f.runController(fooName, true, true)
}
// runController builds the controller, runs syncHandler on the given key and
// checks the outcome: the sync error matches expectError, no expected kube
// actions are missing, and the fake DevOps backend holds exactly the
// credentials listed in f.expectCredential.
func (f *fixture) runController(name string, startInformers bool, expectError bool) {
	c, k8sI, dI := f.newController()
	if startInformers {
		stopCh := make(chan struct{})
		defer close(stopCh)
		k8sI.Start(stopCh)
	}
	err := c.syncHandler(name)
	if !expectError && err != nil {
		f.t.Errorf("error syncing foo: %v", err)
	} else if expectError && err == nil {
		f.t.Error("expected error syncing foo, got nil")
	}
	// Strip informer list/watch noise before comparing action counts.
	k8sActions := filterInformerActions(f.kubeclient.Actions())
	if len(f.kubeactions) > len(k8sActions) {
		f.t.Errorf("%d additional expected actions:%+v", len(f.kubeactions)-len(k8sActions), f.kubeactions[len(k8sActions):])
	}
	// Compare the fake DevOps backend against the expected credential set.
	if len(dI.Credentials[f.initDevOpsProject]) != len(f.expectCredential) {
		f.t.Errorf(" unexpected objects: %v", dI.Projects)
	}
	for _, credential := range f.expectCredential {
		actualCredential := dI.Credentials[f.initDevOpsProject][credential.Name]
		if !reflect.DeepEqual(actualCredential, credential) {
			f.t.Errorf(" credential %+v not match \n %+v", credential, actualCredential)
		}
	}
}
// checkAction verifies that expected and actual actions are equal and both
// have the same attached resources: verb/resource/subresource must match,
// the dynamic types must match, and the attached object or patch must be
// deeply equal.
func checkAction(expected, actual core.Action, t *testing.T) {
	if !(expected.Matches(actual.GetVerb(), actual.GetResource().Resource) && actual.GetSubresource() == expected.GetSubresource()) {
		t.Errorf("Expected\n\t%#v\ngot\n\t%#v", expected, actual)
		return
	}
	if reflect.TypeOf(actual) != reflect.TypeOf(expected) {
		// Fix: %T prints the dynamic type; %t is the boolean verb and printed
		// "%!t(...)" noise for action values.
		t.Errorf("Action has wrong type. Expected: %T. Got: %T", expected, actual)
		return
	}
	switch a := actual.(type) {
	case core.CreateActionImpl:
		e, _ := expected.(core.CreateActionImpl)
		expObject := e.GetObject()
		object := a.GetObject()
		if !reflect.DeepEqual(expObject, object) {
			t.Errorf("Action %s %s has wrong object\nDiff:\n %s",
				a.GetVerb(), a.GetResource().Resource, diff.ObjectGoPrintSideBySide(expObject, object))
		}
	case core.UpdateActionImpl:
		e, _ := expected.(core.UpdateActionImpl)
		expObject := e.GetObject()
		object := a.GetObject()
		if !reflect.DeepEqual(expObject, object) {
			t.Errorf("Action %s %s has wrong object\nDiff:\n %s",
				a.GetVerb(), a.GetResource().Resource, diff.ObjectGoPrintSideBySide(expObject, object))
		}
	case core.PatchActionImpl:
		e, _ := expected.(core.PatchActionImpl)
		expPatch := e.GetPatch()
		patch := a.GetPatch()
		if !reflect.DeepEqual(expPatch, patch) {
			t.Errorf("Action %s %s has wrong patch\nDiff:\n %s",
				a.GetVerb(), a.GetResource().Resource, diff.ObjectGoPrintSideBySide(expPatch, patch))
		}
	default:
		t.Errorf("Uncaptured Action %s %s, you should explicitly add a case to capture it",
			actual.GetVerb(), actual.GetResource().Resource)
	}
}
// filterInformerActions drops the cluster-scoped list and watch actions that
// the informers issue for secrets and namespaces. Those actions never change
// resource state, so removing them lowers the noise level in the assertions.
func filterInformerActions(actions []core.Action) []core.Action {
	filtered := []core.Action{}
	for _, a := range actions {
		isInformerNoise := len(a.GetNamespace()) == 0 &&
			(a.Matches("list", "secrets") ||
				a.Matches("watch", "secrets") ||
				a.Matches("list", "namespaces") ||
				a.Matches("watch", "namespaces"))
		if isInformerNoise {
			continue
		}
		filtered = append(filtered, a)
	}
	return filtered
}
// expectUpdateSecretAction records that the sync is expected to update the
// given secret via the Kubernetes API.
func (f *fixture) expectUpdateSecretAction(p *v1.Secret) {
	gvr := schema.GroupVersionResource{
		Version:  "v1",
		Resource: "secrets",
	}
	f.kubeactions = append(f.kubeactions, core.NewUpdateAction(gvr, p.Namespace, p))
}
// getKey returns the cache key (namespace/name) for the given secret,
// failing the test and returning "" on error.
func getKey(p *v1.Secret, t *testing.T) string {
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(p)
	if err == nil {
		return key
	}
	t.Errorf("Unexpected error getting key for pipeline %v: %v", p.Name, err)
	return ""
}
// TestDoNothing syncs a secret that already has the finalizer and auto-sync
// enabled; the credential is preloaded in the backend, so the only expected
// outcome is the sync-status annotation being marked successful.
func TestDoNothing(t *testing.T) {
	f := newFixture(t)
	nsName := "test-123"
	secretName := "test"
	projectName := "test_project"
	ns := newNamespace(nsName, projectName)
	secret := newSecret(nsName, secretName, nil, true, true, false)
	expectSecret := newSecret(nsName, secretName, nil, true, true, true)
	f.secretLister = append(f.secretLister, secret)
	f.namespaceLister = append(f.namespaceLister, ns)
	f.kubeobjects = append(f.kubeobjects, secret)
	f.initDevOpsProject = nsName
	f.initCredential = []*v1.Secret{secret}
	f.expectCredential = []*v1.Secret{expectSecret}
	f.run(getKey(secret, t))
}
// TestAddCredentialFinalizers syncs a secret that lacks the credential
// finalizer and expects the controller to add it (via a secret update action)
// and mark the sync successful.
func TestAddCredentialFinalizers(t *testing.T) {
	f := newFixture(t)
	nsName := "test-123"
	secretName := "test"
	projectName := "test_project"
	ns := newNamespace(nsName, projectName)
	secret := newSecret(nsName, secretName, nil, false, true, false)
	expectSecret := newSecret(nsName, secretName, nil, true, true, true)
	f.secretLister = append(f.secretLister, secret)
	f.namespaceLister = append(f.namespaceLister, ns)
	f.kubeobjects = append(f.kubeobjects, secret)
	f.initDevOpsProject = nsName
	f.initCredential = []*v1.Secret{secret}
	f.expectCredential = []*v1.Secret{expectSecret}
	f.expectUpdateSecretAction(expectSecret)
	f.run(getKey(secret, t))
}
// TestCreateCredential syncs a secret whose credential does not yet exist in
// the DevOps backend (initCredential is empty) and expects the controller to
// create it there.
func TestCreateCredential(t *testing.T) {
	f := newFixture(t)
	nsName := "test-123"
	secretName := "test"
	projectName := "test_project"
	ns := newNamespace(nsName, projectName)
	secret := newSecret(nsName, secretName, nil, true, true, false)
	expectSecret := newSecret(nsName, secretName, nil, true, true, true)
	f.secretLister = append(f.secretLister, secret)
	f.namespaceLister = append(f.namespaceLister, ns)
	f.kubeobjects = append(f.kubeobjects, secret)
	f.initDevOpsProject = nsName
	f.expectCredential = []*v1.Secret{expectSecret}
	f.run(getKey(secret, t))
}
// TestDeleteCredential syncs a secret that is being deleted and expects the
// controller to remove the credential from the backend and strip the
// finalizer from the secret.
func TestDeleteCredential(t *testing.T) {
	f := newFixture(t)
	nsName := "test-123"
	secretName := "test"
	projectName := "test_project"
	ns := newNamespace(nsName, projectName)
	secret := newDeletingSecret(nsName, secretName)
	expectSecret := secret.DeepCopy()
	expectSecret.Finalizers = []string{}
	f.secretLister = append(f.secretLister, secret)
	f.namespaceLister = append(f.namespaceLister, ns)
	f.kubeobjects = append(f.kubeobjects, secret)
	f.initDevOpsProject = nsName
	f.initCredential = []*v1.Secret{secret}
	f.expectCredential = []*v1.Secret{}
	f.expectUpdateSecretAction(expectSecret)
	f.run(getKey(secret, t))
}
// TestUpdateCredential syncs a secret whose data differs from the backend
// copy while auto-sync is enabled, and expects the backend credential to be
// updated to the modified data.
func TestUpdateCredential(t *testing.T) {
	f := newFixture(t)
	nsName := "test-123"
	secretName := "test"
	projectName := "test_project"
	ns := newNamespace(nsName, projectName)
	initSecret := newSecret(nsName, secretName, nil, true, true, false)
	modifiedSecret := newSecret(nsName, secretName, map[string][]byte{"a": []byte("aa")}, true, true, false)
	expectSecret := newSecret(nsName, secretName, map[string][]byte{"a": []byte("aa")}, true, true, true)
	f.secretLister = append(f.secretLister, modifiedSecret)
	f.namespaceLister = append(f.namespaceLister, ns)
	f.kubeobjects = append(f.kubeobjects, modifiedSecret)
	f.initDevOpsProject = nsName
	f.initCredential = []*v1.Secret{initSecret}
	f.expectCredential = []*v1.Secret{expectSecret}
	f.run(getKey(modifiedSecret, t))
}
// TestNotUpdateCredential syncs a secret with auto-sync disabled and expects
// the backend credential to remain at its initial state even though the
// in-cluster secret has different data.
func TestNotUpdateCredential(t *testing.T) {
	f := newFixture(t)
	nsName := "test-123"
	secretName := "test"
	projectName := "test_project"
	ns := newNamespace(nsName, projectName)
	initSecret := newSecret(nsName, secretName, nil, true, false, false)
	expectSecret := newSecret(nsName, secretName, map[string][]byte{"a": []byte("aa")}, true, false, true)
	f.secretLister = append(f.secretLister, expectSecret)
	f.namespaceLister = append(f.namespaceLister, ns)
	f.kubeobjects = append(f.kubeobjects, expectSecret)
	f.initDevOpsProject = nsName
	f.initCredential = []*v1.Secret{initSecret}
	f.expectCredential = []*v1.Secret{initSecret}
	f.run(getKey(expectSecret, t))
}

View File

@@ -1,445 +0,0 @@
/*
Copyright 2020 KubeSphere Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package devopsproject
import (
"context"
"fmt"
"net/http"
"reflect"
"time"
"github.com/emicklei/go-restful"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
corev1informer "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
corev1lister "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
devopsv1alpha3 "kubesphere.io/api/devops/v1alpha3"
"kubesphere.io/kubesphere/pkg/client/clientset/versioned/scheme"
tenantv1alpha1informers "kubesphere.io/kubesphere/pkg/client/informers/externalversions/tenant/v1alpha1"
tenantv1alpha1listers "kubesphere.io/kubesphere/pkg/client/listers/tenant/v1alpha1"
"kubesphere.io/kubesphere/pkg/constants"
modelsdevops "kubesphere.io/kubesphere/pkg/models/devops"
devopsClient "kubesphere.io/kubesphere/pkg/simple/client/devops"
"kubesphere.io/kubesphere/pkg/utils/k8sutil"
"kubesphere.io/kubesphere/pkg/utils/sliceutil"
kubesphereclient "kubesphere.io/kubesphere/pkg/client/clientset/versioned"
devopsinformers "kubesphere.io/kubesphere/pkg/client/informers/externalversions/devops/v1alpha3"
devopslisters "kubesphere.io/kubesphere/pkg/client/listers/devops/v1alpha3"
)
/**
DevOps project controller is used to maintain the state of the DevOps project.
*/

// Controller reconciles DevOpsProject resources: it manages the project's
// admin namespace, binds the project to its workspace, and mirrors the
// project into the backing DevOps service.
type Controller struct {
	client           clientset.Interface
	kubesphereClient kubesphereclient.Interface

	// event machinery used to surface reconcile events
	eventBroadcaster record.EventBroadcaster
	eventRecorder    record.EventRecorder

	// listers and cache-sync funcs for the watched resources
	devOpsProjectLister devopslisters.DevOpsProjectLister
	devOpsProjectSynced cache.InformerSynced
	namespaceLister     corev1lister.NamespaceLister
	namespaceSynced     cache.InformerSynced
	workspaceLister     tenantv1alpha1listers.WorkspaceLister
	workspaceSynced     cache.InformerSynced

	// workqueue holds keys of DevOpsProjects to reconcile
	workqueue workqueue.RateLimitingInterface
	// workerLoopPeriod is the restart interval for each worker loop
	workerLoopPeriod time.Duration

	// devopsClient talks to the backing DevOps service (e.g. Jenkins)
	devopsClient devopsClient.Interface
}
// NewController creates a Controller that watches DevOpsProject events and
// enqueues projects for reconciliation.
//
// Parameters:
//   - client: Kubernetes client for namespaces and event recording
//   - kubesphereClient: KubeSphere client for DevOpsProject updates
//   - devopsClient: backing DevOps service client (param typo fixed from
//     "devopsClinet"; signature is unaffected for callers)
//   - namespaceInformer / devopsInformer / workspaceInformer: shared informers
func NewController(client clientset.Interface,
	kubesphereClient kubesphereclient.Interface,
	devopsClient devopsClient.Interface,
	namespaceInformer corev1informer.NamespaceInformer,
	devopsInformer devopsinformers.DevOpsProjectInformer,
	workspaceInformer tenantv1alpha1informers.WorkspaceInformer) *Controller {
	broadcaster := record.NewBroadcaster()
	broadcaster.StartLogging(func(format string, args ...interface{}) {
		// Fix: expand args so each format verb gets its own operand;
		// the original passed the slice as a single value.
		klog.Info(fmt.Sprintf(format, args...))
	})
	broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})
	recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "devopsproject-controller"})

	v := &Controller{
		client:              client,
		devopsClient:        devopsClient,
		kubesphereClient:    kubesphereClient,
		workqueue:           workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "devopsproject"),
		devOpsProjectLister: devopsInformer.Lister(),
		devOpsProjectSynced: devopsInformer.Informer().HasSynced,
		namespaceLister:     namespaceInformer.Lister(),
		namespaceSynced:     namespaceInformer.Informer().HasSynced,
		workspaceLister:     workspaceInformer.Lister(),
		workspaceSynced:     workspaceInformer.Informer().HasSynced,
		workerLoopPeriod:    time.Second,
	}
	v.eventBroadcaster = broadcaster
	v.eventRecorder = recorder

	devopsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: v.enqueueDevOpsProject,
		UpdateFunc: func(oldObj, newObj interface{}) {
			oldProject := oldObj.(*devopsv1alpha3.DevOpsProject)
			newProject := newObj.(*devopsv1alpha3.DevOpsProject)
			// Identical resource versions indicate an informer resync with
			// no actual change; skip it.
			if oldProject.ResourceVersion == newProject.ResourceVersion {
				return
			}
			v.enqueueDevOpsProject(newObj)
		},
		DeleteFunc: v.enqueueDevOpsProject,
	})
	return v
}
// enqueueDevOpsProject converts obj into a cache key and adds it to the work
// queue. It should only be passed DevOpsProject objects; key-derivation
// failures are reported and the object is dropped.
func (c *Controller) enqueueDevOpsProject(obj interface{}) {
	key, err := cache.MetaNamespaceKeyFunc(obj)
	if err != nil {
		utilruntime.HandleError(err)
		return
	}
	c.workqueue.Add(key)
}
// processNextWorkItem pops one key off the queue and hands it to syncHandler.
// It returns false only when the queue has been shut down; errors are logged,
// the key is requeued with rate limiting, and processing continues.
func (c *Controller) processNextWorkItem() bool {
	item, shutdown := c.workqueue.Get()
	if shutdown {
		return false
	}
	defer c.workqueue.Done(item)

	key, ok := item.(string)
	if !ok {
		// Unexpected payload: forget it so it never comes back.
		c.workqueue.Forget(item)
		utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", item))
		return true
	}

	if err := c.syncHandler(key); err != nil {
		c.workqueue.AddRateLimited(key)
		wrapped := fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
		klog.Error(wrapped, "could not reconcile devopsProject")
		utilruntime.HandleError(wrapped)
		return true
	}

	c.workqueue.Forget(item)
	klog.V(5).Infof("Successfully synced '%s'", key)
	return true
}
// worker loops over the queue until processNextWorkItem reports shutdown.
func (c *Controller) worker() {
	for {
		if !c.processNextWorkItem() {
			return
		}
	}
}
// Start runs the controller with a single worker until stopCh is closed.
// It satisfies the manager's runnable interface by delegating to Run.
func (c *Controller) Start(stopCh <-chan struct{}) error {
	return c.Run(1, stopCh)
}
// Run starts the given number of project workers once the informer caches
// have synced, then blocks until stopCh is closed. The queue is shut down on
// return so workers drain cleanly.
func (c *Controller) Run(workers int, stopCh <-chan struct{}) error {
	defer utilruntime.HandleCrash()
	defer c.workqueue.ShutDown()
	klog.Info("starting devops project controller")
	defer klog.Info("shutting down devops project controller")

	// Fix: devOpsProjectSynced was listed twice while namespaceSynced was
	// never awaited, even though syncHandler reads the namespace lister.
	if !cache.WaitForCacheSync(stopCh, c.devOpsProjectSynced, c.namespaceSynced, c.workspaceSynced) {
		return fmt.Errorf("failed to wait for caches to sync")
	}
	for i := 0; i < workers; i++ {
		go wait.Until(c.worker, c.workerLoopPeriod, stopCh)
	}
	<-stopCh
	return nil
}
// syncHandler compares the actual state with the desired, and attempts to
// converge the two. It then updates the Status block of the devopsproject resource
// with the current status of the resource.
func (c *Controller) syncHandler(key string) error {
project, err := c.devOpsProjectLister.Get(key)
if err != nil {
if errors.IsNotFound(err) {
klog.Info(fmt.Sprintf("devopsproject '%s' in work queue no longer exists ", key))
return nil
}
klog.V(8).Info(err, fmt.Sprintf("could not get devopsproject %s ", key))
return err
}
copyProject := project.DeepCopy()
// DeletionTimestamp.IsZero() means DevOps project has not been deleted.
if project.ObjectMeta.DeletionTimestamp.IsZero() {
//If the sync is successful, return handle
if state, ok := project.Annotations[devopsv1alpha3.DevOpeProjectSyncStatusAnnoKey]; ok && state == modelsdevops.StatusSuccessful {
return nil
}
// Use Finalizers to sync DevOps status when DevOps project was deleted
// https://kubernetes.io/docs/tasks/access-kubernetes-api/custom-resources/custom-resource-definitions/#finalizers
if !sliceutil.HasString(project.ObjectMeta.Finalizers, devopsv1alpha3.DevOpsProjectFinalizerName) {
copyProject.ObjectMeta.Finalizers = append(copyProject.ObjectMeta.Finalizers, devopsv1alpha3.DevOpsProjectFinalizerName)
}
if project.Status.AdminNamespace != "" {
ns, err := c.namespaceLister.Get(project.Status.AdminNamespace)
if err != nil && !errors.IsNotFound(err) {
klog.V(8).Info(err, fmt.Sprintf("faild to get namespace"))
return err
} else if errors.IsNotFound(err) {
// if admin ns is not found, clean project status, rerun reconcile
copyProject.Status.AdminNamespace = ""
_, err := c.kubesphereClient.DevopsV1alpha3().DevOpsProjects().Update(context.Background(), copyProject, metav1.UpdateOptions{})
if err != nil {
klog.V(8).Info(err, fmt.Sprintf("failed to update project %s ", key))
return err
}
c.enqueueDevOpsProject(key)
return nil
}
// If ns exists, but the associated attributes with the project are not set correctly,
// then reset the associated attributes
if k8sutil.IsControlledBy(ns.OwnerReferences,
devopsv1alpha3.ResourceKindDevOpsProject, project.Name) &&
ns.Labels[constants.DevOpsProjectLabelKey] == project.Name {
} else {
copyNs := ns.DeepCopy()
err := controllerutil.SetControllerReference(copyProject, copyNs, scheme.Scheme)
if err != nil {
klog.V(8).Info(err, fmt.Sprintf("failed to set ownerreference %s ", key))
return err
}
copyNs.Labels[constants.DevOpsProjectLabelKey] = project.Name
_, err = c.client.CoreV1().Namespaces().Update(context.Background(), copyNs, metav1.UpdateOptions{})
if err != nil {
klog.V(8).Info(err, fmt.Sprintf("failed to update ns %s ", key))
return err
}
}
} else {
// list ns by devops project
namespaces, err := c.namespaceLister.List(
labels.SelectorFromSet(labels.Set{constants.DevOpsProjectLabelKey: project.Name}))
if err != nil {
klog.V(8).Info(err, fmt.Sprintf("failed to list ns %s ", key))
return err
}
// if there is no ns, generate new one
if len(namespaces) == 0 {
ns := c.generateNewNamespace(project)
ns, err := c.client.CoreV1().Namespaces().Create(context.Background(), ns, metav1.CreateOptions{})
if err != nil {
// devops project name is conflict, cannot create admin namespace
if errors.IsAlreadyExists(err) {
klog.Errorf("Failed to create admin namespace for devopsproject %s, error %v", project.Name, err)
c.eventRecorder.Event(project, v1.EventTypeWarning, "CreateAdminNamespaceFailed", err.Error())
return err
}
klog.V(8).Info(err, fmt.Sprintf("failed to create ns %s ", key))
return err
}
copyProject.Status.AdminNamespace = ns.Name
} else if len(namespaces) != 0 {
ns := namespaces[0]
// reset ownerReferences
if !k8sutil.IsControlledBy(ns.OwnerReferences,
devopsv1alpha3.ResourceKindDevOpsProject, project.Name) {
copyNs := ns.DeepCopy()
err := controllerutil.SetControllerReference(copyProject, copyNs, scheme.Scheme)
if err != nil {
klog.V(8).Info(err, fmt.Sprintf("failed to set ownerreference %s ", key))
return err
}
copyNs.Labels[constants.DevOpsProjectLabelKey] = project.Name
_, err = c.client.CoreV1().Namespaces().Update(context.Background(), copyNs, metav1.UpdateOptions{})
if err != nil {
klog.V(8).Info(err, fmt.Sprintf("failed to update ns %s ", key))
return err
}
}
copyProject.Status.AdminNamespace = ns.Name
}
}
if copyProject, err = c.bindWorkspace(copyProject); err != nil {
klog.Error(err)
return err
}
// Check project exists, otherwise we will create it.
_, err := c.devopsClient.GetDevOpsProject(copyProject.Status.AdminNamespace)
if err != nil {
klog.Error(err, fmt.Sprintf("failed to get project %s ", key))
_, err := c.devopsClient.CreateDevOpsProject(copyProject.Status.AdminNamespace)
if err != nil {
klog.V(8).Info(err, fmt.Sprintf("failed to get project %s ", key))
return err
}
}
//If there is no early return, then the sync is successful.
if copyProject.Annotations == nil {
copyProject.Annotations = map[string]string{}
}
copyProject.Annotations[devopsv1alpha3.DevOpeProjectSyncStatusAnnoKey] = modelsdevops.StatusSuccessful
if !reflect.DeepEqual(copyProject, project) {
copyProject, err = c.kubesphereClient.DevopsV1alpha3().DevOpsProjects().Update(context.Background(), copyProject, metav1.UpdateOptions{})
if err != nil {
klog.V(8).Info(err, fmt.Sprintf("failed to update ns %s ", key))
return err
}
}
} else {
// Finalizers processing logic
if sliceutil.HasString(project.ObjectMeta.Finalizers, devopsv1alpha3.DevOpsProjectFinalizerName) {
delSuccess := false
if err := c.deleteDevOpsProjectInDevOps(project); err != nil {
// the status code should be 404 if the job does not exists
if srvErr, ok := err.(restful.ServiceError); ok {
delSuccess = srvErr.Code == http.StatusNotFound
} else if srvErr, ok := err.(*devopsClient.ErrorResponse); ok {
delSuccess = srvErr.Response.StatusCode == http.StatusNotFound
} else {
klog.Error(fmt.Sprintf("unexpected error type: %v, should be *restful.ServiceError", err))
}
klog.V(8).Info(err, fmt.Sprintf("failed to delete resource %s in devops", key))
} else {
delSuccess = true
}
if delSuccess {
project.ObjectMeta.Finalizers = sliceutil.RemoveString(project.ObjectMeta.Finalizers, func(item string) bool {
return item == devopsv1alpha3.DevOpsProjectFinalizerName
})
} else {
// make sure the corresponding Jenkins job can be clean
// You can remove the finalizer via kubectl manually in a very special case that Jenkins might be not able to available anymore
return fmt.Errorf("failed to remove devopsproject finalizer due to bad communication with Jenkins")
}
_, err = c.kubesphereClient.DevopsV1alpha3().DevOpsProjects().Update(context.Background(), project, metav1.UpdateOptions{})
if err != nil {
klog.V(8).Info(err, fmt.Sprintf("failed to update project %s ", key))
return err
}
}
}
return nil
}
// bindWorkspace makes the workspace named by the project's workspace label the
// controlling owner of the DevOpsProject. It is a no-op when the label is
// absent, the workspace does not exist, or the ownership is already in place;
// otherwise it rewrites the owner references and persists the project.
func (c *Controller) bindWorkspace(project *devopsv1alpha3.DevOpsProject) (*devopsv1alpha3.DevOpsProject, error) {
	wsName := project.Labels[constants.WorkspaceLabelKey]
	if wsName == "" {
		// No workspace label: nothing to bind.
		return project, nil
	}
	ws, err := c.workspaceLister.Get(wsName)
	if err != nil {
		if errors.IsNotFound(err) {
			// The workspace is gone (or not created yet); skip binding.
			return project, nil
		}
		klog.Error(err)
		return nil, err
	}
	if metav1.IsControlledBy(project, ws) {
		// Ownership already correct; avoid a needless API update.
		return project, nil
	}
	// Drop any stale owner references before installing the workspace as the
	// controller owner.
	project.OwnerReferences = nil
	if err := controllerutil.SetControllerReference(ws, project, scheme.Scheme); err != nil {
		klog.Error(err)
		return nil, err
	}
	return c.kubesphereClient.DevopsV1alpha3().DevOpsProjects().Update(context.Background(), project, metav1.UpdateOptions{})
}
// deleteDevOpsProjectInDevOps removes the project's backing resources from the
// DevOps service, addressed by the project's admin namespace.
func (c *Controller) deleteDevOpsProjectInDevOps(project *devopsv1alpha3.DevOpsProject) error {
	return c.devopsClient.DeleteDevOpsProject(project.Status.AdminNamespace)
}
// generateNewNamespace builds the admin namespace object for a DevOps project.
// The namespace deliberately shares the project's name so the access control
// of devops API v1alpha2 and v1alpha3 stays consistent. The project is set as
// the controlling owner and the creator annotation is propagated when present.
func (c *Controller) generateNewNamespace(project *devopsv1alpha3.DevOpsProject) *v1.Namespace {
	// devops project name and admin namespace name should be the same
	// solve the access control problem of devops API v1alpha2 and v1alpha3
	ns := &v1.Namespace{
		TypeMeta: metav1.TypeMeta{
			Kind:       "Namespace",
			APIVersion: v1.SchemeGroupVersion.String(),
		},
		ObjectMeta: metav1.ObjectMeta{
			Name: project.Name,
			Labels: map[string]string{
				constants.DevOpsProjectLabelKey: project.Name,
			},
		},
	}
	// Propagate the creator annotation so ownership stays traceable on the namespace.
	if creator := project.Annotations[constants.CreatorAnnotationKey]; creator != "" {
		ns.Annotations = map[string]string{constants.CreatorAnnotationKey: creator}
	}
	// The error was previously discarded silently; log it so a failed
	// owner-reference setup is at least diagnosable (the signature cannot
	// return an error without breaking callers).
	if err := controllerutil.SetControllerReference(project, ns, scheme.Scheme); err != nil {
		klog.Error(err, fmt.Sprintf("failed to set owner reference for namespace %s", project.Name))
	}
	return ns
}

View File

@@ -1,412 +0,0 @@
/*
Copyright 2020 KubeSphere Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package devopsproject
import (
"reflect"
"testing"
"time"
v1 "k8s.io/api/core/v1"
devopsprojects "kubesphere.io/api/devops/v1alpha3"
"kubesphere.io/kubesphere/pkg/constants"
fakeDevOps "kubesphere.io/kubesphere/pkg/simple/client/devops/fake"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/diff"
kubeinformers "k8s.io/client-go/informers"
k8sfake "k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
devops "kubesphere.io/api/devops/v1alpha3"
"kubesphere.io/kubesphere/pkg/client/clientset/versioned/fake"
informers "kubesphere.io/kubesphere/pkg/client/informers/externalversions"
)
var (
	// alwaysReady short-circuits informer cache-sync checks in tests.
	alwaysReady = func() bool { return true }
	// noResyncPeriodFunc disables informer resync during tests.
	noResyncPeriodFunc = func() time.Duration { return 0 }
)
// fixture aggregates the fake clients, seed objects, and expected actions for
// a single devopsproject controller test case.
type fixture struct {
	t *testing.T
	// client/kubeclient are populated by newController.
	client     *fake.Clientset
	kubeclient *k8sfake.Clientset
	// Objects to put in the store.
	devopsProjectLister []*devops.DevOpsProject
	namespaceLister     []*v1.Namespace
	// actions/kubeactions list the expected client actions, in order.
	actions     []core.Action
	kubeactions []core.Action
	kubeobjects []runtime.Object
	// Objects from here preloaded into NewSimpleFake.
	objects []runtime.Object
	// Objects from here preloaded into devops
	initDevOpsProject   []string
	expectDevOpsProject []string
}
// newFixture returns an empty test fixture bound to t.
func newFixture(t *testing.T) *fixture {
	return &fixture{
		t:       t,
		objects: []runtime.Object{},
	}
}
// newDevOpsProject builds a DevOpsProject named name for use in tests.
// withFinalizers attaches the project finalizer; withStatus records nsName as
// the admin namespace in the status.
func newDevOpsProject(name string, nsName string, withFinalizers bool, withStatus bool) *devopsprojects.DevOpsProject {
	p := &devopsprojects.DevOpsProject{
		TypeMeta:   metav1.TypeMeta{APIVersion: devopsprojects.SchemeGroupVersion.String()},
		ObjectMeta: metav1.ObjectMeta{Name: name},
	}
	if withFinalizers {
		p.Finalizers = []string{devopsprojects.DevOpsProjectFinalizerName}
	}
	if withStatus {
		p.Status = devops.DevOpsProjectStatus{AdminNamespace: nsName}
	}
	return p
}
// newNamespace builds a Namespace for tests, labelled as belonging to
// projectName. useGenerateName swaps Name for GenerateName, and
// withOwnerReference installs a DevOpsProject controller owner reference.
func newNamespace(name string, projectName string, useGenerateName, withOwnerReference bool) *v1.Namespace {
	ns := &v1.Namespace{
		TypeMeta: metav1.TypeMeta{
			Kind:       "Namespace",
			APIVersion: v1.SchemeGroupVersion.String(),
		},
		ObjectMeta: metav1.ObjectMeta{
			Name:   name,
			Labels: map[string]string{constants.DevOpsProjectLabelKey: projectName},
		},
	}
	if useGenerateName {
		ns.ObjectMeta.GenerateName = projectName
		ns.ObjectMeta.Name = ""
	}
	if withOwnerReference {
		isController := true
		ns.ObjectMeta.OwnerReferences = []metav1.OwnerReference{{
			APIVersion:         devops.SchemeGroupVersion.String(),
			Kind:               devops.ResourceKindDevOpsProject,
			Name:               projectName,
			BlockOwnerDeletion: &isController,
			Controller:         &isController,
		}}
	}
	return ns
}
// newDeletingDevOpsProject builds a DevOpsProject that is mid-deletion: it
// carries a deletion timestamp and still holds the project finalizer.
func newDeletingDevOpsProject(name string) *devopsprojects.DevOpsProject {
	deletedAt := metav1.Now()
	project := &devopsprojects.DevOpsProject{
		TypeMeta: metav1.TypeMeta{APIVersion: devopsprojects.SchemeGroupVersion.String()},
		ObjectMeta: metav1.ObjectMeta{
			Name:              name,
			DeletionTimestamp: &deletedAt,
			Finalizers:        []string{devopsprojects.DevOpsProjectFinalizerName},
		},
	}
	return project
}
// newController wires up a Controller backed by fake clients, informer
// factories, and a fake DevOps service, pre-populating the informer indexers
// from the fixture's listers.
func (f *fixture) newController() (*Controller, informers.SharedInformerFactory, kubeinformers.SharedInformerFactory, *fakeDevOps.Devops) {
	f.client = fake.NewSimpleClientset(f.objects...)
	f.kubeclient = k8sfake.NewSimpleClientset(f.kubeobjects...)
	ksInformers := informers.NewSharedInformerFactory(f.client, noResyncPeriodFunc())
	k8sInformers := kubeinformers.NewSharedInformerFactory(f.kubeclient, noResyncPeriodFunc())
	fakeDevOpsSrv := fakeDevOps.New(f.initDevOpsProject...)
	c := NewController(f.kubeclient, f.client, fakeDevOpsSrv,
		k8sInformers.Core().V1().Namespaces(),
		ksInformers.Devops().V1alpha3().DevOpsProjects(),
		ksInformers.Tenant().V1alpha1().Workspaces())
	c.devOpsProjectSynced = alwaysReady
	c.eventRecorder = &record.FakeRecorder{}
	// Seed the informer caches directly instead of running the informers.
	for _, project := range f.devopsProjectLister {
		ksInformers.Devops().V1alpha3().DevOpsProjects().Informer().GetIndexer().Add(project)
	}
	for _, ns := range f.namespaceLister {
		k8sInformers.Core().V1().Namespaces().Informer().GetIndexer().Add(ns)
	}
	return c, ksInformers, k8sInformers, fakeDevOpsSrv
}
// run syncs fooName once and fails the test on any sync error.
func (f *fixture) run(fooName string) {
	f.runController(fooName, true, false)
}
// runExpectError syncs fooName once and fails the test if no error occurs.
func (f *fixture) runExpectError(fooName string) {
	f.runController(fooName, true, true)
}
// runController builds the controller from the fixture, optionally starts the
// informer factories, drives a single reconcile of projectName through
// syncHandler, and then compares recorded client actions and the fake DevOps
// state against the fixture's expectations.
func (f *fixture) runController(projectName string, startInformers bool, expectError bool) {
	c, i, k8sI, dI := f.newController()
	if startInformers {
		stopCh := make(chan struct{})
		defer close(stopCh)
		i.Start(stopCh)
		k8sI.Start(stopCh)
	}
	// Call the sync handler directly, bypassing the workqueue.
	err := c.syncHandler(projectName)
	if !expectError && err != nil {
		f.t.Errorf("error syncing foo: %v", err)
	} else if expectError && err == nil {
		f.t.Error("expected error syncing foo, got nil")
	}
	actions := filterInformerActions(f.client.Actions())
	k8sActions := filterInformerActions(f.kubeclient.Actions())
	// NOTE(review): only the counts of expected vs. recorded actions are
	// checked here; individual actions are not compared via checkAction.
	if len(f.kubeactions) > len(k8sActions) {
		f.t.Errorf("%d additional expected actions:%+v", len(f.kubeactions)-len(k8sActions), f.kubeactions[len(k8sActions):])
	}
	if len(f.actions) > len(actions) {
		f.t.Errorf("%d additional expected actions:%+v", len(f.actions)-len(actions), f.actions[len(actions):])
	}
	// The fake DevOps service must end up holding exactly the expected projects.
	if len(dI.Projects) != len(f.expectDevOpsProject) {
		f.t.Errorf(" unexpected objects: %v", dI.Projects)
	}
}
// checkAction verifies that expected and actual actions are equal and both have
// same attached resources.
func checkAction(expected, actual core.Action, t *testing.T) {
	if !(expected.Matches(actual.GetVerb(), actual.GetResource().Resource) && actual.GetSubresource() == expected.GetSubresource()) {
		t.Errorf("Expected\n\t%#v\ngot\n\t%#v", expected, actual)
		return
	}
	// %T prints the operand's dynamic type; the original used %t, which is the
	// boolean verb and would render as %!t(...) for these action values.
	if reflect.TypeOf(actual) != reflect.TypeOf(expected) {
		t.Errorf("Action has wrong type. Expected: %T. Got: %T", expected, actual)
		return
	}
	// Compare the payload that matters for each concrete action type.
	switch a := actual.(type) {
	case core.CreateActionImpl:
		e, _ := expected.(core.CreateActionImpl)
		expObject := e.GetObject()
		object := a.GetObject()
		if !reflect.DeepEqual(expObject, object) {
			t.Errorf("Action %s %s has wrong object\nDiff:\n %s",
				a.GetVerb(), a.GetResource().Resource, diff.ObjectGoPrintSideBySide(expObject, object))
		}
	case core.UpdateActionImpl:
		e, _ := expected.(core.UpdateActionImpl)
		expObject := e.GetObject()
		object := a.GetObject()
		if !reflect.DeepEqual(expObject, object) {
			t.Errorf("Action %s %s has wrong object\nDiff:\n %s",
				a.GetVerb(), a.GetResource().Resource, diff.ObjectGoPrintSideBySide(expObject, object))
		}
	case core.PatchActionImpl:
		e, _ := expected.(core.PatchActionImpl)
		expPatch := e.GetPatch()
		patch := a.GetPatch()
		if !reflect.DeepEqual(expPatch, patch) {
			t.Errorf("Action %s %s has wrong patch\nDiff:\n %s",
				a.GetVerb(), a.GetResource().Resource, diff.ObjectGoPrintSideBySide(expPatch, patch))
		}
	default:
		t.Errorf("Uncaptured Action %s %s, you should explicitly add a case to capture it",
			actual.GetVerb(), actual.GetResource().Resource)
	}
}
// filterInformerActions filters list and watch actions for testing resources.
// Since list and watch don't change resource state we can filter them to lower
// the noise level in our tests.
func filterInformerActions(actions []core.Action) []core.Action {
	ret := []core.Action{}
	for _, action := range actions {
		// Cluster-scoped list/watch calls are informer bookkeeping, not the
		// behavior under test.
		if len(action.GetNamespace()) == 0 &&
			(action.Matches("list", devopsprojects.ResourcePluralDevOpsProject) ||
				action.Matches("watch", devopsprojects.ResourcePluralDevOpsProject) ||
				action.Matches("list", "namespaces") ||
				action.Matches("watch", "namespaces") ||
				action.Matches("watch", "workspaces") ||
				action.Matches("list", "workspaces")) {
			continue
		}
		ret = append(ret, action)
	}
	return ret
}
// expectUpdateDevOpsProjectAction records an expected "update" action on the
// DevOpsProject resource.
func (f *fixture) expectUpdateDevOpsProjectAction(p *devopsprojects.DevOpsProject) {
	action := core.NewUpdateAction(schema.GroupVersionResource{Resource: devopsprojects.ResourcePluralDevOpsProject},
		p.Namespace, p)
	f.actions = append(f.actions, action)
}
// expectUpdateNamespaceAction records an expected "update" action on the
// core/v1 namespaces resource.
func (f *fixture) expectUpdateNamespaceAction(p *v1.Namespace) {
	action := core.NewUpdateAction(schema.GroupVersionResource{
		Version:  "v1",
		Resource: "namespaces",
	}, p.Namespace, p)
	f.kubeactions = append(f.kubeactions, action)
}
// expectCreateNamespaceAction records an expected "create" action on the
// core/v1 namespaces resource.
func (f *fixture) expectCreateNamespaceAction(p *v1.Namespace) {
	action := core.NewCreateAction(schema.GroupVersionResource{
		Version:  "v1",
		Resource: "namespaces",
	}, p.Namespace, p)
	f.kubeactions = append(f.kubeactions, action)
}
// getKey returns the cache key (namespace/name) for p, failing the test and
// returning "" on error.
func getKey(p *devopsprojects.DevOpsProject, t *testing.T) string {
	k, keyErr := cache.DeletionHandlingMetaNamespaceKeyFunc(p)
	if keyErr != nil {
		t.Errorf("Unexpected error getting key for devopsprojects %v: %v", p.Name, keyErr)
		return ""
	}
	return k
}
// TestDoNothing verifies that a fully-synced project (finalizer set, status
// set, namespace owned and labelled) triggers no extra actions.
func TestDoNothing(t *testing.T) {
	f := newFixture(t)
	nsName := "test-123"
	projectName := "test"
	project := newDevOpsProject(projectName, nsName, true, true)
	ns := newNamespace(nsName, projectName, false, true)
	f.devopsProjectLister = append(f.devopsProjectLister, project)
	f.namespaceLister = append(f.namespaceLister, ns)
	f.objects = append(f.objects, project)
	f.initDevOpsProject = []string{ns.Name}
	f.expectDevOpsProject = []string{ns.Name}
	f.run(getKey(project, t))
}
// TestUpdateProjectFinalizers verifies that a project missing its finalizer is
// updated to carry it.
func TestUpdateProjectFinalizers(t *testing.T) {
	f := newFixture(t)
	nsName := "test-123"
	projectName := "test"
	project := newDevOpsProject(projectName, nsName, false, true)
	ns := newNamespace(nsName, projectName, false, true)
	f.devopsProjectLister = append(f.devopsProjectLister, project)
	f.namespaceLister = append(f.namespaceLister, ns)
	f.objects = append(f.objects, project)
	f.kubeobjects = append(f.kubeobjects, ns)
	f.initDevOpsProject = []string{ns.Name}
	f.expectDevOpsProject = []string{ns.Name}
	expectUpdateProject := project.DeepCopy()
	expectUpdateProject.Finalizers = []string{devops.DevOpsProjectFinalizerName}
	f.expectUpdateDevOpsProjectAction(expectUpdateProject)
	f.run(getKey(project, t))
}
// TestUpdateProjectStatus verifies that a project with an empty status gets
// its AdminNamespace filled in from the existing labelled namespace.
func TestUpdateProjectStatus(t *testing.T) {
	f := newFixture(t)
	nsName := "test-123"
	projectName := "test"
	project := newDevOpsProject(projectName, nsName, true, false)
	ns := newNamespace(nsName, projectName, false, true)
	f.devopsProjectLister = append(f.devopsProjectLister, project)
	f.namespaceLister = append(f.namespaceLister, ns)
	f.objects = append(f.objects, project)
	f.kubeobjects = append(f.kubeobjects, ns)
	f.initDevOpsProject = []string{ns.Name}
	f.expectDevOpsProject = []string{ns.Name}
	expectUpdateProject := project.DeepCopy()
	expectUpdateProject.Status.AdminNamespace = nsName
	f.expectUpdateDevOpsProjectAction(expectUpdateProject)
	f.run(getKey(project, t))
}
// TestUpdateNsOwnerReference verifies that a namespace lacking its owner
// reference is updated to be controlled by the project.
func TestUpdateNsOwnerReference(t *testing.T) {
	f := newFixture(t)
	nsName := "test-123"
	projectName := "test"
	project := newDevOpsProject(projectName, nsName, true, true)
	ns := newNamespace(nsName, projectName, false, false)
	f.devopsProjectLister = append(f.devopsProjectLister, project)
	f.namespaceLister = append(f.namespaceLister, ns)
	f.objects = append(f.objects, project)
	f.kubeobjects = append(f.kubeobjects, ns)
	f.initDevOpsProject = []string{ns.Name}
	f.expectDevOpsProject = []string{ns.Name}
	expectUpdateNs := newNamespace(nsName, projectName, false, true)
	f.expectUpdateNamespaceAction(expectUpdateNs)
	f.run(getKey(project, t))
}
// TestCreateDevOpsProjects verifies that a project with no admin namespace
// causes one to be created and the status updated to reference it.
func TestCreateDevOpsProjects(t *testing.T) {
	f := newFixture(t)
	project := newDevOpsProject("test", "", true, false)
	ns := newNamespace("test", "test", false, true)
	f.devopsProjectLister = append(f.devopsProjectLister, project)
	f.objects = append(f.objects, project)
	f.expectDevOpsProject = []string{""}
	expect := project.DeepCopy()
	expect.Status.AdminNamespace = "test"
	f.expectUpdateDevOpsProjectAction(expect)
	f.expectCreateNamespaceAction(ns)
	f.run(getKey(project, t))
}
// TestDeleteDevOpsProjects verifies that a project being deleted has its
// finalizer removed once the DevOps-side resources are cleaned up.
func TestDeleteDevOpsProjects(t *testing.T) {
	f := newFixture(t)
	project := newDeletingDevOpsProject("test")
	f.devopsProjectLister = append(f.devopsProjectLister, project)
	f.objects = append(f.objects, project)
	f.initDevOpsProject = []string{project.Name}
	f.expectDevOpsProject = []string{project.Name}
	expectProject := project.DeepCopy()
	expectProject.Finalizers = []string{}
	f.expectUpdateDevOpsProjectAction(expectProject)
	f.run(getKey(project, t))
}
// TestDeleteDevOpsProjectsWithNull verifies deletion when the DevOps service
// holds no corresponding project: the finalizer is still removed.
func TestDeleteDevOpsProjectsWithNull(t *testing.T) {
	f := newFixture(t)
	project := newDeletingDevOpsProject("test")
	f.devopsProjectLister = append(f.devopsProjectLister, project)
	f.objects = append(f.objects, project)
	f.expectDevOpsProject = []string{}
	expectProject := project.DeepCopy()
	expectProject.Finalizers = []string{}
	f.expectUpdateDevOpsProjectAction(expectProject)
	f.run(getKey(project, t))
}

View File

@@ -1,328 +0,0 @@
/*
Copyright 2020 KubeSphere Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package pipeline
import (
"context"
"fmt"
"net/http"
"reflect"
"time"
"github.com/emicklei/go-restful"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
corev1informer "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
corev1lister "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
devopsv1alpha3 "kubesphere.io/api/devops/v1alpha3"
kubesphereclient "kubesphere.io/kubesphere/pkg/client/clientset/versioned"
devopsinformers "kubesphere.io/kubesphere/pkg/client/informers/externalversions/devops/v1alpha3"
devopslisters "kubesphere.io/kubesphere/pkg/client/listers/devops/v1alpha3"
"kubesphere.io/kubesphere/pkg/constants"
"kubesphere.io/kubesphere/pkg/controller/utils"
modelsdevops "kubesphere.io/kubesphere/pkg/models/devops"
devopsClient "kubesphere.io/kubesphere/pkg/simple/client/devops"
"kubesphere.io/kubesphere/pkg/utils/k8sutil"
"kubesphere.io/kubesphere/pkg/utils/sliceutil"
)
/**
Pipeline controller is used to maintain the state of Pipeline resources,
keeping them in sync with the backing DevOps service.
*/
// Controller reconciles Pipeline CRDs against the DevOps service.
type Controller struct {
	client           clientset.Interface
	kubesphereClient kubesphereclient.Interface
	eventBroadcaster record.EventBroadcaster
	eventRecorder    record.EventRecorder
	// devOpsProjectLister is, despite its name, a lister for Pipelines.
	devOpsProjectLister devopslisters.PipelineLister
	pipelineSynced      cache.InformerSynced
	namespaceLister corev1lister.NamespaceLister
	namespaceSynced cache.InformerSynced
	workqueue workqueue.RateLimitingInterface
	// workerLoopPeriod is the restart delay used by wait.Until in Run.
	workerLoopPeriod time.Duration
	devopsClient devopsClient.Interface
}
// NewController creates a pipeline controller that keeps Pipeline CRDs in sync
// with the backing DevOps service. It wires an event broadcaster/recorder and
// registers informer event handlers that feed the rate-limited workqueue.
func NewController(client clientset.Interface,
	kubesphereClient kubesphereclient.Interface,
	devopsService devopsClient.Interface,
	namespaceInformer corev1informer.NamespaceInformer,
	devopsInformer devopsinformers.PipelineInformer) *Controller {
	broadcaster := record.NewBroadcaster()
	broadcaster.StartLogging(func(format string, args ...interface{}) {
		// args must be expanded with "..." — the original passed the slice as a
		// single operand, which printed it as one mis-formatted %!v value.
		klog.Info(fmt.Sprintf(format, args...))
	})
	broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})
	recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "pipeline-controller"})
	v := &Controller{
		client:              client,
		devopsClient:        devopsService,
		kubesphereClient:    kubesphereClient,
		workqueue:           workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pipeline"),
		devOpsProjectLister: devopsInformer.Lister(),
		pipelineSynced:      devopsInformer.Informer().HasSynced,
		namespaceLister:     namespaceInformer.Lister(),
		namespaceSynced:     namespaceInformer.Informer().HasSynced,
		workerLoopPeriod:    time.Second,
	}
	v.eventBroadcaster = broadcaster
	v.eventRecorder = recorder
	// Enqueue on add/update/delete; skip updates whose resource version is
	// unchanged (periodic resyncs) to avoid needless reconciles.
	devopsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: v.enqueuePipeline,
		UpdateFunc: func(oldObj, newObj interface{}) {
			oldPipeline := oldObj.(*devopsv1alpha3.Pipeline)
			newPipeline := newObj.(*devopsv1alpha3.Pipeline)
			if oldPipeline.ResourceVersion == newPipeline.ResourceVersion {
				return
			}
			v.enqueuePipeline(newObj)
		},
		DeleteFunc: v.enqueuePipeline,
	})
	return v
}
// enqueuePipeline takes a Pipeline resource and converts it into a
// namespace/name string which is then put onto the workqueue. This method
// should *not* be passed resources of any type other than Pipeline.
func (c *Controller) enqueuePipeline(obj interface{}) {
	var key string
	var err error
	if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
		utilruntime.HandleError(err)
		return
	}
	c.workqueue.Add(key)
}
// processNextWorkItem pulls one key off the workqueue and reconciles it via
// syncHandler. It returns false only when the queue has been shut down, so the
// worker loop keeps running after individual failures.
func (c *Controller) processNextWorkItem() bool {
	obj, shutdown := c.workqueue.Get()
	if shutdown {
		return false
	}
	err := func(obj interface{}) error {
		// Done must always be called so the workqueue can release the key.
		defer c.workqueue.Done(obj)
		var key string
		var ok bool
		if key, ok = obj.(string); !ok {
			// A non-string item can never be processed; forget it for good.
			c.workqueue.Forget(obj)
			utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
			return nil
		}
		if err := c.syncHandler(key); err != nil {
			// Requeue with rate limiting so transient failures are retried.
			c.workqueue.AddRateLimited(key)
			return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
		}
		c.workqueue.Forget(obj)
		klog.V(5).Infof("Successfully synced '%s'", key)
		return nil
	}(obj)
	if err != nil {
		klog.Error(err, "could not reconcile devopsProject")
		utilruntime.HandleError(err)
		// Still return true: one failed item must not stop the worker.
		return true
	}
	return true
}
// worker drains the workqueue, processing items until the queue shuts down.
func (c *Controller) worker() {
	for {
		if !c.processNextWorkItem() {
			return
		}
	}
}
// Start runs the controller with a single worker until stopCh is closed.
func (c *Controller) Start(stopCh <-chan struct{}) error {
	return c.Run(1, stopCh)
}
// Run starts the given number of workers and blocks until stopCh is closed.
// It fails fast if the pipeline informer cache cannot be synced.
func (c *Controller) Run(workers int, stopCh <-chan struct{}) error {
	defer utilruntime.HandleCrash()
	defer c.workqueue.ShutDown()
	klog.Info("starting pipeline controller")
	defer klog.Info("shutting down pipeline controller")
	if !cache.WaitForCacheSync(stopCh, c.pipelineSynced) {
		return fmt.Errorf("failed to wait for caches to sync")
	}
	for i := 0; i < workers; i++ {
		// Each worker is restarted every workerLoopPeriod until stopCh closes.
		go wait.Until(c.worker, c.workerLoopPeriod, stopCh)
	}
	<-stopCh
	return nil
}
// syncHandler compares the actual state with the desired, and attempts to
// converge the two. It then updates the Status block of the pipeline resource
// with the current status of the resource.
func (c *Controller) syncHandler(key string) error {
	nsName, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		// An unparseable key can never succeed; drop it without requeueing.
		klog.Error(err, fmt.Sprintf("could not split copyPipeline meta %s ", key))
		return nil
	}
	namespace, err := c.namespaceLister.Get(nsName)
	if err != nil {
		if errors.IsNotFound(err) {
			klog.Info(fmt.Sprintf("namespace '%s' in work queue no longer exists ", key))
			return nil
		}
		klog.V(8).Info(err, fmt.Sprintf("could not get namespace %s ", key))
		return err
	}
	// Pipelines are only allowed inside a DevOps project admin namespace.
	if !isDevOpsProjectAdminNamespace(namespace) {
		err := fmt.Errorf("cound not create copyPipeline in normal namespaces %s", namespace.Name)
		klog.Warning(err)
		return err
	}
	// NOTE(review): devOpsProjectLister actually lists Pipelines.
	pipeline, err := c.devOpsProjectLister.Pipelines(nsName).Get(name)
	if err != nil {
		if errors.IsNotFound(err) {
			klog.V(8).Info(fmt.Sprintf("copyPipeline '%s' in work queue no longer exists ", key))
			return nil
		}
		klog.Error(err, fmt.Sprintf("could not get copyPipeline %s ", key))
		return err
	}
	// Work on a deep copy; the cached object must not be mutated.
	copyPipeline := pipeline.DeepCopy()
	// DeletionTimestamp.IsZero() means copyPipeline has not been deleted.
	if copyPipeline.ObjectMeta.DeletionTimestamp.IsZero() {
		// make sure Annotations is not nil
		if copyPipeline.Annotations == nil {
			copyPipeline.Annotations = map[string]string{}
		}
		//If the sync is successful, return handle
		if state, ok := copyPipeline.Annotations[devopsv1alpha3.PipelineSyncStatusAnnoKey]; ok && state == modelsdevops.StatusSuccessful {
			specHash := utils.ComputeHash(copyPipeline.Spec)
			oldHash, _ := copyPipeline.Annotations[devopsv1alpha3.PipelineSpecHash] // don't need to check if it's nil, only compare if they're different
			if specHash == oldHash {
				// already synced successfully and the Pipeline spec is unchanged; skip this round
				return nil
			} else {
				// Record the new hash so the next identical spec short-circuits.
				copyPipeline.Annotations[devopsv1alpha3.PipelineSpecHash] = specHash
			}
		}
		// https://kubernetes.io/docs/tasks/access-kubernetes-api/custom-resources/custom-resource-definitions/#finalizers
		if !sliceutil.HasString(copyPipeline.ObjectMeta.Finalizers, devopsv1alpha3.PipelineFinalizerName) {
			copyPipeline.ObjectMeta.Finalizers = append(copyPipeline.ObjectMeta.Finalizers, devopsv1alpha3.PipelineFinalizerName)
		}
		// Check pipeline config exists, otherwise we will create it.
		// if pipeline exists, check & update config
		jenkinsPipeline, err := c.devopsClient.GetProjectPipelineConfig(nsName, pipeline.Name)
		if err == nil {
			if !reflect.DeepEqual(jenkinsPipeline.Spec, copyPipeline.Spec) {
				_, err := c.devopsClient.UpdateProjectPipeline(nsName, copyPipeline)
				if err != nil {
					klog.V(8).Info(err, fmt.Sprintf("failed to update pipeline config %s ", key))
					return err
				}
			} else {
				klog.V(8).Info(fmt.Sprintf("nothing was changed, pipeline '%v'", copyPipeline.Spec))
			}
		} else {
			// NOTE(review): any Get error — not only "not found" — falls
			// through to a create attempt here.
			_, err := c.devopsClient.CreateProjectPipeline(nsName, copyPipeline)
			if err != nil {
				klog.V(8).Info(err, fmt.Sprintf("failed to create copyPipeline %s ", key))
				return err
			}
		}
		//If there is no early return, then the sync is successful.
		copyPipeline.Annotations[devopsv1alpha3.PipelineSyncStatusAnnoKey] = modelsdevops.StatusSuccessful
	} else {
		// Finalizers processing logic
		if sliceutil.HasString(copyPipeline.ObjectMeta.Finalizers, devopsv1alpha3.PipelineFinalizerName) {
			delSuccess := false
			if _, err := c.devopsClient.DeleteProjectPipeline(nsName, pipeline.Name); err != nil {
				// the status code should be 404 if the job does not exists
				if srvErr, ok := err.(restful.ServiceError); ok {
					delSuccess = srvErr.Code == http.StatusNotFound
				} else if srvErr, ok := err.(*devopsClient.ErrorResponse); ok {
					delSuccess = srvErr.Response.StatusCode == http.StatusNotFound
				} else {
					klog.Error(fmt.Sprintf("unexpected error type: %v, should be *restful.ServiceError", err))
				}
				klog.V(8).Info(err, fmt.Sprintf("failed to delete pipeline %s in devops", key))
			} else {
				delSuccess = true
			}
			if delSuccess {
				copyPipeline.ObjectMeta.Finalizers = sliceutil.RemoveString(copyPipeline.ObjectMeta.Finalizers, func(item string) bool {
					return item == devopsv1alpha3.PipelineFinalizerName
				})
			} else {
				// make sure the corresponding Jenkins job can be clean
				// You can remove the finalizer via kubectl manually in a very special case that Jenkins might be not able to available anymore
				return fmt.Errorf("failed to remove pipeline job finalizer due to bad communication with Jenkins")
			}
		}
	}
	// Persist the copy only when something actually changed.
	if !reflect.DeepEqual(pipeline, copyPipeline) {
		_, err = c.kubesphereClient.DevopsV1alpha3().Pipelines(nsName).Update(context.Background(), copyPipeline, metav1.UpdateOptions{})
		if err != nil {
			klog.V(8).Info(err, fmt.Sprintf("failed to update pipeline %s ", key))
			return err
		}
	}
	return nil
}
// isDevOpsProjectAdminNamespace reports whether the namespace is a DevOps
// project admin namespace: it must carry the DevOps project label and be
// controller-owned by a DevOpsProject (any name, hence the empty string).
func isDevOpsProjectAdminNamespace(namespace *v1.Namespace) bool {
	if _, labelled := namespace.Labels[constants.DevOpsProjectLabelKey]; !labelled {
		return false
	}
	return k8sutil.IsControlledBy(namespace.OwnerReferences,
		devopsv1alpha3.ResourceKindDevOpsProject, "")
}

View File

@@ -1,423 +0,0 @@
/*
Copyright 2020 KubeSphere Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package pipeline
import (
"reflect"
"testing"
"time"
v1 "k8s.io/api/core/v1"
"kubesphere.io/kubesphere/pkg/constants"
modelsdevops "kubesphere.io/kubesphere/pkg/models/devops"
fakeDevOps "kubesphere.io/kubesphere/pkg/simple/client/devops/fake"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/diff"
kubeinformers "k8s.io/client-go/informers"
k8sfake "k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
devops "kubesphere.io/api/devops/v1alpha3"
"kubesphere.io/kubesphere/pkg/client/clientset/versioned/fake"
informers "kubesphere.io/kubesphere/pkg/client/informers/externalversions"
)
var (
	// alwaysReady short-circuits informer cache-sync checks in tests.
	alwaysReady = func() bool { return true }
	// noResyncPeriodFunc disables informer resync during tests.
	noResyncPeriodFunc = func() time.Duration { return 0 }
)
// fixture aggregates the fake clients, seed objects, and expected actions for
// a single pipeline controller test case.
type fixture struct {
	t *testing.T
	// client/kubeclient are populated by newController.
	client     *fake.Clientset
	kubeclient *k8sfake.Clientset
	namespaceLister []*v1.Namespace
	pipelineLister  []*devops.Pipeline
	// actions/kubeactions list the expected client actions, in order.
	actions     []core.Action
	kubeactions []core.Action
	kubeobjects []runtime.Object
	// Objects from here preloaded into NewSimpleFake.
	objects []runtime.Object
	// Objects from here preloaded into devops
	initDevOpsProject string
	initPipeline      []*devops.Pipeline
	expectPipeline    []*devops.Pipeline
}
// newFixture returns an empty test fixture bound to t.
func newFixture(t *testing.T) *fixture {
	return &fixture{
		t:       t,
		objects: []runtime.Object{},
	}
}
// newNamespace builds a DevOps admin namespace for tests: labelled with the
// project name and controller-owned by a DevOpsProject of that name.
func newNamespace(name string, projectName string) *v1.Namespace {
	isController := true
	return &v1.Namespace{
		TypeMeta: metav1.TypeMeta{
			Kind:       "Namespace",
			APIVersion: v1.SchemeGroupVersion.String(),
		},
		ObjectMeta: metav1.ObjectMeta{
			Name:   name,
			Labels: map[string]string{constants.DevOpsProjectLabelKey: projectName},
			OwnerReferences: []metav1.OwnerReference{{
				APIVersion:         devops.SchemeGroupVersion.String(),
				Kind:               devops.ResourceKindDevOpsProject,
				Name:               projectName,
				BlockOwnerDeletion: &isController,
				Controller:         &isController,
			}},
		},
	}
}
// newPipeline builds a Pipeline in namespace with the given spec.
// withFinalizers attaches the pipeline finalizer; syncOk marks the pipeline as
// already synced successfully.
func newPipeline(namespace, name string, spec devops.PipelineSpec, withFinalizers bool, syncOk bool) *devops.Pipeline {
	annotations := map[string]string{}
	if syncOk {
		annotations[devops.PipelineSyncStatusAnnoKey] = modelsdevops.StatusSuccessful
	}
	p := &devops.Pipeline{
		TypeMeta: metav1.TypeMeta{
			Kind:       devops.ResourceKindPipeline,
			APIVersion: devops.SchemeGroupVersion.String(),
		},
		ObjectMeta: metav1.ObjectMeta{
			Namespace:   namespace,
			Name:        name,
			Annotations: annotations,
		},
		Spec:   spec,
		Status: devops.PipelineStatus{},
	}
	if withFinalizers {
		p.Finalizers = append(p.Finalizers, devops.PipelineFinalizerName)
	}
	return p
}
// newDeletingPipeline builds a Pipeline that is mid-deletion: deletion
// timestamp set and the pipeline finalizer still present.
func newDeletingPipeline(namespace, name string) *devops.Pipeline {
	deletedAt := metav1.Now()
	return &devops.Pipeline{
		TypeMeta: metav1.TypeMeta{
			Kind:       devops.ResourceKindPipeline,
			APIVersion: devops.SchemeGroupVersion.String(),
		},
		ObjectMeta: metav1.ObjectMeta{
			Namespace:         namespace,
			Name:              name,
			DeletionTimestamp: &deletedAt,
			Finalizers:        []string{devops.PipelineFinalizerName},
		},
	}
}
// newController builds a pipeline Controller wired to fake clients, fake
// informer factories, and a fake DevOps backend preloaded with the fixture's
// initial project and pipelines.
func (f *fixture) newController() (*Controller, informers.SharedInformerFactory, kubeinformers.SharedInformerFactory, *fakeDevOps.Devops) {
	f.client = fake.NewSimpleClientset(f.objects...)
	f.kubeclient = k8sfake.NewSimpleClientset(f.kubeobjects...)

	i := informers.NewSharedInformerFactory(f.client, noResyncPeriodFunc())
	k8sI := kubeinformers.NewSharedInformerFactory(f.kubeclient, noResyncPeriodFunc())
	dI := fakeDevOps.NewWithPipelines(f.initDevOpsProject, f.initPipeline...)

	c := NewController(f.kubeclient, f.client, dI, k8sI.Core().V1().Namespaces(),
		i.Devops().V1alpha3().Pipelines())

	// Short-circuit cache sync and event recording for tests.
	c.pipelineSynced = alwaysReady
	c.eventRecorder = &record.FakeRecorder{}

	// Seed the informer indexers directly; Add errors are ignored here
	// because the indexer cannot fail for these well-formed test objects.
	for _, f := range f.pipelineLister {
		i.Devops().V1alpha3().Pipelines().Informer().GetIndexer().Add(f)
	}

	for _, d := range f.namespaceLister {
		k8sI.Core().V1().Namespaces().Informer().GetIndexer().Add(d)
	}

	return c, i, k8sI, dI
}
// run syncs the object identified by fooName, expecting success.
func (f *fixture) run(fooName string) {
	f.runController(fooName, true, false)
}
// runExpectError syncs the object identified by fooName, expecting an error.
func (f *fixture) runExpectError(fooName string) {
	f.runController(fooName, true, true)
}
// runController drives one syncHandler pass for projectName and verifies the
// recorded devops/kube client actions and the resulting fake-DevOps pipeline
// state against the fixture's expectations.
func (f *fixture) runController(projectName string, startInformers bool, expectError bool) {
	c, i, k8sI, dI := f.newController()
	if startInformers {
		stopCh := make(chan struct{})
		defer close(stopCh)
		i.Start(stopCh)
		k8sI.Start(stopCh)
	}

	err := c.syncHandler(projectName)
	if !expectError && err != nil {
		f.t.Errorf("error syncing foo: %v", err)
	} else if expectError && err == nil {
		f.t.Error("expected error syncing foo, got nil")
	}

	actions := filterInformerActions(f.client.Actions())
	k8sActions := filterInformerActions(f.kubeclient.Actions())
	// Compare each devops-client action with the expected one. Previously
	// only the lengths were compared, so the content of actions registered
	// via expectUpdatePipelineAction was never actually verified.
	for i, action := range actions {
		if len(f.actions) < i+1 {
			f.t.Errorf("%d unexpected actions: %+v", len(actions)-len(f.actions), actions[i:])
			break
		}
		checkAction(f.actions[i], action, f.t)
	}
	for i, action := range k8sActions {
		if len(f.kubeactions) < i+1 {
			f.t.Errorf("%d unexpected actions: %+v", len(k8sActions)-len(f.kubeactions), k8sActions[i:])
			break
		}
		expectedAction := f.kubeactions[i]
		checkAction(expectedAction, action, f.t)
	}
	if len(f.kubeactions) > len(k8sActions) {
		f.t.Errorf("%d additional expected actions:%+v", len(f.kubeactions)-len(k8sActions), f.kubeactions[len(k8sActions):])
	}
	if len(f.actions) > len(actions) {
		f.t.Errorf("%d additional expected actions:%+v", len(f.actions)-len(actions), f.actions[len(actions):])
	}
	// Verify the fake DevOps backend ended up with exactly the expected pipelines.
	if len(dI.Pipelines[f.initDevOpsProject]) != len(f.expectPipeline) {
		f.t.Errorf(" unexpected objects: %v", dI.Projects)
	}
	for _, pipeline := range f.expectPipeline {
		actualPipeline := dI.Pipelines[f.initDevOpsProject][pipeline.Name]
		if !reflect.DeepEqual(actualPipeline, pipeline) {
			f.t.Errorf(" pipeline %+v not match %+v", pipeline, actualPipeline)
		}
	}
}
// checkAction verifies that expected and actual actions are equal and both have
// same attached resources.
func checkAction(expected, actual core.Action, t *testing.T) {
	if !(expected.Matches(actual.GetVerb(), actual.GetResource().Resource) && actual.GetSubresource() == expected.GetSubresource()) {
		t.Errorf("Expected\n\t%#v\ngot\n\t%#v", expected, actual)
		return
	}
	// %T prints the dynamic type; the original %t is the boolean verb and
	// rendered the actions as "%!t(...)".
	if reflect.TypeOf(actual) != reflect.TypeOf(expected) {
		t.Errorf("Action has wrong type. Expected: %T. Got: %T", expected, actual)
		return
	}
	switch a := actual.(type) {
	case core.CreateActionImpl:
		e, _ := expected.(core.CreateActionImpl)
		expObject := e.GetObject()
		object := a.GetObject()
		if !reflect.DeepEqual(expObject, object) {
			t.Errorf("Action %s %s has wrong object\nDiff:\n %s",
				a.GetVerb(), a.GetResource().Resource, diff.ObjectGoPrintSideBySide(expObject, object))
		}
	case core.UpdateActionImpl:
		e, _ := expected.(core.UpdateActionImpl)
		expObject := e.GetObject()
		object := a.GetObject()
		if !reflect.DeepEqual(expObject, object) {
			t.Errorf("Action %s %s has wrong object\nDiff:\n %s",
				a.GetVerb(), a.GetResource().Resource, diff.ObjectGoPrintSideBySide(expObject, object))
		}
	case core.PatchActionImpl:
		e, _ := expected.(core.PatchActionImpl)
		expPatch := e.GetPatch()
		patch := a.GetPatch()
		if !reflect.DeepEqual(expPatch, patch) {
			t.Errorf("Action %s %s has wrong patch\nDiff:\n %s",
				a.GetVerb(), a.GetResource().Resource, diff.ObjectGoPrintSideBySide(expPatch, patch))
		}
	default:
		t.Errorf("Uncaptured Action %s %s, you should explicitly add a case to capture it",
			actual.GetVerb(), actual.GetResource().Resource)
	}
}
// filterInformerActions filters list and watch actions for testing resources.
// Since list and watch don't change resource state we can filter it to lower
// noise level in our tests.
func filterInformerActions(actions []core.Action) []core.Action {
	ret := []core.Action{}
	for _, action := range actions {
		// Drop cluster-scoped list/watch of pipelines and namespaces; they
		// come from informer startup, not from the controller under test.
		if len(action.GetNamespace()) == 0 &&
			(action.Matches("list", devops.ResourcePluralPipeline) ||
				action.Matches("watch", devops.ResourcePluralPipeline) ||
				action.Matches("list", "namespaces") ||
				action.Matches("watch", "namespaces")) {
			continue
		}
		ret = append(ret, action)
	}
	return ret
}
// expectUpdatePipelineAction registers an expected update action for p
// against the pipelines resource of the devops API group.
func (f *fixture) expectUpdatePipelineAction(p *devops.Pipeline) {
	action := core.NewUpdateAction(schema.GroupVersionResource{
		Version:  devops.SchemeGroupVersion.Version,
		Resource: devops.ResourcePluralPipeline,
		Group:    devops.SchemeGroupVersion.Group,
	}, p.Namespace, p)
	f.actions = append(f.actions, action)
}
// getKey returns the namespace/name cache key for p, failing the test on error.
func getKey(p *devops.Pipeline, t *testing.T) string {
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(p)
	if err == nil {
		return key
	}
	t.Errorf("Unexpected error getting key for pipeline %v: %v", p.Name, err)
	return ""
}
// TestDoNothing verifies that a pipeline that already has its finalizer and
// a successful sync annotation triggers no client actions.
func TestDoNothing(t *testing.T) {
	f := newFixture(t)
	nsName := "test-123"
	pipelineName := "test"
	projectName := "test_project"
	ns := newNamespace(nsName, projectName)
	pipeline := newPipeline(nsName, pipelineName, devops.PipelineSpec{}, true, true)
	f.pipelineLister = append(f.pipelineLister, pipeline)
	f.namespaceLister = append(f.namespaceLister, ns)
	f.objects = append(f.objects, pipeline)
	f.initDevOpsProject = nsName
	f.initPipeline = []*devops.Pipeline{pipeline}
	f.expectPipeline = []*devops.Pipeline{pipeline}
	f.run(getKey(pipeline, t))
}
// TestAddPipelineFinalizers verifies that a pipeline without the finalizer
// receives it (plus the sync-ok annotation) via an update action.
func TestAddPipelineFinalizers(t *testing.T) {
	f := newFixture(t)
	nsName := "test-123"
	pipelineName := "test"
	projectName := "test_project"
	ns := newNamespace(nsName, projectName)
	pipeline := newPipeline(nsName, pipelineName, devops.PipelineSpec{}, false, false)
	expectPipeline := newPipeline(nsName, pipelineName, devops.PipelineSpec{}, true, true)
	f.pipelineLister = append(f.pipelineLister, pipeline)
	f.namespaceLister = append(f.namespaceLister, ns)
	f.objects = append(f.objects, pipeline)
	f.initDevOpsProject = nsName
	f.initPipeline = []*devops.Pipeline{pipeline}
	// NOTE(review): the fake DevOps backend is expected to keep the original
	// pipeline here; only the K8s object gains the finalizer — confirm intended.
	f.expectPipeline = []*devops.Pipeline{pipeline}
	f.expectUpdatePipelineAction(expectPipeline)
	f.run(getKey(pipeline, t))
}
// TestCreatePipeline verifies that a pipeline absent from the DevOps backend
// (initPipeline empty) is created there during sync.
func TestCreatePipeline(t *testing.T) {
	f := newFixture(t)
	nsName := "test-123"
	pipelineName := "test"
	projectName := "test_project"
	ns := newNamespace(nsName, projectName)
	pipeline := newPipeline(nsName, pipelineName, devops.PipelineSpec{}, false, false)
	expectPipeline := newPipeline(nsName, pipelineName, devops.PipelineSpec{}, true, true)
	f.pipelineLister = append(f.pipelineLister, pipeline)
	f.namespaceLister = append(f.namespaceLister, ns)
	f.objects = append(f.objects, pipeline)
	f.initDevOpsProject = nsName
	f.expectPipeline = []*devops.Pipeline{expectPipeline}
	f.run(getKey(pipeline, t))
}
// TestDeletePipeline verifies that a deleting pipeline is removed from the
// DevOps backend and its finalizer is cleared via an update action.
func TestDeletePipeline(t *testing.T) {
	f := newFixture(t)
	nsName := "test-123"
	pipelineName := "test"
	projectName := "test_project"
	ns := newNamespace(nsName, projectName)
	pipeline := newDeletingPipeline(nsName, pipelineName)
	expectPipeline := pipeline.DeepCopy()
	expectPipeline.Finalizers = []string{}
	f.pipelineLister = append(f.pipelineLister, pipeline)
	f.namespaceLister = append(f.namespaceLister, ns)
	f.objects = append(f.objects, pipeline)
	f.initDevOpsProject = nsName
	f.initPipeline = []*devops.Pipeline{pipeline}
	f.expectPipeline = []*devops.Pipeline{}
	f.expectUpdatePipelineAction(expectPipeline)
	f.run(getKey(pipeline, t))
}
// TestDeleteNotExistPipeline verifies that deleting a pipeline that is already
// absent from the DevOps backend still clears the finalizer without error.
func TestDeleteNotExistPipeline(t *testing.T) {
	f := newFixture(t)
	nsName := "test-123"
	pipelineName := "test"
	projectName := "test_project"
	ns := newNamespace(nsName, projectName)
	pipeline := newDeletingPipeline(nsName, pipelineName)
	expectPipeline := pipeline.DeepCopy()
	expectPipeline.Finalizers = []string{}
	f.pipelineLister = append(f.pipelineLister, pipeline)
	f.namespaceLister = append(f.namespaceLister, ns)
	f.objects = append(f.objects, pipeline)
	f.initDevOpsProject = nsName
	f.initPipeline = []*devops.Pipeline{}
	f.expectPipeline = []*devops.Pipeline{}
	f.expectUpdatePipelineAction(expectPipeline)
	f.run(getKey(pipeline, t))
}
// TestUpdatePipelineConfig verifies that when the K8s pipeline spec differs
// from the DevOps backend copy, the backend is updated to match.
func TestUpdatePipelineConfig(t *testing.T) {
	f := newFixture(t)
	nsName := "test-123"
	pipelineName := "test"
	projectName := "test_project"
	ns := newNamespace(nsName, projectName)
	initPipeline := newPipeline(nsName, pipelineName, devops.PipelineSpec{}, true, false)
	modifiedPipeline := newPipeline(nsName, pipelineName, devops.PipelineSpec{Type: "aa"}, true, false)
	expectPipeline := newPipeline(nsName, pipelineName, devops.PipelineSpec{Type: "aa"}, true, true)
	f.pipelineLister = append(f.pipelineLister, modifiedPipeline)
	f.namespaceLister = append(f.namespaceLister, ns)
	f.objects = append(f.objects, modifiedPipeline)
	f.initDevOpsProject = nsName
	f.initPipeline = []*devops.Pipeline{initPipeline}
	f.expectPipeline = []*devops.Pipeline{expectPipeline}
	f.run(getKey(modifiedPipeline, t))
}

View File

@@ -1,13 +0,0 @@
approvers:
- shaowenchen
- linuxsuren
reviewers:
- runzexia
- soulseen
- shaowenchen
- linuxsuren
labels:
- area/controller
- area/devops

View File

@@ -1,242 +0,0 @@
/*
Copyright 2020 KubeSphere Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package s2ibinary
import (
"context"
"fmt"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
"kubesphere.io/kubesphere/pkg/simple/client/s3"
"kubesphere.io/kubesphere/pkg/utils/sliceutil"
devopsv1alpha1 "kubesphere.io/api/devops/v1alpha1"
devopsclient "kubesphere.io/kubesphere/pkg/client/clientset/versioned"
devopsinformers "kubesphere.io/kubesphere/pkg/client/informers/externalversions/devops/v1alpha1"
devopslisters "kubesphere.io/kubesphere/pkg/client/listers/devops/v1alpha1"
)
/**
s2ibinary-controller used to handle s2ibinary's delete logic.
s2ibinary creation and file upload provided by kubesphere/kapis
*/
// Controller reconciles S2iBinary objects: it manages the S2iBinary finalizer
// and removes the backing object from S3 when a binary is deleted.
type Controller struct {
	client           clientset.Interface
	devopsClient     devopsclient.Interface
	eventBroadcaster record.EventBroadcaster
	eventRecorder    record.EventRecorder
	s2iBinaryLister  devopslisters.S2iBinaryLister
	s2iBinarySynced  cache.InformerSynced
	workqueue        workqueue.RateLimitingInterface
	// workerLoopPeriod is the restart backoff for worker goroutines.
	workerLoopPeriod time.Duration
	// s3Client deletes the uploaded binary data on finalization.
	s3Client s3.Interface
}
// NewController wires up an s2ibinary controller: it records events, watches
// S2iBinary objects, and enqueues them for the finalizer/delete sync loop.
func NewController(client clientset.Interface,
	devopsclientset devopsclient.Interface,
	s2ibinInformer devopsinformers.S2iBinaryInformer,
	s3Client s3.Interface) *Controller {

	broadcaster := record.NewBroadcaster()
	broadcaster.StartLogging(func(format string, args ...interface{}) {
		// Spread args with "..."; passing the slice as a single operand
		// would format it as one "[...]" value instead of per-argument.
		klog.Info(fmt.Sprintf(format, args...))
	})
	broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})
	recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "s2ibinary-controller"})

	v := &Controller{
		client:           client,
		devopsClient:     devopsclientset,
		workqueue:        workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "s2ibinary"),
		s2iBinaryLister:  s2ibinInformer.Lister(),
		s2iBinarySynced:  s2ibinInformer.Informer().HasSynced,
		workerLoopPeriod: time.Second,
		s3Client:         s3Client,
	}
	v.eventBroadcaster = broadcaster
	v.eventRecorder = recorder

	s2ibinInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: v.enqueueS2iBinary,
		UpdateFunc: func(oldObj, newObj interface{}) {
			old := oldObj.(*devopsv1alpha1.S2iBinary)
			new := newObj.(*devopsv1alpha1.S2iBinary)
			// Periodic resyncs deliver unchanged objects; skip them.
			if old.ResourceVersion == new.ResourceVersion {
				return
			}
			v.enqueueS2iBinary(newObj)
		},
		DeleteFunc: v.enqueueS2iBinary,
	})
	return v
}
// enqueueS2iBinary converts an S2iBinary object into a namespace/name key and
// adds it to the work queue. It must not be passed objects of any other type.
func (c *Controller) enqueueS2iBinary(obj interface{}) {
	key, err := cache.MetaNamespaceKeyFunc(obj)
	if err != nil {
		utilruntime.HandleError(err)
		return
	}
	c.workqueue.Add(key)
}
// processNextWorkItem pops one key from the queue and syncs it. It returns
// false only when the queue has been shut down; sync failures are re-queued
// with rate limiting and processing continues.
func (c *Controller) processNextWorkItem() bool {
	obj, shutdown := c.workqueue.Get()
	if shutdown {
		return false
	}

	err := func(obj interface{}) error {
		// Done must be called for every item Get returns.
		defer c.workqueue.Done(obj)
		var key string
		var ok bool
		if key, ok = obj.(string); !ok {
			// A non-string item can never be processed; drop it.
			c.workqueue.Forget(obj)
			utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
			return nil
		}
		if err := c.syncHandler(key); err != nil {
			c.workqueue.AddRateLimited(key)
			return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
		}
		c.workqueue.Forget(obj)
		klog.V(5).Infof("Successfully synced '%s'", key)
		return nil
	}(obj)

	if err != nil {
		klog.Error(err, "could not reconcile s2ibinary")
		utilruntime.HandleError(err)
		return true
	}

	return true
}
// worker drains the work queue until it is shut down.
func (c *Controller) worker() {
	for {
		if !c.processNextWorkItem() {
			return
		}
	}
}
// Start runs the controller with a single worker until stopCh closes.
func (c *Controller) Start(stopCh <-chan struct{}) error {
	return c.Run(1, stopCh)
}
// Run waits for the S2iBinary informer cache to sync and then launches the
// requested number of workers, blocking until stopCh closes.
func (c *Controller) Run(workers int, stopCh <-chan struct{}) error {
	defer utilruntime.HandleCrash()
	defer c.workqueue.ShutDown()

	klog.Info("starting s2ibinary controller")
	defer klog.Info("shutting down s2ibinary controller")

	if !cache.WaitForCacheSync(stopCh, c.s2iBinarySynced) {
		return fmt.Errorf("failed to wait for caches to sync")
	}

	for i := 0; i < workers; i++ {
		// wait.Until restarts a crashed worker after workerLoopPeriod.
		go wait.Until(c.worker, c.workerLoopPeriod, stopCh)
	}

	<-stopCh
	return nil
}
// syncHandler reconciles a single S2iBinary identified by its namespace/name
// key: live objects get the finalizer added; deleting objects have their S3
// data removed before the finalizer is released.
func (c *Controller) syncHandler(key string) error {
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		klog.Error(err, fmt.Sprintf("could not split s2ibin meta %s ", key))
		return nil
	}
	s2ibin, err := c.s2iBinaryLister.S2iBinaries(namespace).Get(name)
	if err != nil {
		if errors.IsNotFound(err) {
			klog.Info(fmt.Sprintf("s2ibin '%s' in work queue no longer exists ", key))
			return nil
		}
		klog.Error(err, fmt.Sprintf("could not get s2ibin %s ", key))
		return err
	}

	// Objects returned by a lister are shared with the informer cache and
	// must not be mutated in place; work on a deep copy before touching
	// the finalizer list below.
	s2ibin = s2ibin.DeepCopy()

	if s2ibin.ObjectMeta.DeletionTimestamp.IsZero() {
		// Live object: ensure our finalizer is present so deletion waits
		// for S3 cleanup.
		if !sliceutil.HasString(s2ibin.ObjectMeta.Finalizers, devopsv1alpha1.S2iBinaryFinalizerName) {
			s2ibin.ObjectMeta.Finalizers = append(s2ibin.ObjectMeta.Finalizers, devopsv1alpha1.S2iBinaryFinalizerName)
			_, err := c.devopsClient.DevopsV1alpha1().S2iBinaries(namespace).Update(context.Background(), s2ibin, metav1.UpdateOptions{})
			if err != nil {
				klog.Error(err, fmt.Sprintf("failed to update s2ibin %s ", key))
				return err
			}
		}
	} else {
		// Deleting object: clean up S3 first, then release the finalizer.
		if sliceutil.HasString(s2ibin.ObjectMeta.Finalizers, devopsv1alpha1.S2iBinaryFinalizerName) {
			if err := c.deleteBinaryInS3(s2ibin); err != nil {
				klog.Error(err, fmt.Sprintf("failed to delete resource %s in s3", key))
				return err
			}
			s2ibin.ObjectMeta.Finalizers = sliceutil.RemoveString(s2ibin.ObjectMeta.Finalizers, func(item string) bool {
				return item == devopsv1alpha1.S2iBinaryFinalizerName
			})
			_, err := c.devopsClient.DevopsV1alpha1().S2iBinaries(namespace).Update(context.Background(), s2ibin, metav1.UpdateOptions{})
			if err != nil {
				klog.Error(err, fmt.Sprintf("failed to update s2ibin %s ", key))
				return err
			}
		}
	}
	return nil
}
// deleteBinaryInS3 removes the S3 object backing the given S2iBinary.
// The object key is "<namespace>-<name>".
// NOTE(review): a delete failure is only logged and nil is returned, so the
// caller proceeds to remove the finalizer anyway — presumably best-effort
// cleanup; confirm this is intended, otherwise the S3 object can be leaked.
func (c *Controller) deleteBinaryInS3(s2ibin *devopsv1alpha1.S2iBinary) error {
	key := fmt.Sprintf("%s-%s", s2ibin.Namespace, s2ibin.Name)
	err := c.s3Client.Delete(key)
	if err != nil {
		klog.Errorf("error happened while deleting %s, %v", key, err)
	}
	return nil
}

View File

@@ -1,254 +0,0 @@
/*
Copyright 2020 KubeSphere Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package s2ibinary
import (
"reflect"
"testing"
"time"
fakes3 "kubesphere.io/kubesphere/pkg/simple/client/s3/fake"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/diff"
k8sfake "k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
s2i "kubesphere.io/api/devops/v1alpha1"
"kubesphere.io/kubesphere/pkg/client/clientset/versioned/fake"
informers "kubesphere.io/kubesphere/pkg/client/informers/externalversions"
)
var (
	// alwaysReady reports informer caches as synced so tests skip the wait.
	alwaysReady = func() bool { return true }
	// noResyncPeriodFunc disables periodic resync in test informer factories.
	noResyncPeriodFunc = func() time.Duration { return 0 }
)
// fixture bundles the fake clients, expected client actions, and expected S3
// state for a single s2ibinary-controller test case.
type fixture struct {
	t *testing.T
	client *fake.Clientset
	kubeclient *k8sfake.Clientset
	// Objects to put in the store.
	s2ibinaryLister []*s2i.S2iBinary
	actions []core.Action
	// Objects from here preloaded into NewSimpleFake.
	objects []runtime.Object
	// Objects from here preloaded into s3
	initS3Objects []*fakes3.Object
	// expectS3Objects is the S3 content expected after the sync runs.
	expectS3Objects []*fakes3.Object
}
// newFixture returns an empty test fixture bound to the given *testing.T.
func newFixture(t *testing.T) *fixture {
	return &fixture{
		t:       t,
		objects: []runtime.Object{},
	}
}
// newS2iBinary builds an S2iBinary in the default namespace with the given spec.
func newS2iBinary(name string, spec s2i.S2iBinarySpec) *s2i.S2iBinary {
	return &s2i.S2iBinary{
		TypeMeta: metav1.TypeMeta{APIVersion: s2i.SchemeGroupVersion.String()},
		ObjectMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: metav1.NamespaceDefault,
		},
		Spec: spec,
	}
}
// newDeletingS2iBinary builds an S2iBinary that is mid-deletion: its
// DeletionTimestamp is set and the finalizer is present, so the controller
// must clean up S3 before releasing it.
func newDeletingS2iBinary(name string) *s2i.S2iBinary {
	deleteTime := metav1.Now()
	return &s2i.S2iBinary{
		TypeMeta: metav1.TypeMeta{APIVersion: s2i.SchemeGroupVersion.String()},
		ObjectMeta: metav1.ObjectMeta{
			Name:              name,
			Namespace:         metav1.NamespaceDefault,
			Finalizers:        []string{s2i.S2iBinaryFinalizerName},
			DeletionTimestamp: &deleteTime,
		},
	}
}
// newController builds an s2ibinary Controller wired to fake clients and a
// fake S3 store preloaded with the fixture's initial objects.
func (f *fixture) newController() (*Controller, informers.SharedInformerFactory, *fakes3.FakeS3) {
	f.client = fake.NewSimpleClientset(f.objects...)
	f.kubeclient = k8sfake.NewSimpleClientset()

	i := informers.NewSharedInformerFactory(f.client, noResyncPeriodFunc())
	// Seed the fake S3 with the INITIAL objects, not the expected final
	// state; seeding with expectS3Objects made the post-sync comparison
	// against expectS3Objects trivially pass without exercising deletion.
	s3I := fakes3.NewFakeS3(f.initS3Objects...)
	c := NewController(f.kubeclient, f.client, i.Devops().V1alpha1().S2iBinaries(), s3I)

	c.s2iBinarySynced = alwaysReady
	c.eventRecorder = &record.FakeRecorder{}

	for _, f := range f.s2ibinaryLister {
		i.Devops().V1alpha1().S2iBinaries().Informer().GetIndexer().Add(f)
	}

	return c, i, s3I
}
// run syncs the object identified by fooName, expecting success.
func (f *fixture) run(fooName string) {
	f.runController(fooName, true, false)
}
// runExpectError syncs the object identified by fooName, expecting an error.
func (f *fixture) runExpectError(fooName string) {
	f.runController(fooName, true, true)
}
// runController drives one syncHandler pass for s2iBinaryName and verifies
// both the recorded client actions and the resulting fake-S3 content against
// the fixture's expectations.
func (f *fixture) runController(s2iBinaryName string, startInformers bool, expectError bool) {
	c, i, s3I := f.newController()
	if startInformers {
		stopCh := make(chan struct{})
		defer close(stopCh)
		i.Start(stopCh)
	}

	err := c.syncHandler(s2iBinaryName)
	if !expectError && err != nil {
		f.t.Errorf("error syncing foo: %v", err)
	} else if expectError && err == nil {
		f.t.Error("expected error syncing foo, got nil")
	}

	// Compare every non-informer action with the expected list, in order.
	actions := filterInformerActions(f.client.Actions())
	for i, action := range actions {
		if len(f.actions) < i+1 {
			f.t.Errorf("%d unexpected actions: %+v", len(actions)-len(f.actions), actions[i:])
			break
		}
		expectedAction := f.actions[i]
		checkAction(expectedAction, action, f.t)
	}

	if len(f.actions) > len(actions) {
		f.t.Errorf("%d additional expected actions:%+v", len(f.actions)-len(actions), f.actions[len(actions):])
	}
	if len(s3I.Storage) != len(f.expectS3Objects) {
		f.t.Errorf(" unexpected objects: %v", s3I.Storage)
	}
}
// checkAction verifies that expected and actual actions are equal and both have
// same attached resources.
func checkAction(expected, actual core.Action, t *testing.T) {
	if !(expected.Matches(actual.GetVerb(), actual.GetResource().Resource) && actual.GetSubresource() == expected.GetSubresource()) {
		t.Errorf("Expected\n\t%#v\ngot\n\t%#v", expected, actual)
		return
	}
	// %T prints the dynamic type; the original %t is the boolean verb and
	// rendered the actions as "%!t(...)".
	if reflect.TypeOf(actual) != reflect.TypeOf(expected) {
		t.Errorf("Action has wrong type. Expected: %T. Got: %T", expected, actual)
		return
	}
	switch a := actual.(type) {
	case core.CreateActionImpl:
		e, _ := expected.(core.CreateActionImpl)
		expObject := e.GetObject()
		object := a.GetObject()
		if !reflect.DeepEqual(expObject, object) {
			t.Errorf("Action %s %s has wrong object\nDiff:\n %s",
				a.GetVerb(), a.GetResource().Resource, diff.ObjectGoPrintSideBySide(expObject, object))
		}
	case core.UpdateActionImpl:
		e, _ := expected.(core.UpdateActionImpl)
		expObject := e.GetObject()
		object := a.GetObject()
		if !reflect.DeepEqual(expObject, object) {
			t.Errorf("Action %s %s has wrong object\nDiff:\n %s",
				a.GetVerb(), a.GetResource().Resource, diff.ObjectGoPrintSideBySide(expObject, object))
		}
	case core.PatchActionImpl:
		e, _ := expected.(core.PatchActionImpl)
		expPatch := e.GetPatch()
		patch := a.GetPatch()
		if !reflect.DeepEqual(expPatch, patch) {
			t.Errorf("Action %s %s has wrong patch\nDiff:\n %s",
				a.GetVerb(), a.GetResource().Resource, diff.ObjectGoPrintSideBySide(expPatch, patch))
		}
	default:
		t.Errorf("Uncaptured Action %s %s, you should explicitly add a case to capture it",
			actual.GetVerb(), actual.GetResource().Resource)
	}
}
// filterInformerActions filters list and watch actions for testing resources.
// Since list and watch don't change resource state we can filter it to lower
// noise level in our tests.
func filterInformerActions(actions []core.Action) []core.Action {
	ret := []core.Action{}
	for _, action := range actions {
		// Drop cluster-scoped list/watch of s2ibinaries; they come from
		// informer startup, not from the controller under test.
		if len(action.GetNamespace()) == 0 &&
			(action.Matches("list", s2i.ResourcePluralS2iBinary) ||
				action.Matches("watch", s2i.ResourcePluralS2iBinary)) {
			continue
		}
		ret = append(ret, action)
	}
	return ret
}
// expectUpdateS2iBinaryAction registers an expected update action for
// s2ibinary. Only the Resource field of the GVR is set because action
// matching in these tests compares verb and resource only.
func (f *fixture) expectUpdateS2iBinaryAction(s2ibinary *s2i.S2iBinary) {
	action := core.NewUpdateAction(schema.GroupVersionResource{Resource: s2i.ResourcePluralS2iBinary}, s2ibinary.Namespace, s2ibinary)
	f.actions = append(f.actions, action)
}
// getKey returns the namespace/name cache key for s2ibinary, failing the test on error.
func getKey(s2ibinary *s2i.S2iBinary, t *testing.T) string {
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(s2ibinary)
	if err == nil {
		return key
	}
	t.Errorf("Unexpected error getting key for s2ibinary %v: %v", s2ibinary.Name, err)
	return ""
}
// TestDoNothing verifies that syncing a live S2iBinary only adds the
// finalizer (one update action) and leaves S3 untouched.
func TestDoNothing(t *testing.T) {
	f := newFixture(t)
	s2iBinary := newS2iBinary("test", s2i.S2iBinarySpec{})
	f.s2ibinaryLister = append(f.s2ibinaryLister, s2iBinary)
	f.objects = append(f.objects, s2iBinary)
	f.expectUpdateS2iBinaryAction(s2iBinary)
	f.run(getKey(s2iBinary, t))
}
// TestDeleteS3Object verifies that a deleting S2iBinary has its backing S3
// object ("default-test") removed and its finalizer released.
func TestDeleteS3Object(t *testing.T) {
	f := newFixture(t)
	s2iBinary := newDeletingS2iBinary("test")
	f.s2ibinaryLister = append(f.s2ibinaryLister, s2iBinary)
	f.objects = append(f.objects, s2iBinary)
	f.initS3Objects = []*fakes3.Object{{
		Key: "default-test",
	}}
	f.expectS3Objects = []*fakes3.Object{}
	f.expectUpdateS2iBinaryAction(s2iBinary)
	f.run(getKey(s2iBinary, t))
}

View File

@@ -1,13 +0,0 @@
approvers:
- shaowenchen
- linuxsuren
reviewers:
- runzexia
- soulseen
- shaowenchen
- linuxsuren
labels:
- area/controller
- area/devops

View File

@@ -1,304 +0,0 @@
/*
Copyright 2020 KubeSphere Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package s2irun
import (
"context"
"fmt"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
"kubesphere.io/kubesphere/pkg/utils/sliceutil"
devopsv1alpha1 "kubesphere.io/api/devops/v1alpha1"
devopsclient "kubesphere.io/kubesphere/pkg/client/clientset/versioned"
devopsinformers "kubesphere.io/kubesphere/pkg/client/informers/externalversions/devops/v1alpha1"
devopslisters "kubesphere.io/kubesphere/pkg/client/listers/devops/v1alpha1"
)
/**
s2irun-controller used to handle s2irun's delete logic.
s2irun creation and operation provided by s2ioperator
*/
// Controller reconciles S2iRun objects: it manages the S2iBinary finalizer on
// runs and deletes the bound S2iBinary (plus stale ones) on finalization.
type Controller struct {
	client           clientset.Interface
	devopsClient     devopsclient.Interface
	eventBroadcaster record.EventBroadcaster
	eventRecorder    record.EventRecorder
	s2iRunLister     devopslisters.S2iRunLister
	s2iRunSynced     cache.InformerSynced
	s2iBinaryLister  devopslisters.S2iBinaryLister
	s2iBinarySynced  cache.InformerSynced
	workqueue        workqueue.RateLimitingInterface
	// workerLoopPeriod is the restart backoff for worker goroutines.
	workerLoopPeriod time.Duration
}
// NewS2iRunController wires up an s2irun controller: it records events,
// watches S2iRun objects, and enqueues them for the finalizer/delete loop.
func NewS2iRunController(
	client clientset.Interface,
	devopsClientSet devopsclient.Interface,
	s2iBinInformer devopsinformers.S2iBinaryInformer,
	s2iRunInformer devopsinformers.S2iRunInformer) *Controller {

	broadcaster := record.NewBroadcaster()
	broadcaster.StartLogging(func(format string, args ...interface{}) {
		// Spread args with "..."; passing the slice as a single operand
		// would format it as one "[...]" value instead of per-argument.
		klog.Info(fmt.Sprintf(format, args...))
	})
	broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})
	recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "s2irun-controller"})

	v := &Controller{
		client:           client,
		devopsClient:     devopsClientSet,
		workqueue:        workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "s2irun"),
		s2iBinaryLister:  s2iBinInformer.Lister(),
		s2iBinarySynced:  s2iBinInformer.Informer().HasSynced,
		s2iRunLister:     s2iRunInformer.Lister(),
		s2iRunSynced:     s2iRunInformer.Informer().HasSynced,
		workerLoopPeriod: time.Second,
	}
	v.eventBroadcaster = broadcaster
	v.eventRecorder = recorder

	s2iRunInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: v.enqueueS2iRun,
		UpdateFunc: func(oldObj, newObj interface{}) {
			old := oldObj.(*devopsv1alpha1.S2iRun)
			new := newObj.(*devopsv1alpha1.S2iRun)
			// Periodic resyncs deliver unchanged objects; skip them.
			if old.ResourceVersion == new.ResourceVersion {
				return
			}
			v.enqueueS2iRun(newObj)
		},
		DeleteFunc: v.enqueueS2iRun,
	})
	return v
}
// enqueueS2iRun takes an S2iRun resource and converts it into a namespace/name
// string which is then put onto the work workqueue. This method should *not* be
// passed resources of any type other than S2iRun.
func (c Controller) enqueueS2iRun(obj interface{}) {
	var key string
	var err error
	if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
		utilruntime.HandleError(err)
		return
	}
	c.workqueue.Add(key)
}
// processNextWorkItem pops one key from the queue and syncs it. It returns
// false only when the queue has been shut down; sync failures are re-queued
// with rate limiting and processing continues.
func (c Controller) processNextWorkItem() bool {
	obj, shutdown := c.workqueue.Get()
	if shutdown {
		return false
	}

	err := func(obj interface{}) error {
		// Done must be called for every item Get returns.
		defer c.workqueue.Done(obj)
		var key string
		var ok bool
		if key, ok = obj.(string); !ok {
			// A non-string item can never be processed; drop it.
			c.workqueue.Forget(obj)
			utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
			return nil
		}
		if err := c.syncHandler(key); err != nil {
			c.workqueue.AddRateLimited(key)
			return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
		}
		c.workqueue.Forget(obj)
		klog.V(5).Infof("Successfully synced '%s'", key)
		return nil
	}(obj)

	if err != nil {
		klog.Error(err, "could not reconcile s2irun")
		utilruntime.HandleError(err)
		return true
	}

	return true
}
// worker drains the work queue until it is shut down.
func (c Controller) worker() {
	for {
		if !c.processNextWorkItem() {
			return
		}
	}
}
// Start runs the controller with a single worker until stopCh closes.
func (c Controller) Start(stopCh <-chan struct{}) error {
	return c.Run(1, stopCh)
}
// Run waits for both informer caches to sync and then launches the requested
// number of workers, blocking until stopCh closes.
func (c Controller) Run(workers int, stopCh <-chan struct{}) error {
	defer utilruntime.HandleCrash()
	defer c.workqueue.ShutDown()

	klog.Info("starting s2irun controller")
	defer klog.Info("shutting down s2irun controller")

	// Also wait for the s2irun cache: syncHandler reads from s2iRunLister,
	// so waiting only on the s2ibinary cache could race with startup.
	if !cache.WaitForCacheSync(stopCh, c.s2iBinarySynced, c.s2iRunSynced) {
		return fmt.Errorf("failed to wait for caches to sync")
	}

	for i := 0; i < workers; i++ {
		go wait.Until(c.worker, c.workerLoopPeriod, stopCh)
	}

	<-stopCh
	return nil
}
// syncHandler reconciles a single S2iRun identified by its namespace/name
// key. Only runs labeled with an S2iBinary are handled: live runs get the
// binary finalizer; deleting runs trigger S2iBinary cleanup before the
// finalizer is released.
func (c Controller) syncHandler(key string) error {
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		klog.Error(err, fmt.Sprintf("could not split s2irun meta %s ", key))
		return nil
	}
	s2irun, err := c.s2iRunLister.S2iRuns(namespace).Get(name)
	if err != nil {
		if errors.IsNotFound(err) {
			klog.Info(fmt.Sprintf("s2irun '%s' in work queue no longer exists ", key))
			return nil
		}
		klog.Error(err, fmt.Sprintf("could not get s2irun %s ", key))
		return err
	}

	// Objects returned by a lister are shared with the informer cache and
	// must not be mutated in place; work on a deep copy before touching
	// the finalizer list below.
	s2irun = s2irun.DeepCopy()

	if s2irun.Labels != nil {
		_, ok := s2irun.Labels[devopsv1alpha1.S2iBinaryLabelKey]
		if ok {
			if s2irun.ObjectMeta.DeletionTimestamp.IsZero() {
				// Live run: ensure the binary finalizer is present so
				// deletion waits for S2iBinary cleanup.
				if !sliceutil.HasString(s2irun.ObjectMeta.Finalizers, devopsv1alpha1.S2iBinaryFinalizerName) {
					s2irun.ObjectMeta.Finalizers = append(s2irun.ObjectMeta.Finalizers, devopsv1alpha1.S2iBinaryFinalizerName)
					_, err = c.devopsClient.DevopsV1alpha1().S2iRuns(namespace).Update(context.Background(), s2irun, metav1.UpdateOptions{})
					if err != nil {
						klog.Error(err, fmt.Sprintf("failed to update s2irun %s", key))
						return err
					}
				}
			} else {
				// Deleting run: remove the bound binary, then release the finalizer.
				if sliceutil.HasString(s2irun.ObjectMeta.Finalizers, devopsv1alpha1.S2iBinaryFinalizerName) {
					if err := c.DeleteS2iBinary(s2irun); err != nil {
						klog.Error(err, fmt.Sprintf("failed to delete s2ibin %s in", key))
						return err
					}
					s2irun.ObjectMeta.Finalizers = sliceutil.RemoveString(s2irun.ObjectMeta.Finalizers, func(item string) bool {
						return item == devopsv1alpha1.S2iBinaryFinalizerName
					})
					_, err = c.devopsClient.DevopsV1alpha1().S2iRuns(namespace).Update(context.Background(), s2irun, metav1.UpdateOptions{})
					if err != nil {
						klog.Error(err, fmt.Sprintf("failed to update s2irun %s ", key))
						return err
					}
				}
			}
		}
	}
	return nil
}
/**
DeleteS2iBinary mainly cleans up two parts of S2iBinary
1. s2ibinary bound to s2irun
2. s2ibinary that has been created for more than 24 hours but has not been used
*/
func (c Controller) DeleteS2iBinary(s2irun *devopsv1alpha1.S2iRun) error {
	s2iBinName := s2irun.Labels[devopsv1alpha1.S2iBinaryLabelKey]
	s2iBin, err := c.s2iBinaryLister.S2iBinaries(s2irun.Namespace).Get(s2iBinName)
	if err != nil {
		if errors.IsNotFound(err) {
			// Log message typo fixed: "delted" -> "deleted".
			klog.Info(fmt.Sprintf("s2ibin '%s/%s' has been deleted ", s2irun.Namespace, s2iBinName))
			return nil
		}
		klog.Error(err, fmt.Sprintf("failed to get s2ibin %s/%s ", s2irun.Namespace, s2iBinName))
		return err
	}
	err = c.devopsClient.DevopsV1alpha1().S2iBinaries(s2iBin.Namespace).Delete(context.Background(), s2iBinName, metav1.DeleteOptions{})
	if err != nil {
		if errors.IsNotFound(err) {
			klog.Info(fmt.Sprintf("s2ibin '%s/%s' has been deleted ", s2irun.Namespace, s2iBinName))
			return nil
		}
		klog.Error(err, fmt.Sprintf("failed to delete s2ibin %s/%s ", s2irun.Namespace, s2iBinName))
		return err
	}
	// Stale-binary sweep is best-effort: a failure here is logged, not returned.
	if err = c.cleanOtherS2iBinary(s2irun.Namespace); err != nil {
		klog.Error(err, fmt.Sprintf("failed to clean s2ibinary in %s", s2irun.Namespace))
	}
	return nil
}
// cleanOtherS2iBinary cleans up s2ibinaries in the namespace that were created
// more than 24 hours ago, never became Ready, and have no associated s2irun.
func (c Controller) cleanOtherS2iBinary(namespace string) error {
	s2iBins, err := c.s2iBinaryLister.S2iBinaries(namespace).List(labels.Everything())
	if err != nil {
		klog.Error(err, fmt.Sprintf("failed to list s2ibin in %s ", namespace))
		return err
	}
	now := time.Now()
	dayBefore := metav1.NewTime(now.Add(time.Hour * 24 * -1))
	for _, s2iBin := range s2iBins {
		if s2iBin.Status.Phase != devopsv1alpha1.StatusReady && s2iBin.CreationTimestamp.Before(&dayBefore) {
			runs, err := c.s2iRunLister.S2iRuns(namespace).List(labels.SelectorFromSet(labels.Set{devopsv1alpha1.S2iBinaryLabelKey: s2iBin.Name}))
			if err != nil {
				klog.Error(err, fmt.Sprintf("failed to list s2irun in %s ", namespace))
				return err
			}
			if len(runs) == 0 {
				err = c.devopsClient.DevopsV1alpha1().S2iBinaries(namespace).Delete(context.Background(), s2iBin.Name, metav1.DeleteOptions{})
				if err != nil {
					if errors.IsNotFound(err) {
						klog.Info(fmt.Sprintf("s2ibin '%s/%s' has been deleted ", namespace, s2iBin.Name))
						// Already gone: keep scanning the remaining binaries
						// instead of aborting the whole cleanup pass (the
						// previous "return nil" stopped the loop early).
						continue
					}
					klog.Error(err, fmt.Sprintf("failed to delete s2ibin %s/%s ", namespace, s2iBin.Name))
					return err
				}
			}
		}
	}
	return nil
}

View File

@@ -1,323 +0,0 @@
/*
Copyright 2020 KubeSphere Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package s2irun
import (
"reflect"
"testing"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/diff"
k8sfake "k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
s2i "kubesphere.io/api/devops/v1alpha1"
"kubesphere.io/kubesphere/pkg/client/clientset/versioned/fake"
informers "kubesphere.io/kubesphere/pkg/client/informers/externalversions"
)
var (
	// alwaysReady reports the informer cache as synced so tests never block on sync.
	alwaysReady = func() bool { return true }
	// noResyncPeriodFunc disables periodic resync in the shared informer factory.
	noResyncPeriodFunc = func() time.Duration { return 0 }
)
// fixture bundles the fake clients, the objects pre-loaded into the informer
// indexers, and the fake-client actions a test expects to observe.
type fixture struct {
	t *testing.T

	// client is the fake devops clientset; kubeclient is the fake core clientset.
	client     *fake.Clientset
	kubeclient *k8sfake.Clientset
	// Objects to put in the store.
	s2ibinaryLister []*s2i.S2iBinary
	s2irunLister    []*s2i.S2iRun
	// actions are the expected fake-client actions, in order.
	actions []core.Action
	// Objects from here preloaded into NewSimpleFake.
	objects []runtime.Object
}
// newFixture returns an empty test fixture bound to t.
func newFixture(t *testing.T) *fixture {
	return &fixture{
		t:       t,
		objects: []runtime.Object{},
	}
}
// newS2iBinary builds an S2iBinary named name in the default namespace.
func newS2iBinary(name string) *s2i.S2iBinary {
	bin := &s2i.S2iBinary{}
	bin.TypeMeta = metav1.TypeMeta{APIVersion: s2i.SchemeGroupVersion.String()}
	bin.ObjectMeta = metav1.ObjectMeta{
		Name:      name,
		Namespace: metav1.NamespaceDefault,
	}
	bin.Spec = s2i.S2iBinarySpec{}
	return bin
}
// newS2iBinaryWithCreateTime builds an S2iBinary in the default namespace
// with an explicit creation timestamp (used to simulate stale binaries).
func newS2iBinaryWithCreateTime(name string, createTime metav1.Time) *s2i.S2iBinary {
	meta := metav1.ObjectMeta{
		Name:              name,
		Namespace:         metav1.NamespaceDefault,
		CreationTimestamp: createTime,
	}
	return &s2i.S2iBinary{
		TypeMeta:   metav1.TypeMeta{APIVersion: s2i.SchemeGroupVersion.String()},
		ObjectMeta: meta,
		Spec:       s2i.S2iBinarySpec{},
	}
}
// newS2iRun builds an S2iRun in the default namespace, labelled as bound to
// the s2ibinary named s2iBinaryName.
func newS2iRun(name string, s2iBinaryName string) *s2i.S2iRun {
	meta := metav1.ObjectMeta{
		Name:      name,
		Namespace: metav1.NamespaceDefault,
		Labels:    map[string]string{s2i.S2iBinaryLabelKey: s2iBinaryName},
	}
	return &s2i.S2iRun{
		TypeMeta:   metav1.TypeMeta{APIVersion: s2i.SchemeGroupVersion.String()},
		ObjectMeta: meta,
	}
}
// newDeletetingS2iRun builds an S2iRun that is mid-deletion: it carries the
// s2ibinary finalizer and a DeletionTimestamp of now.
// NOTE(review): the name has a typo ("Deleteting"); kept as-is because
// sibling tests in this file call it by this name.
func newDeletetingS2iRun(name string, s2iBinaryName string) *s2i.S2iRun {
	now := metav1.Now()
	meta := metav1.ObjectMeta{
		Name:              name,
		Namespace:         metav1.NamespaceDefault,
		Labels:            map[string]string{s2i.S2iBinaryLabelKey: s2iBinaryName},
		Finalizers:        []string{s2i.S2iBinaryFinalizerName},
		DeletionTimestamp: &now,
	}
	return &s2i.S2iRun{
		TypeMeta:   metav1.TypeMeta{APIVersion: s2i.SchemeGroupVersion.String()},
		ObjectMeta: meta,
	}
}
// newController wires an S2iRun controller to fresh fake clients and
// pre-populates the informer indexers with the fixture's lister objects.
func (f *fixture) newController() (*Controller, informers.SharedInformerFactory) {
	f.client = fake.NewSimpleClientset(f.objects...)
	f.kubeclient = k8sfake.NewSimpleClientset()

	factory := informers.NewSharedInformerFactory(f.client, noResyncPeriodFunc())
	c := NewS2iRunController(f.kubeclient, f.client,
		factory.Devops().V1alpha1().S2iBinaries(), factory.Devops().V1alpha1().S2iRuns())
	// Pretend the cache is already synced and swallow events.
	c.s2iBinarySynced = alwaysReady
	c.eventRecorder = &record.FakeRecorder{}

	// Seed the indexers directly so listers see the objects without a sync.
	// (Loop variables renamed: the original shadowed the receiver f.)
	for _, bin := range f.s2ibinaryLister {
		factory.Devops().V1alpha1().S2iBinaries().Informer().GetIndexer().Add(bin)
	}
	for _, run := range f.s2irunLister {
		factory.Devops().V1alpha1().S2iRuns().Informer().GetIndexer().Add(run)
	}
	return c, factory
}
// run syncs the s2irun identified by fooName and expects the sync to succeed.
func (f *fixture) run(fooName string) {
	f.runController(fooName, true, false)
}

// runExpectError syncs the s2irun identified by fooName and expects the sync to fail.
func (f *fixture) runExpectError(fooName string) {
	f.runController(fooName, true, true)
}
// runController drives one syncHandler invocation for s2iRunName and compares
// the actions recorded by the fake client against the fixture's expectations.
func (f *fixture) runController(s2iRunName string, startInformers bool, expectError bool) {
	c, factory := f.newController()
	if startInformers {
		stopCh := make(chan struct{})
		defer close(stopCh)
		factory.Start(stopCh)
	}

	err := c.syncHandler(s2iRunName)
	switch {
	case !expectError && err != nil:
		f.t.Errorf("error syncing foo: %v", err)
	case expectError && err == nil:
		f.t.Error("expected error syncing foo, got nil")
	}

	got := filterInformerActions(f.client.Actions())
	for idx, action := range got {
		// More observed actions than expected: report the surplus and stop.
		if idx >= len(f.actions) {
			f.t.Errorf("%d unexpected actions: %+v", len(got)-len(f.actions), got[idx:])
			break
		}
		checkAction(f.actions[idx], action, f.t)
	}
	// Fewer observed actions than expected: report what never happened.
	if len(f.actions) > len(got) {
		f.t.Errorf("%d additional expected actions:%+v", len(f.actions)-len(got), f.actions[len(got):])
	}
}
// checkAction verifies that expected and actual actions are equal and both have
// the same attached resources.
func checkAction(expected, actual core.Action, t *testing.T) {
	if !(expected.Matches(actual.GetVerb(), actual.GetResource().Resource) && actual.GetSubresource() == expected.GetSubresource()) {
		t.Errorf("Expected\n\t%#v\ngot\n\t%#v", expected, actual)
		return
	}
	if reflect.TypeOf(actual) != reflect.TypeOf(expected) {
		// BUG FIX: %T prints the dynamic type; the original used %t, the
		// boolean verb, which produced "%!t(...)" noise instead of the types.
		t.Errorf("Action has wrong type. Expected: %T. Got: %T", expected, actual)
		return
	}
	// Compare the payload per concrete action type.
	switch a := actual.(type) {
	case core.CreateActionImpl:
		e, _ := expected.(core.CreateActionImpl)
		expObject := e.GetObject()
		object := a.GetObject()
		if !reflect.DeepEqual(expObject, object) {
			t.Errorf("Action %s %s has wrong object\nDiff:\n %s",
				a.GetVerb(), a.GetResource().Resource, diff.ObjectGoPrintSideBySide(expObject, object))
		}
	case core.UpdateActionImpl:
		e, _ := expected.(core.UpdateActionImpl)
		expObject := e.GetObject()
		object := a.GetObject()
		if !reflect.DeepEqual(expObject, object) {
			t.Errorf("Action %s %s has wrong object\nDiff:\n %s",
				a.GetVerb(), a.GetResource().Resource, diff.ObjectGoPrintSideBySide(expObject, object))
		}
	case core.PatchActionImpl:
		e, _ := expected.(core.PatchActionImpl)
		expPatch := e.GetPatch()
		patch := a.GetPatch()
		if !reflect.DeepEqual(expPatch, patch) {
			t.Errorf("Action %s %s has wrong patch\nDiff:\n %s",
				a.GetVerb(), a.GetResource().Resource, diff.ObjectGoPrintSideBySide(expPatch, patch))
		}
	case core.DeleteActionImpl:
		// Delete actions carry no object; compare name and namespace instead.
		e, _ := expected.(core.DeleteActionImpl)
		expName := e.GetName()
		objectName := a.GetName()
		if !reflect.DeepEqual(expName, objectName) {
			t.Errorf("Action %s %s has wrong object\nDiff:\n %s",
				a.GetVerb(), a.GetResource().Resource, diff.ObjectGoPrintSideBySide(expName, objectName))
		}
		expNamespace := e.GetNamespace()
		objectNamespace := a.GetNamespace()
		if !reflect.DeepEqual(expNamespace, objectNamespace) {
			t.Errorf("Action %s %s has wrong object\nDiff:\n %s",
				a.GetVerb(), a.GetResource().Resource, diff.ObjectGoPrintSideBySide(expNamespace, objectNamespace))
		}
	default:
		t.Errorf("Uncaptured Action %s %s, you should explicitly add a case to capture it",
			actual.GetVerb(), actual.GetResource().Resource)
	}
}
// filterInformerActions filters list and watch actions for testing resources.
// Since list and watch don't change resource state we can filter them out to
// lower the noise level in our tests.
func filterInformerActions(actions []core.Action) []core.Action {
	ret := []core.Action{}
	for _, action := range actions {
		isInformerNoise := len(action.GetNamespace()) == 0 &&
			(action.Matches("list", s2i.ResourcePluralS2iRun) ||
				action.Matches("watch", s2i.ResourcePluralS2iRun) ||
				action.Matches("list", s2i.ResourcePluralS2iBinary) ||
				action.Matches("watch", s2i.ResourcePluralS2iBinary))
		if isInformerNoise {
			continue
		}
		ret = append(ret, action)
	}
	return ret
}
// expectUpdateS2iRunAction records an expected update action for s2iRun.
func (f *fixture) expectUpdateS2iRunAction(s2iRun *s2i.S2iRun) {
	gvr := schema.GroupVersionResource{Resource: s2i.ResourcePluralS2iRun}
	f.actions = append(f.actions, core.NewUpdateAction(gvr, s2iRun.Namespace, s2iRun))
}
// expectDeleteS2iBinaryAction records an expected delete action for s2iBinary.
func (f *fixture) expectDeleteS2iBinaryAction(s2iBinary *s2i.S2iBinary) {
	gvr := schema.GroupVersionResource{Resource: s2i.ResourcePluralS2iBinary}
	f.actions = append(f.actions, core.NewDeleteAction(gvr, s2iBinary.Namespace, s2iBinary.Name))
}
// getKey returns the namespace/name cache key for run, reporting a test
// error and returning "" if the key cannot be computed.
// (Parameter renamed: the original named it s2i, shadowing the package alias.)
func getKey(run *s2i.S2iRun, t *testing.T) string {
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(run)
	if err != nil {
		t.Errorf("Unexpected error getting key for s2i %v: %v", run.Name, err)
		return ""
	}
	return key
}
// TestDoNothing syncs a live (non-deleting) s2irun and expects only a single
// s2irun update action — no s2ibinary deletion.
func TestDoNothing(t *testing.T) {
	f := newFixture(t)
	s2iBinary := newS2iBinary("test")
	s2iRun := newS2iRun("test", s2iBinary.Name)

	f.s2ibinaryLister = append(f.s2ibinaryLister, s2iBinary)
	f.s2irunLister = append(f.s2irunLister, s2iRun)
	f.objects = append(f.objects, s2iBinary, s2iRun)

	f.expectUpdateS2iRunAction(s2iRun)
	f.run(getKey(s2iRun, t))
}
// TestDeleteS2iBinary syncs a deleting s2irun and expects the bound
// s2ibinary to be deleted followed by an update of the s2irun.
func TestDeleteS2iBinary(t *testing.T) {
	f := newFixture(t)
	s2iBinary := newS2iBinary("test")
	s2iRun := newDeletetingS2iRun("test", s2iBinary.Name)

	f.s2ibinaryLister = append(f.s2ibinaryLister, s2iBinary)
	f.s2irunLister = append(f.s2irunLister, s2iRun)
	f.objects = append(f.objects, s2iBinary, s2iRun)

	f.expectDeleteS2iBinaryAction(s2iBinary)
	f.expectUpdateS2iRunAction(s2iRun)
	f.run(getKey(s2iRun, t))
}
// TestDeleteOtherS2iBinary syncs a deleting s2irun while a second, old
// s2ibinary with no associated s2irun exists; it expects both binaries to be
// deleted and the s2irun to be updated.
func TestDeleteOtherS2iBinary(t *testing.T) {
	f := newFixture(t)
	s2iBinary := newS2iBinary("test")
	s2iRun := newDeletetingS2iRun("test", s2iBinary.Name)
	// Created in year 2000, so well past any staleness cutoff.
	staleBinary := newS2iBinaryWithCreateTime("test2", metav1.NewTime(time.Date(2000, 1, 1, 12, 0, 0, 0, time.UTC)))

	f.s2ibinaryLister = append(f.s2ibinaryLister, s2iBinary, staleBinary)
	f.s2irunLister = append(f.s2irunLister, s2iRun)
	f.objects = append(f.objects, s2iBinary, s2iRun, staleBinary)

	f.expectDeleteS2iBinaryAction(s2iBinary)
	f.expectDeleteS2iBinaryAction(staleBinary)
	f.expectUpdateS2iRunAction(s2iRun)
	f.run(getKey(s2iRun, t))
}