fix application reconcile

Signed-off-by: zackzhang <zackzhang@yunify.com>
This commit is contained in:
zackzhang
2021-01-15 10:28:50 +08:00
parent 1f4d5cb686
commit 2b0b36672b
10 changed files with 151 additions and 203 deletions

View File

@@ -24,10 +24,11 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/klog"
servicemeshv1alpha2 "kubesphere.io/kubesphere/pkg/apis/servicemesh/v1alpha2"
"kubesphere.io/kubesphere/pkg/controller/virtualservice/util"
"kubesphere.io/kubesphere/pkg/controller/utils/servicemesh"
"sigs.k8s.io/application/api/v1beta1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
@@ -40,7 +41,7 @@ import (
"time"
)
// Add creates a new Workspace Controller and adds it to the Manager with default RBAC. The Manager will set fields on the Controller
// Add creates a new Application Controller and adds it to the Manager with default RBAC. The Manager will set fields on the Controller
// and Start it when the Manager is Started.
func Add(mgr manager.Manager) error {
return add(mgr, newReconciler(mgr))
@@ -71,11 +72,17 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
for _, s := range sources {
// Watch for changes to Application
err = c.Watch(&source.Kind{Type: s},
&handler.EnqueueRequestForOwner{OwnerType: &v1beta1.Application{}, IsController: false},
err = c.Watch(
&source.Kind{Type: s},
&handler.EnqueueRequestsFromMapFunc{ToRequests: handler.ToRequestsFunc(
func(h handler.MapObject) []reconcile.Request {
return []reconcile.Request{{NamespacedName: types.NamespacedName{
Name: servicemesh.GetApplictionName(h.Meta.GetLabels()),
Namespace: h.Meta.GetNamespace()}}}
})},
predicate.Funcs{
UpdateFunc: func(e event.UpdateEvent) bool {
return isApp(e.MetaOld)
return isApp(e.MetaOld, e.MetaNew)
},
CreateFunc: func(e event.CreateEvent) bool {
return isApp(e.Meta)
@@ -84,12 +91,10 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
return isApp(e.Meta)
},
})
if err != nil {
return err
}
}
return nil
}
@@ -110,6 +115,7 @@ func (r *ReconcileApplication) Reconcile(request reconcile.Request) (reconcile.R
err := r.Get(ctx, request.NamespacedName, app)
if err != nil {
if errors.IsNotFound(err) {
klog.Errorf("application %s not found in namespace %s", request.Name, request.Namespace)
return reconcile.Result{}, nil
}
return reconcile.Result{}, err
@@ -126,16 +132,18 @@ func (r *ReconcileApplication) Reconcile(request reconcile.Request) (reconcile.R
err = r.Update(ctx, app)
if err != nil {
if errors.IsNotFound(err) {
klog.V(4).Info("application has been deleted during update")
klog.V(4).Infof("application %s has been deleted during update in namespace %s", request.Name, request.Namespace)
return reconcile.Result{}, nil
}
}
return reconcile.Result{}, nil
}
func isApp(o metav1.Object) bool {
if o.GetLabels() == nil || !util.IsApplicationComponent(o.GetLabels()) {
return false
func isApp(obs ...metav1.Object) bool {
for _, o := range obs {
if o.GetLabels() != nil && servicemesh.IsAppComponent(o.GetLabels()) {
return true
}
}
return true
return false
}

View File

@@ -19,6 +19,7 @@ package destinationrule
import (
"context"
"fmt"
"kubesphere.io/kubesphere/pkg/controller/utils/servicemesh"
"reflect"
apinetworkingv1alpha3 "istio.io/api/networking/v1alpha3"
@@ -36,8 +37,6 @@ import (
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
log "k8s.io/klog"
servicemeshv1alpha2 "kubesphere.io/kubesphere/pkg/apis/servicemesh/v1alpha2"
"kubesphere.io/kubesphere/pkg/controller/virtualservice/util"
"time"
istioclientset "istio.io/client-go/pkg/clientset/versioned"
@@ -243,9 +242,9 @@ func (v *DestinationRuleController) syncService(key string) error {
return nil
}
if len(service.Labels) < len(util.ApplicationLabels) ||
!util.IsApplicationComponent(service.Labels) ||
!util.IsServicemeshEnabled(service.Annotations) ||
if len(service.Labels) < len(servicemesh.ApplicationLabels) ||
!servicemesh.IsApplicationComponent(service.Labels) ||
!servicemesh.IsServicemeshEnabled(service.Annotations) ||
len(service.Spec.Ports) == 0 {
// services don't have enough labels to create a destinationrule
// or they don't have necessary labels
@@ -254,7 +253,7 @@ func (v *DestinationRuleController) syncService(key string) error {
return nil
}
appName := util.GetComponentName(&service.ObjectMeta)
appName := servicemesh.GetComponentName(&service.ObjectMeta)
// fetch all deployments that match with service selector
deployments, err := v.deploymentLister.Deployments(namespace).List(labels.Set(service.Spec.Selector).AsSelectorPreValidated())
@@ -266,14 +265,14 @@ func (v *DestinationRuleController) syncService(key string) error {
for _, deployment := range deployments {
// not a valid deployment we required
if !util.IsApplicationComponent(deployment.Labels) ||
!util.IsApplicationComponent(deployment.Spec.Selector.MatchLabels) ||
if !servicemesh.IsApplicationComponent(deployment.Labels) ||
!servicemesh.IsApplicationComponent(deployment.Spec.Selector.MatchLabels) ||
deployment.Status.ReadyReplicas == 0 ||
!util.IsServicemeshEnabled(deployment.Annotations) {
!servicemesh.IsServicemeshEnabled(deployment.Annotations) {
continue
}
version := util.GetComponentVersion(&deployment.ObjectMeta)
version := servicemesh.GetComponentVersion(&deployment.ObjectMeta)
if len(version) == 0 {
log.V(4).Infof("Deployment %s doesn't have a version label", types.NamespacedName{Namespace: deployment.Namespace, Name: deployment.Name}.String())
@@ -281,9 +280,9 @@ func (v *DestinationRuleController) syncService(key string) error {
}
subset := &apinetworkingv1alpha3.Subset{
Name: util.NormalizeVersionName(version),
Name: servicemesh.NormalizeVersionName(version),
Labels: map[string]string{
util.VersionLabel: version,
servicemesh.VersionLabel: version,
},
}
@@ -309,7 +308,7 @@ func (v *DestinationRuleController) syncService(key string) error {
}
// fetch all servicepolicies associated to this service
servicePolicies, err := v.servicePolicyLister.ServicePolicies(namespace).List(labels.SelectorFromSet(map[string]string{util.AppLabel: appName}))
servicePolicies, err := v.servicePolicyLister.ServicePolicies(namespace).List(labels.SelectorFromSet(map[string]string{servicemesh.AppLabel: appName}))
if err != nil {
log.Error(err, "could not list service policies is namespace with component name", "namespace", namespace, "name", appName)
return err
@@ -388,7 +387,7 @@ func (v *DestinationRuleController) addDeployment(obj interface{}) {
deploy := obj.(*appsv1.Deployment)
// not a application component
if !util.IsApplicationComponent(deploy.Labels) || !util.IsApplicationComponent(deploy.Spec.Selector.MatchLabels) {
if !servicemesh.IsApplicationComponent(deploy.Labels) || !servicemesh.IsApplicationComponent(deploy.Spec.Selector.MatchLabels) {
return
}
@@ -437,8 +436,8 @@ func (v *DestinationRuleController) getDeploymentServiceMemberShip(deployment *a
for i := range allServices {
service := allServices[i]
if service.Spec.Selector == nil ||
!util.IsApplicationComponent(service.Labels) ||
!util.IsServicemeshEnabled(service.Annotations) {
!servicemesh.IsApplicationComponent(service.Labels) ||
!servicemesh.IsServicemeshEnabled(service.Annotations) {
// services with nil selectors match nothing, not everything.
continue
}
@@ -458,9 +457,9 @@ func (v *DestinationRuleController) getDeploymentServiceMemberShip(deployment *a
func (v *DestinationRuleController) addServicePolicy(obj interface{}) {
servicePolicy := obj.(*servicemeshv1alpha2.ServicePolicy)
appName := servicePolicy.Labels[util.AppLabel]
appName := servicePolicy.Labels[servicemesh.AppLabel]
services, err := v.serviceLister.Services(servicePolicy.Namespace).List(labels.SelectorFromSet(map[string]string{util.AppLabel: appName}))
services, err := v.serviceLister.Services(servicePolicy.Namespace).List(labels.SelectorFromSet(map[string]string{servicemesh.AppLabel: appName}))
if err != nil {
log.Error(err, "cannot list services", "namespace", servicePolicy.Namespace, "name", appName)
utilruntime.HandleError(fmt.Errorf("cannot list services in namespace %s, with component name %v", servicePolicy.Namespace, appName))

View File

@@ -34,7 +34,7 @@ import (
"kubesphere.io/kubesphere/pkg/apis/servicemesh/v1alpha2"
"kubesphere.io/kubesphere/pkg/client/clientset/versioned/fake"
informers "kubesphere.io/kubesphere/pkg/client/informers/externalversions"
"kubesphere.io/kubesphere/pkg/controller/virtualservice/util"
"kubesphere.io/kubesphere/pkg/controller/utils/servicemesh"
"kubesphere.io/kubesphere/pkg/utils/reflectutils"
"testing"
)
@@ -118,9 +118,9 @@ func newDestinationRule(service *corev1.Service, deployments ...*appsv1.Deployme
dr.Spec.Subsets = []*apiv1alpha3.Subset{}
for _, deployment := range deployments {
subset := &apiv1alpha3.Subset{
Name: util.GetComponentVersion(&deployment.ObjectMeta),
Name: servicemesh.GetComponentVersion(&deployment.ObjectMeta),
Labels: map[string]string{
"version": util.GetComponentVersion(&deployment.ObjectMeta),
"version": servicemesh.GetComponentVersion(&deployment.ObjectMeta),
},
}
dr.Spec.Subsets = append(dr.Spec.Subsets, subset)
@@ -195,9 +195,9 @@ func newServicePolicy(name string, service *corev1.Service, deployments ...*apps
sp.Spec.Template.Spec.Subsets = []*apiv1alpha3.Subset{}
for _, deployment := range deployments {
subset := &apiv1alpha3.Subset{
Name: util.GetComponentVersion(&deployment.ObjectMeta),
Name: servicemesh.GetComponentVersion(&deployment.ObjectMeta),
Labels: map[string]string{
"version": util.GetComponentVersion(&deployment.ObjectMeta),
"version": servicemesh.GetComponentVersion(&deployment.ObjectMeta),
},
}
sp.Spec.Template.Spec.Subsets = append(sp.Spec.Template.Spec.Subsets, subset)

View File

@@ -1,26 +1,10 @@
/*
Copyright 2020 KubeSphere Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
package servicemesh
import (
"istio.io/api/networking/v1alpha3"
clientgonetworkingv1alpha3 "istio.io/client-go/pkg/apis/networking/v1alpha3"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apiv1alpha3 "istio.io/api/networking/v1alpha3"
"istio.io/client-go/pkg/apis/networking/v1alpha3"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1"
"strings"
)
@@ -39,6 +23,12 @@ var ApplicationLabels = [...]string{
AppLabel,
}
// resource with these following labels considered as part of kubernetes-sigs/application
var AppLabels = [...]string{
ApplicationNameLabel,
ApplicationVersionLabel,
}
var TrimChars = [...]string{".", "_", "-"}
// normalize version names
@@ -58,7 +48,7 @@ func GetApplictionName(lbs map[string]string) string {
}
func GetComponentName(meta *metav1.ObjectMeta) string {
func GetComponentName(meta *v1.ObjectMeta) string {
if len(meta.Labels[AppLabel]) > 0 {
return meta.Labels[AppLabel]
}
@@ -74,14 +64,14 @@ func IsServicemeshEnabled(annotations map[string]string) bool {
return false
}
func GetComponentVersion(meta *metav1.ObjectMeta) string {
func GetComponentVersion(meta *v1.ObjectMeta) string {
if len(meta.Labels[VersionLabel]) > 0 {
return meta.Labels[VersionLabel]
}
return ""
}
func ExtractApplicationLabels(meta *metav1.ObjectMeta) map[string]string {
func ExtractApplicationLabels(meta *v1.ObjectMeta) map[string]string {
labels := make(map[string]string, len(ApplicationLabels))
for _, label := range ApplicationLabels {
@@ -106,21 +96,33 @@ func IsApplicationComponent(lbs map[string]string) bool {
return true
}
// Whether it belongs to kubernetes-sigs/application or not
func IsAppComponent(lbs map[string]string) bool {
for _, label := range AppLabels {
if _, ok := lbs[label]; !ok {
return false
}
}
return true
}
// if virtualservice not specified with port number, then fill with service first port
func FillDestinationPort(vs *clientgonetworkingv1alpha3.VirtualService, service *v1.Service) {
func FillDestinationPort(vs *v1alpha3.VirtualService, service *corev1.Service) {
// fill http port
for i := range vs.Spec.Http {
for j := range vs.Spec.Http[i].Route {
port := vs.Spec.Http[i].Route[j].Destination.Port
if port == nil || port.Number == 0 {
vs.Spec.Http[i].Route[j].Destination.Port = &v1alpha3.PortSelector{
vs.Spec.Http[i].Route[j].Destination.Port = &apiv1alpha3.PortSelector{
Number: uint32(service.Spec.Ports[0].Port),
}
}
}
if vs.Spec.Http[i].Mirror != nil && (vs.Spec.Http[i].Mirror.Port == nil || vs.Spec.Http[i].Mirror.Port.Number == 0) {
vs.Spec.Http[i].Mirror.Port = &v1alpha3.PortSelector{
vs.Spec.Http[i].Mirror.Port = &apiv1alpha3.PortSelector{
Number: uint32(service.Spec.Ports[0].Port),
}
}
@@ -130,7 +132,7 @@ func FillDestinationPort(vs *clientgonetworkingv1alpha3.VirtualService, service
for i := range vs.Spec.Tcp {
for j := range vs.Spec.Tcp[i].Route {
if vs.Spec.Tcp[i].Route[j].Destination.Port == nil || vs.Spec.Tcp[i].Route[j].Destination.Port.Number == 0 {
vs.Spec.Tcp[i].Route[j].Destination.Port = &v1alpha3.PortSelector{
vs.Spec.Tcp[i].Route[j].Destination.Port = &apiv1alpha3.PortSelector{
Number: uint32(service.Spec.Ports[0].Port),
}
}

View File

@@ -19,11 +19,15 @@ package virtualservice
import (
"context"
"fmt"
"kubesphere.io/kubesphere/pkg/controller/utils/servicemesh"
"reflect"
"strings"
apinetworkingv1alpha3 "istio.io/api/networking/v1alpha3"
clientgonetworkingv1alpha3 "istio.io/client-go/pkg/apis/networking/v1alpha3"
istioclient "istio.io/client-go/pkg/clientset/versioned"
istioinformers "istio.io/client-go/pkg/informers/externalversions/networking/v1alpha3"
istiolisters "istio.io/client-go/pkg/listers/networking/v1alpha3"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -32,20 +36,15 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
log "k8s.io/klog"
"kubesphere.io/kubesphere/pkg/controller/virtualservice/util"
istioclient "istio.io/client-go/pkg/clientset/versioned"
istioinformers "istio.io/client-go/pkg/informers/externalversions/networking/v1alpha3"
istiolisters "istio.io/client-go/pkg/listers/networking/v1alpha3"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
log "k8s.io/klog"
servicemeshv1alpha2 "kubesphere.io/kubesphere/pkg/apis/servicemesh/v1alpha2"
servicemeshclient "kubesphere.io/kubesphere/pkg/client/clientset/versioned"
servicemeshinformers "kubesphere.io/kubesphere/pkg/client/informers/externalversions/servicemesh/v1alpha2"
@@ -256,9 +255,9 @@ func (v *VirtualServiceController) syncService(key string) error {
return err
}
if len(service.Labels) < len(util.ApplicationLabels) ||
!util.IsApplicationComponent(service.Labels) ||
!util.IsServicemeshEnabled(service.Annotations) ||
if len(service.Labels) < len(servicemesh.ApplicationLabels) ||
!servicemesh.IsApplicationComponent(service.Labels) ||
!servicemesh.IsServicemeshEnabled(service.Annotations) ||
len(service.Spec.Ports) == 0 {
// services don't have enough labels to create a virtualservice
// or they don't have necessary labels
@@ -266,7 +265,7 @@ func (v *VirtualServiceController) syncService(key string) error {
return nil
}
// get real component name, i.e label app value
appName = util.GetComponentName(&service.ObjectMeta)
appName = servicemesh.GetComponentName(&service.ObjectMeta)
destinationRule, err := v.destinationRuleLister.DestinationRules(namespace).Get(name)
if err != nil {
@@ -287,7 +286,7 @@ func (v *VirtualServiceController) syncService(key string) error {
}
// fetch all strategies applied to service
strategies, err := v.strategyLister.Strategies(namespace).List(labels.SelectorFromSet(map[string]string{util.AppLabel: appName}))
strategies, err := v.strategyLister.Strategies(namespace).List(labels.SelectorFromSet(map[string]string{servicemesh.AppLabel: appName}))
if err != nil {
log.Error(err, "list strategies for service failed", "namespace", namespace, "name", appName)
return err
@@ -306,7 +305,7 @@ func (v *VirtualServiceController) syncService(key string) error {
ObjectMeta: metav1.ObjectMeta{
Name: appName,
Namespace: namespace,
Labels: util.ExtractApplicationLabels(&service.ObjectMeta),
Labels: servicemesh.ExtractApplicationLabels(&service.ObjectMeta),
},
}
} else {
@@ -457,7 +456,7 @@ func (v *VirtualServiceController) addDestinationRule(obj interface{}) {
func (v *VirtualServiceController) addStrategy(obj interface{}) {
strategy := obj.(*servicemeshv1alpha2.Strategy)
lbs := util.ExtractApplicationLabels(&strategy.ObjectMeta)
lbs := servicemesh.ExtractApplicationLabels(&strategy.ObjectMeta)
if len(lbs) == 0 {
err := fmt.Errorf("invalid strategy %s/%s labels %s, not have required labels", strategy.Namespace, strategy.Name, strategy.Labels)
log.Error(err, "")
@@ -582,6 +581,6 @@ func (v *VirtualServiceController) generateVirtualServiceSpec(strategy *servicem
}
util.FillDestinationPort(vs, service)
servicemesh.FillDestinationPort(vs, service)
return vs
}

View File

@@ -34,7 +34,7 @@ import (
"kubesphere.io/kubesphere/pkg/apis/servicemesh/v1alpha2"
"kubesphere.io/kubesphere/pkg/client/clientset/versioned/fake"
informers "kubesphere.io/kubesphere/pkg/client/informers/externalversions"
"kubesphere.io/kubesphere/pkg/controller/virtualservice/util"
"kubesphere.io/kubesphere/pkg/controller/utils/servicemesh"
"kubesphere.io/kubesphere/pkg/utils/reflectutils"
"testing"
)
@@ -89,7 +89,7 @@ func (l Labels) WithApplication(name string) Labels {
func (l Labels) WithServiceMeshEnabled(enabled bool) Labels {
if enabled {
l[util.ServiceMeshEnabledAnnotation] = "true"
l[servicemesh.ServiceMeshEnabledAnnotation] = "true"
}
return l