application controller will only reconcile applications matched with given label selector
Signed-off-by: Jeff <jeffzhang@yunify.com>
This commit is contained in:
@@ -18,45 +18,239 @@ package application
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
v1 "k8s.io/api/apps/v1"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
v1beta12 "k8s.io/api/networking/v1beta1"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/api/equality"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/client-go/tools/record"
|
||||
utilerrors "k8s.io/apimachinery/pkg/util/errors"
|
||||
"k8s.io/client-go/util/retry"
|
||||
"k8s.io/klog"
|
||||
servicemeshv1alpha2 "kubesphere.io/kubesphere/pkg/apis/servicemesh/v1alpha2"
|
||||
"kubesphere.io/kubesphere/pkg/controller/utils/servicemesh"
|
||||
"sigs.k8s.io/application/api/v1beta1"
|
||||
appv1beta1 "sigs.k8s.io/application/api/v1beta1"
|
||||
ctrl "sigs.k8s.io/controller-runtime"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
"sigs.k8s.io/controller-runtime/pkg/controller"
|
||||
"sigs.k8s.io/controller-runtime/pkg/event"
|
||||
"sigs.k8s.io/controller-runtime/pkg/handler"
|
||||
"sigs.k8s.io/controller-runtime/pkg/manager"
|
||||
"sigs.k8s.io/controller-runtime/pkg/predicate"
|
||||
"sigs.k8s.io/controller-runtime/pkg/reconcile"
|
||||
"sigs.k8s.io/controller-runtime/pkg/source"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Add creates a new Application Controller and adds it to the Manager with default RBAC. The Manager will set fields on the Controller
|
||||
// and Start it when the Manager is Started.
|
||||
func Add(mgr manager.Manager) error {
|
||||
return add(mgr, newReconciler(mgr))
|
||||
// ApplicationReconciler reconciles a Application object
|
||||
type ApplicationReconciler struct {
|
||||
client.Client
|
||||
Mapper meta.RESTMapper
|
||||
Scheme *runtime.Scheme
|
||||
ApplicationSelector labels.Selector //
|
||||
}
|
||||
|
||||
// newReconciler returns a new reconcile.Reconciler
|
||||
func newReconciler(mgr manager.Manager) reconcile.Reconciler {
|
||||
return &ReconcileApplication{Client: mgr.GetClient(), scheme: mgr.GetScheme(),
|
||||
recorder: mgr.GetEventRecorderFor("application-controller")}
|
||||
func (r *ApplicationReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
|
||||
var app appv1beta1.Application
|
||||
err := r.Get(context.Background(), req.NamespacedName, &app)
|
||||
if err != nil {
|
||||
if apierrors.IsNotFound(err) {
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
|
||||
// If label selector were given, only reconcile matched applications
|
||||
// match annotations and labels
|
||||
if !r.ApplicationSelector.Empty() {
|
||||
if !r.ApplicationSelector.Matches(labels.Set(app.Labels)) &&
|
||||
!r.ApplicationSelector.Matches(labels.Set(app.Annotations)) {
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
}
|
||||
|
||||
// Application is in the process of being deleted, so no need to do anything.
|
||||
if app.DeletionTimestamp != nil {
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
|
||||
resources, errs := r.updateComponents(context.Background(), &app)
|
||||
newApplicationStatus := r.getNewApplicationStatus(context.Background(), &app, resources, &errs)
|
||||
|
||||
newApplicationStatus.ObservedGeneration = app.Generation
|
||||
if equality.Semantic.DeepEqual(newApplicationStatus, &app.Status) {
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
|
||||
err = r.updateApplicationStatus(context.Background(), req.NamespacedName, newApplicationStatus)
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
|
||||
// add adds a new Controller to mgr with r as the reconcile.Reconciler
|
||||
func add(mgr manager.Manager, r reconcile.Reconciler) error {
|
||||
// Create a new controller
|
||||
c, err := controller.New("application-controller", mgr, controller.Options{Reconciler: r})
|
||||
func (r *ApplicationReconciler) updateComponents(ctx context.Context, app *appv1beta1.Application) ([]*unstructured.Unstructured, []error) {
|
||||
var errs []error
|
||||
resources := r.fetchComponentListResources(ctx, app.Spec.ComponentGroupKinds, app.Spec.Selector, app.Namespace, &errs)
|
||||
|
||||
if app.Spec.AddOwnerRef {
|
||||
ownerRef := metav1.NewControllerRef(app, appv1beta1.GroupVersion.WithKind("Application"))
|
||||
*ownerRef.Controller = false
|
||||
if err := r.setOwnerRefForResources(ctx, *ownerRef, resources); err != nil {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
}
|
||||
return resources, errs
|
||||
}
|
||||
|
||||
func (r *ApplicationReconciler) getNewApplicationStatus(ctx context.Context, app *appv1beta1.Application, resources []*unstructured.Unstructured, errList *[]error) *appv1beta1.ApplicationStatus {
|
||||
objectStatuses := r.objectStatuses(ctx, resources, errList)
|
||||
errs := utilerrors.NewAggregate(*errList)
|
||||
|
||||
aggReady, countReady := aggregateReady(objectStatuses)
|
||||
|
||||
newApplicationStatus := app.Status.DeepCopy()
|
||||
newApplicationStatus.ComponentList = appv1beta1.ComponentList{
|
||||
Objects: objectStatuses,
|
||||
}
|
||||
newApplicationStatus.ComponentsReady = fmt.Sprintf("%d/%d", countReady, len(objectStatuses))
|
||||
if errs != nil {
|
||||
setReadyUnknownCondition(newApplicationStatus, "ComponentsReadyUnknown", "failed to aggregate all components' statuses, check the Error condition for details")
|
||||
} else if aggReady {
|
||||
setReadyCondition(newApplicationStatus, "ComponentsReady", "all components ready")
|
||||
} else {
|
||||
setNotReadyCondition(newApplicationStatus, "ComponentsNotReady", fmt.Sprintf("%d components not ready", len(objectStatuses)-countReady))
|
||||
}
|
||||
|
||||
if errs != nil {
|
||||
setErrorCondition(newApplicationStatus, "ErrorSeen", errs.Error())
|
||||
} else {
|
||||
clearErrorCondition(newApplicationStatus)
|
||||
}
|
||||
|
||||
return newApplicationStatus
|
||||
}
|
||||
|
||||
func (r *ApplicationReconciler) fetchComponentListResources(ctx context.Context, groupKinds []metav1.GroupKind, selector *metav1.LabelSelector, namespace string, errs *[]error) []*unstructured.Unstructured {
|
||||
var resources []*unstructured.Unstructured
|
||||
|
||||
if selector == nil {
|
||||
klog.V(2).Info("No selector is specified")
|
||||
return resources
|
||||
}
|
||||
|
||||
for _, gk := range groupKinds {
|
||||
mapping, err := r.Mapper.RESTMapping(schema.GroupKind{
|
||||
Group: appv1beta1.StripVersion(gk.Group),
|
||||
Kind: gk.Kind,
|
||||
})
|
||||
if err != nil {
|
||||
klog.V(2).Info("NoMappingForGK", "gk", gk.String())
|
||||
continue
|
||||
}
|
||||
|
||||
list := &unstructured.UnstructuredList{}
|
||||
list.SetGroupVersionKind(mapping.GroupVersionKind)
|
||||
if err = r.Client.List(ctx, list, client.InNamespace(namespace), client.MatchingLabels(selector.MatchLabels)); err != nil {
|
||||
klog.Error(err, "unable to list resources for GVK", "gvk", mapping.GroupVersionKind)
|
||||
*errs = append(*errs, err)
|
||||
continue
|
||||
}
|
||||
|
||||
for _, u := range list.Items {
|
||||
resource := u
|
||||
resources = append(resources, &resource)
|
||||
}
|
||||
}
|
||||
return resources
|
||||
}
|
||||
|
||||
func (r *ApplicationReconciler) setOwnerRefForResources(ctx context.Context, ownerRef metav1.OwnerReference, resources []*unstructured.Unstructured) error {
|
||||
for _, resource := range resources {
|
||||
ownerRefs := resource.GetOwnerReferences()
|
||||
ownerRefFound := false
|
||||
for i, refs := range ownerRefs {
|
||||
if ownerRef.Kind == refs.Kind &&
|
||||
ownerRef.APIVersion == refs.APIVersion &&
|
||||
ownerRef.Name == refs.Name {
|
||||
ownerRefFound = true
|
||||
if ownerRef.UID != refs.UID {
|
||||
ownerRefs[i] = ownerRef
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !ownerRefFound {
|
||||
ownerRefs = append(ownerRefs, ownerRef)
|
||||
}
|
||||
resource.SetOwnerReferences(ownerRefs)
|
||||
err := r.Client.Update(ctx, resource)
|
||||
if err != nil {
|
||||
// We log this error, but we continue and try to set the ownerRefs on the other resources.
|
||||
klog.Error(err, "ErrorSettingOwnerRef", "gvk", resource.GroupVersionKind().String(),
|
||||
"namespace", resource.GetNamespace(), "name", resource.GetName())
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *ApplicationReconciler) objectStatuses(ctx context.Context, resources []*unstructured.Unstructured, errs *[]error) []appv1beta1.ObjectStatus {
|
||||
var objectStatuses []appv1beta1.ObjectStatus
|
||||
for _, resource := range resources {
|
||||
os := appv1beta1.ObjectStatus{
|
||||
Group: resource.GroupVersionKind().Group,
|
||||
Kind: resource.GetKind(),
|
||||
Name: resource.GetName(),
|
||||
Link: resource.GetSelfLink(),
|
||||
}
|
||||
s, err := status(resource)
|
||||
if err != nil {
|
||||
klog.Error(err, "unable to compute status for resource", "gvk", resource.GroupVersionKind().String(),
|
||||
"namespace", resource.GetNamespace(), "name", resource.GetName())
|
||||
*errs = append(*errs, err)
|
||||
}
|
||||
os.Status = s
|
||||
objectStatuses = append(objectStatuses, os)
|
||||
}
|
||||
return objectStatuses
|
||||
}
|
||||
|
||||
func aggregateReady(objectStatuses []appv1beta1.ObjectStatus) (bool, int) {
|
||||
countReady := 0
|
||||
for _, os := range objectStatuses {
|
||||
if os.Status == StatusReady {
|
||||
countReady++
|
||||
}
|
||||
}
|
||||
if countReady == len(objectStatuses) {
|
||||
return true, countReady
|
||||
}
|
||||
return false, countReady
|
||||
}
|
||||
|
||||
func (r *ApplicationReconciler) updateApplicationStatus(ctx context.Context, nn types.NamespacedName, status *appv1beta1.ApplicationStatus) error {
|
||||
if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
|
||||
original := &appv1beta1.Application{}
|
||||
if err := r.Get(ctx, nn, original); err != nil {
|
||||
return err
|
||||
}
|
||||
original.Status = *status
|
||||
if err := r.Client.Status().Update(ctx, original); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
return fmt.Errorf("failed to update status of Application %s/%s: %v", nn.Namespace, nn.Name, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *ApplicationReconciler) SetupWithManager(mgr ctrl.Manager) error {
|
||||
c, err := ctrl.NewControllerManagedBy(mgr).
|
||||
Named("application-controller").
|
||||
For(&appv1beta1.Application{}).Build(r)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -98,46 +292,7 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
var _ reconcile.Reconciler = &ReconcileApplication{}
|
||||
|
||||
// ReconcileApplication reconciles a Workspace object
|
||||
type ReconcileApplication struct {
|
||||
client.Client
|
||||
scheme *runtime.Scheme
|
||||
recorder record.EventRecorder
|
||||
}
|
||||
|
||||
// +kubebuilder:rbac:groups=app.k8s.io,resources=applications,verbs=get;list;watch;create;update;patch;delete
|
||||
func (r *ReconcileApplication) Reconcile(request reconcile.Request) (reconcile.Result, error) {
|
||||
// Fetch the Application instance
|
||||
ctx := context.Background()
|
||||
app := &v1beta1.Application{}
|
||||
err := r.Get(ctx, request.NamespacedName, app)
|
||||
if err != nil {
|
||||
if errors.IsNotFound(err) {
|
||||
klog.Errorf("application %s not found in namespace %s", request.Name, request.Namespace)
|
||||
return reconcile.Result{}, nil
|
||||
}
|
||||
return reconcile.Result{}, err
|
||||
}
|
||||
|
||||
// add specified annotation for app when triggered by sub-resources,
|
||||
// so the application in sigs.k8s.io can reconcile to update status
|
||||
annotations := app.GetObjectMeta().GetAnnotations()
|
||||
if annotations == nil {
|
||||
annotations = make(map[string]string)
|
||||
}
|
||||
annotations["kubesphere.io/last-updated"] = time.Now().String()
|
||||
app.SetAnnotations(annotations)
|
||||
err = r.Update(ctx, app)
|
||||
if err != nil {
|
||||
if errors.IsNotFound(err) {
|
||||
klog.V(4).Infof("application %s has been deleted during update in namespace %s", request.Name, request.Namespace)
|
||||
return reconcile.Result{}, nil
|
||||
}
|
||||
}
|
||||
return reconcile.Result{}, nil
|
||||
}
|
||||
var _ reconcile.Reconciler = &ApplicationReconciler{}
|
||||
|
||||
func isApp(obs ...metav1.Object) bool {
|
||||
for _, o := range obs {
|
||||
|
||||
Reference in New Issue
Block a user