service policy

refactor virtualservice controller
This commit is contained in:
Jeff
2019-03-28 14:02:51 +08:00
committed by zryfish
parent 43217d16a3
commit 2e1dc6a7b5
28 changed files with 2095 additions and 565 deletions

View File

@@ -17,13 +17,12 @@ limitations under the License.
package controller
import (
"kubesphere.io/kubesphere/pkg/controller/strategy"
"sigs.k8s.io/application/pkg/controller/application"
)
func init() {
// AddToManagerFuncs is a list of functions to create controllers and add them to a manager.
AddToManagerFuncs = append(AddToManagerFuncs, strategy.Add)
//AddToManagerFuncs = append(AddToManagerFuncs, strategy.Add)
// Add application to manager functions
AddToManagerFuncs = append(AddToManagerFuncs, application.Add)

View File

@@ -16,6 +16,7 @@ import (
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/util/metrics"
servicemeshv1alpha2 "kubesphere.io/kubesphere/pkg/apis/servicemesh/v1alpha2"
"kubesphere.io/kubesphere/pkg/controller/virtualservice/util"
"reflect"
logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
@@ -32,6 +33,9 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"time"
servicemeshinformers "kubesphere.io/kubesphere/pkg/client/informers/externalversions/servicemesh/v1alpha2"
servicemeshlisters "kubesphere.io/kubesphere/pkg/client/listers/servicemesh/v1alpha2"
)
const (
@@ -59,6 +63,9 @@ type DestinationRuleController struct {
deploymentLister listersv1.DeploymentLister
deploymentSynced cache.InformerSynced
servicePolicyLister servicemeshlisters.ServicePolicyLister
servicePolicySynced cache.InformerSynced
destinationRuleLister istiolisters.DestinationRuleLister
destinationRuleSynced cache.InformerSynced
@@ -70,6 +77,7 @@ type DestinationRuleController struct {
func NewDestinationRuleController(deploymentInformer informersv1.DeploymentInformer,
destinationRuleInformer istioinformers.DestinationRuleInformer,
serviceInformer coreinformers.ServiceInformer,
servicePolicyInformer servicemeshinformers.ServicePolicyInformer,
client clientset.Interface,
destinationRuleClient istioclientset.Interface) *DestinationRuleController {
@@ -116,6 +124,17 @@ func NewDestinationRuleController(deploymentInformer informersv1.DeploymentInfor
v.destinationRuleLister = destinationRuleInformer.Lister()
v.destinationRuleSynced = destinationRuleInformer.Informer().HasSynced
v.servicePolicyLister = servicePolicyInformer.Lister()
v.servicePolicySynced = servicePolicyInformer.Informer().HasSynced
servicePolicyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: v.addServicePolicy,
UpdateFunc: func(old, cur interface{}) {
v.addServicePolicy(cur)
},
DeleteFunc: v.addServicePolicy,
})
v.eventBroadcaster = broadcaster
v.eventRecorder = recorder
@@ -136,7 +155,7 @@ func (v *DestinationRuleController) Run(workers int, stopCh <-chan struct{}) {
log.Info("starting destinationrule controller")
defer log.Info("shutting down destinationrule controller")
if !controller.WaitForCacheSync("destinationrule-controller", stopCh, v.serviceSynced, v.destinationRuleSynced, v.deploymentSynced) {
if !controller.WaitForCacheSync("destinationrule-controller", stopCh, v.serviceSynced, v.destinationRuleSynced, v.deploymentSynced, v.servicePolicySynced) {
return
}
@@ -177,6 +196,8 @@ func (v *DestinationRuleController) processNextWorkItem() bool {
return true
}
// main function of the reconcile for destinationrule
// destinationrule's name is same with the service that created it
func (v *DestinationRuleController) syncService(key string) error {
startTime := time.Now()
defer func() {
@@ -192,14 +213,14 @@ func (v *DestinationRuleController) syncService(key string) error {
if err != nil {
// Delete the corresponding destinationrule, as the service has been deleted.
err = v.destinationRuleClient.NetworkingV1alpha3().DestinationRules(namespace).Delete(name, nil)
if err != nil && !errors.IsNotFound(err) {
if !errors.IsNotFound(err) {
log.Error(err, "delete destination rule failed", "namespace", namespace, "name", name)
return err
}
return nil
}
if len(service.Labels) < len(util.ApplicationLabels) || !util.IsApplicationComponent(&service.ObjectMeta) ||
if len(service.Labels) < len(util.ApplicationLabels) || !util.IsApplicationComponent(service.Labels) ||
len(service.Spec.Ports) == 0 {
// services don't have enough labels to create a virtualservice
// or they don't have necessary labels
@@ -207,14 +228,22 @@ func (v *DestinationRuleController) syncService(key string) error {
return nil
}
appName := util.GetComponentName(&service.ObjectMeta)
// fetch all deployments that match with service selector
deployments, err := v.deploymentLister.Deployments(namespace).List(labels.Set(service.Spec.Selector).AsSelectorPreValidated())
if err != nil {
return err
}
subsets := []v1alpha3.Subset{}
subsets := make([]v1alpha3.Subset, 0)
for _, deployment := range deployments {
// not a valid deployment we required
if !util.IsApplicationComponent(deployment.Labels) || !util.IsApplicationComponent(deployment.Spec.Selector.MatchLabels) {
continue
}
version := util.GetComponentVersion(&deployment.ObjectMeta)
if len(version) == 0 {
@@ -248,19 +277,49 @@ func (v *DestinationRuleController) syncService(key string) error {
log.Error(err, "Couldn't get destinationrule for service", "key", key)
return err
}
}
// fetch all servicepolicies associated to this service
servicePolicies, err := v.servicePolicyLister.ServicePolicies(namespace).List(labels.SelectorFromSet(map[string]string{util.AppLabel: appName}))
if err != nil {
log.Error(err, "could not list service policies is namespace with component name", "namespace", namespace, "name", appName)
return err
}
dr := currentDestinationRule.DeepCopy()
dr.Spec.Subsets = subsets
//
if len(servicePolicies) > 0 {
if len(servicePolicies) > 1 {
err = fmt.Errorf("more than one service policy associated with service %s/%s is forbidden", namespace, name)
log.Error(err, "")
return err
}
sp := servicePolicies[0]
if sp.Spec.Template.Spec.TrafficPolicy != nil {
dr.Spec.TrafficPolicy = sp.Spec.Template.Spec.TrafficPolicy
}
for _, subset := range sp.Spec.Template.Spec.Subsets {
for i := range dr.Spec.Subsets {
if subset.Name == dr.Spec.Subsets[i].Name && subset.TrafficPolicy != nil {
dr.Spec.Subsets[i].TrafficPolicy = subset.TrafficPolicy
}
}
}
}
createDestinationRule := len(currentDestinationRule.ResourceVersion) == 0
if !createDestinationRule && reflect.DeepEqual(currentDestinationRule.Spec.Subsets, subsets) &&
if !createDestinationRule && reflect.DeepEqual(currentDestinationRule.Spec, dr.Spec) &&
reflect.DeepEqual(currentDestinationRule.Labels, service.Labels) {
log.V(5).Info("destinationrule are equal, skipping update", "key", types.NamespacedName{Namespace: service.Namespace, Name: service.Name}.String())
return nil
}
newDestinationRule := currentDestinationRule.DeepCopy()
newDestinationRule.Spec.Subsets = subsets
newDestinationRule.Spec = dr.Spec
newDestinationRule.Labels = service.Labels
if newDestinationRule.Annotations == nil {
newDestinationRule.Annotations = make(map[string]string)
@@ -293,20 +352,13 @@ func (v *DestinationRuleController) syncService(key string) error {
return nil
}
func (v *DestinationRuleController) isApplicationComponent(meta *metav1.ObjectMeta) bool {
if len(meta.Labels) >= len(util.ApplicationLabels) && util.IsApplicationComponent(meta) {
return true
}
return false
}
// When a destinationrule is added, figure out which service it will be used
// and enqueue it. obj must have *appsv1.Deployment type
func (v *DestinationRuleController) addDeployment(obj interface{}) {
deploy := obj.(*appsv1.Deployment)
// not a application component
if !v.isApplicationComponent(&deploy.ObjectMeta) {
if !util.IsApplicationComponent(deploy.Labels) || !util.IsApplicationComponent(deploy.Spec.Selector.MatchLabels) {
return
}
@@ -354,7 +406,7 @@ func (v *DestinationRuleController) getDeploymentServiceMemberShip(deployment *a
for i := range allServices {
service := allServices[i]
if service.Spec.Selector == nil || !v.isApplicationComponent(&service.ObjectMeta) {
if service.Spec.Selector == nil || !util.IsApplicationComponent(service.Labels) {
// services with nil selectors match nothing, not everything.
continue
}
@@ -371,6 +423,34 @@ func (v *DestinationRuleController) getDeploymentServiceMemberShip(deployment *a
return set, nil
}
func (v *DestinationRuleController) addServicePolicy(obj interface{}) {
servicePolicy := obj.(*servicemeshv1alpha2.ServicePolicy)
appName := servicePolicy.Labels[util.AppLabel]
services, err := v.serviceLister.Services(servicePolicy.Namespace).List(labels.SelectorFromSet(map[string]string{util.AppLabel: appName}))
if err != nil {
log.Error(err, "cannot list services", "namespace", servicePolicy.Namespace, "name", appName)
utilruntime.HandleError(fmt.Errorf("cannot list services in namespace %s, with component name %v", servicePolicy.Namespace, appName))
return
}
set := sets.String{}
for _, service := range services {
key, err := controller.KeyFunc(service)
if err != nil {
utilruntime.HandleError(err)
continue
}
set.Insert(key)
}
// avoid enqueue a key multiple times
for key := range set {
v.queue.Add(key)
}
}
func (v *DestinationRuleController) handleErr(err error, key interface{}) {
if err != nil {
v.queue.Forget(key)
@@ -383,7 +463,7 @@ func (v *DestinationRuleController) handleErr(err error, key interface{}) {
return
}
log.V(0).Info("Dropping service out of the queue", "key", key, "error", err)
log.V(4).Info("Dropping service out of the queue", "key", key, "error", err)
v.queue.Forget(key)
utilruntime.HandleError(err)
}

View File

@@ -1 +1,58 @@
package destinationrule
import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// TODO(jeff): add test cases
var namespace = "default"
var lbs = map[string]string{
"app.kubernetes.io/name": "bookinfo",
"servicemesh.kubesphere.io/enabled": "",
"app": "reviews",
}
var service = corev1.Service{}
var deployments = []appsv1.Deployment{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
Name: "deploy-v1",
Labels: map[string]string{
"app.kubernetes.io/name": "bookinfo",
"servicemesh.kubesphere.io/enabled": "",
"app": "reviews",
"version": "v1",
},
},
Spec: appsv1.DeploymentSpec{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app.kubernetes.io/name": "bookinfo",
"servicemesh.kubesphere.io/enabled": "",
"app": "reviews",
"version": "v1",
},
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app.kubernetes.io/name": "bookinfo",
"servicemesh.kubesphere.io/enabled": "",
"app": "reviews",
"version": "v1",
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{},
},
},
},
},
},
}

View File

@@ -1,51 +0,0 @@
package strategy
import (
"fmt"
"github.com/knative/pkg/apis/istio/v1alpha3"
"k8s.io/api/core/v1"
"kubesphere.io/kubesphere/pkg/apis/servicemesh/v1alpha2"
)
const (
AppLabel = "app"
)
func getAppNameByStrategy(strategy *v1alpha2.Strategy) string {
if len(strategy.Labels) > 0 && len(strategy.Labels[AppLabel]) > 0 {
return strategy.Labels[AppLabel]
}
return ""
}
// if virtualservice not specified with port number, then fill with service first port
func fillDestinationPort(vs *v1alpha3.VirtualService, service *v1.Service) error {
if len(service.Spec.Ports) == 0 {
return fmt.Errorf("service %s/%s spec doesn't canotain any ports", service.Namespace, service.Name)
}
// fill http port
for i := range vs.Spec.Http {
for j := range vs.Spec.Http[i].Route {
if vs.Spec.Http[i].Route[j].Destination.Port.Number == 0 {
vs.Spec.Http[i].Route[j].Destination.Port.Number = uint32(service.Spec.Ports[0].Port)
}
}
if vs.Spec.Http[i].Mirror != nil && vs.Spec.Http[i].Mirror.Port.Number == 0 {
vs.Spec.Http[i].Mirror.Port.Number = uint32(service.Spec.Ports[0].Port)
}
}
// fill tcp port
for i := range vs.Spec.Tcp {
for j := range vs.Spec.Tcp[i].Route {
if vs.Spec.Tcp[i].Route[j].Destination.Port.Number == 0 {
vs.Spec.Tcp[i].Route[j].Destination.Port.Number = uint32(service.Spec.Ports[0].Port)
}
}
}
return nil
}

View File

@@ -1,192 +0,0 @@
/*
Copyright 2019 The KubeSphere authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package strategy
import (
"context"
"fmt"
"github.com/knative/pkg/apis/istio/v1alpha3"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
servicemeshv1alpha2 "kubesphere.io/kubesphere/pkg/apis/servicemesh/v1alpha2"
"reflect"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
"sigs.k8s.io/controller-runtime/pkg/source"
)
var log = logf.Log.WithName("strategy-controller")
// Add creates a new Strategy Controller and adds it to the Manager with default RBAC. The Manager will set fields on the Controller
// and Start it when the Manager is Started.
func Add(mgr manager.Manager) error {
return add(mgr, newReconciler(mgr))
}
// newReconciler returns a new reconcile.Reconciler
func newReconciler(mgr manager.Manager) reconcile.Reconciler {
return &ReconcileStrategy{Client: mgr.GetClient(), scheme: mgr.GetScheme()}
}
// add adds a new Controller to mgr with r as the reconcile.Reconciler
func add(mgr manager.Manager, r reconcile.Reconciler) error {
// Create a new controller
c, err := controller.New("strategy-controller", mgr, controller.Options{Reconciler: r})
if err != nil {
return err
}
// Watch for changes to Strategy
err = c.Watch(&source.Kind{Type: &servicemeshv1alpha2.Strategy{}}, &handler.EnqueueRequestForObject{})
if err != nil {
return err
}
// TODO(user): Modify this to be the types you create
// Watch a VirtualService created by Strategy
err = c.Watch(&source.Kind{Type: &v1alpha3.VirtualService{}}, &handler.EnqueueRequestForOwner{
IsController: true,
OwnerType: &servicemeshv1alpha2.Strategy{},
})
if err != nil {
return err
}
return nil
}
var _ reconcile.Reconciler = &ReconcileStrategy{}
// ReconcileStrategy reconciles a Strategy object
type ReconcileStrategy struct {
client.Client
scheme *runtime.Scheme
}
// Reconcile reads that state of the cluster for a Strategy object and makes changes based on the state read
// and what is in the Strategy.Spec
// a Deployment as an example
// Automatically generate RBAC rules to allow the Controller to read and write Deployments
// +kubebuilder:rbac:groups=networking.istio.io,resources=virtualservices,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=networking.istio.io,resources=virtualservices/status,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=servicemesh.kubesphere.io,resources=strategies,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=servicemesh.kubesphere.io,resources=strategies/status,verbs=get;update;patch
func (r *ReconcileStrategy) Reconcile(request reconcile.Request) (reconcile.Result, error) {
// Fetch the Strategy instance
strategy := &servicemeshv1alpha2.Strategy{}
err := r.Get(context.TODO(), request.NamespacedName, strategy)
if err != nil {
if errors.IsNotFound(err) {
return reconcile.Result{}, nil
}
return reconcile.Result{}, err
}
return r.reconcileStrategy(strategy)
}
func (r *ReconcileStrategy) reconcileStrategy(strategy *servicemeshv1alpha2.Strategy) (reconcile.Result, error) {
appName := getAppNameByStrategy(strategy)
service := &v1.Service{}
err := r.Get(context.TODO(), types.NamespacedName{Namespace: strategy.Namespace, Name: appName}, service)
if err != nil {
log.Error(err, "couldn't find service", "namespace", strategy.Namespace, "name", appName)
return reconcile.Result{}, errors.NewBadRequest(fmt.Sprintf("service %s not found", appName))
}
vs, err := r.generateVirtualService(strategy, service)
// Check if the VirtualService already exists
found := &v1alpha3.VirtualService{}
err = r.Get(context.TODO(), types.NamespacedName{Name: vs.Name, Namespace: vs.Namespace}, found)
if err != nil && errors.IsNotFound(err) {
log.Info("Creating VirtualService", "namespace", vs.Namespace, "name", vs.Name)
err = r.Create(context.TODO(), vs)
return reconcile.Result{}, err
} else if err != nil {
return reconcile.Result{}, err
}
// Update the found object and write the result back if there are any changes
if !reflect.DeepEqual(vs.Spec, found.Spec) || len(vs.OwnerReferences) == 0 {
found.Spec = vs.Spec
found.OwnerReferences = vs.OwnerReferences
log.Info("Updating VirtualService", "namespace", vs.Namespace, "name", vs.Name)
err = r.Update(context.TODO(), found)
if err != nil {
return reconcile.Result{}, err
}
}
return reconcile.Result{}, nil
}
func (r *ReconcileStrategy) generateVirtualService(strategy *servicemeshv1alpha2.Strategy, service *v1.Service) (*v1alpha3.VirtualService, error) {
// Define VirtualService to be created
vs := &v1alpha3.VirtualService{
ObjectMeta: metav1.ObjectMeta{
Name: getAppNameByStrategy(strategy),
Namespace: strategy.Namespace,
Labels: strategy.Spec.Selector.MatchLabels,
},
Spec: strategy.Spec.Template.Spec,
}
// one version rules them all
if len(strategy.Spec.GovernorVersion) > 0 {
governorDestinationWeight := v1alpha3.DestinationWeight{
Destination: v1alpha3.Destination{
Host: getAppNameByStrategy(strategy),
Subset: strategy.Spec.GovernorVersion,
},
Weight: 100,
}
if len(strategy.Spec.Template.Spec.Http) > 0 {
governorRoute := v1alpha3.HTTPRoute{
Route: []v1alpha3.DestinationWeight{governorDestinationWeight},
}
vs.Spec.Http = []v1alpha3.HTTPRoute{governorRoute}
} else if len(strategy.Spec.Template.Spec.Tcp) > 0 {
governorRoute := v1alpha3.TCPRoute{
Route: []v1alpha3.DestinationWeight{governorDestinationWeight},
}
vs.Spec.Tcp = []v1alpha3.TCPRoute{governorRoute}
}
}
if err := fillDestinationPort(vs, service); err != nil {
return nil, err
}
return vs, nil
}

View File

@@ -1,75 +0,0 @@
/*
Copyright 2019 The KubeSphere authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package strategy
import (
stdlog "log"
"os"
"path/filepath"
"sync"
"testing"
"github.com/onsi/gomega"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"kubesphere.io/kubesphere/pkg/apis"
"sigs.k8s.io/controller-runtime/pkg/envtest"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
var cfg *rest.Config
func TestMain(m *testing.M) {
t := &envtest.Environment{
CRDDirectoryPaths: []string{filepath.Join("..", "..", "..", "config", "crds")},
}
apis.AddToScheme(scheme.Scheme)
var err error
if cfg, err = t.Start(); err != nil {
stdlog.Fatal(err)
}
code := m.Run()
t.Stop()
os.Exit(code)
}
// SetupTestReconcile returns a reconcile.Reconcile implementation that delegates to inner and
// writes the request to requests after Reconcile is finished.
func SetupTestReconcile(inner reconcile.Reconciler) (reconcile.Reconciler, chan reconcile.Request) {
requests := make(chan reconcile.Request)
fn := reconcile.Func(func(req reconcile.Request) (reconcile.Result, error) {
result, err := inner.Reconcile(req)
requests <- req
return result, err
})
return fn, requests
}
// StartTestManager adds recFn
func StartTestManager(mgr manager.Manager, g *gomega.GomegaWithT) (chan struct{}, *sync.WaitGroup) {
stop := make(chan struct{})
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
g.Expect(mgr.Start(stop)).NotTo(gomega.HaveOccurred())
}()
return stop, wg
}

View File

@@ -1,177 +0,0 @@
/*
Copyright 2019 The KubeSphere authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package strategy
import (
"github.com/knative/pkg/apis/istio/common/v1alpha1"
"github.com/knative/pkg/apis/istio/v1alpha3"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/json"
"testing"
"time"
"github.com/onsi/gomega"
"golang.org/x/net/context"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
servicemeshv1alpha2 "kubesphere.io/kubesphere/pkg/apis/servicemesh/v1alpha2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
var c client.Client
var expectedRequest = reconcile.Request{NamespacedName: types.NamespacedName{Name: "foo", Namespace: "default"}}
var depKey = types.NamespacedName{Name: "details", Namespace: "default"}
const timeout = time.Second * 5
var labels = map[string]string{
"app.kubernetes.io/name": "details",
"app.kubernetes.io/version": "v1",
"app": "details",
"servicemesh.kubesphere.io/enabled": "",
}
var svc = v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "details",
Namespace: "default",
Labels: labels,
},
Spec: v1.ServiceSpec{
Ports: []v1.ServicePort{
{
Name: "http",
Port: 8080,
Protocol: v1.ProtocolTCP,
},
},
Selector: labels,
},
}
func TestReconcile(t *testing.T) {
g := gomega.NewGomegaWithT(t)
instance := &servicemeshv1alpha2.Strategy{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: "default",
Labels: labels,
},
Spec: servicemeshv1alpha2.StrategySpec{
Type: servicemeshv1alpha2.CanaryType,
Selector: &metav1.LabelSelector{
MatchLabels: labels,
},
Template: servicemeshv1alpha2.VirtualServiceTemplateSpec{
Spec: v1alpha3.VirtualServiceSpec{
Hosts: []string{"details"},
Gateways: []string{"default"},
Http: []v1alpha3.HTTPRoute{
{
Match: []v1alpha3.HTTPMatchRequest{
{
Method: &v1alpha1.StringMatch{
Exact: "POST",
},
},
},
Route: []v1alpha3.DestinationWeight{
{
Destination: v1alpha3.Destination{
Host: "details",
Subset: "v1",
},
Weight: 60,
},
},
},
{
Route: []v1alpha3.DestinationWeight{
{
Destination: v1alpha3.Destination{
Host: "details",
Subset: "v2",
},
Weight: 40,
},
},
},
},
},
},
},
}
// Setup the Manager and Controller. Wrap the Controller Reconcile function so it writes each request to a
// channel when it is finished.
mgr, err := manager.New(cfg, manager.Options{})
g.Expect(err).NotTo(gomega.HaveOccurred())
c = mgr.GetClient()
recFn, requests := SetupTestReconcile(newReconciler(mgr))
g.Expect(add(mgr, recFn)).NotTo(gomega.HaveOccurred())
stopMgr, mgrStopped := StartTestManager(mgr, g)
defer func() {
close(stopMgr)
mgrStopped.Wait()
}()
err = c.Create(context.TODO(), &svc)
if apierrors.IsInvalid(err) {
t.Logf("failed to create service, %v", err)
return
}
g.Expect(err).NotTo(gomega.HaveOccurred())
//defer c.Delete(context.TODO(), &svc)
// Create the Strategy object and expect the Reconcile and Deployment to be created
err = c.Create(context.TODO(), instance)
// The instance object may not be a valid object because it might be missing some required fields.
// Please modify the instance object by adding required fields and then remove the following if statement.
if apierrors.IsInvalid(err) {
t.Logf("failed to create object, got an invalid object error: %v", err)
return
}
g.Expect(err).NotTo(gomega.HaveOccurred())
defer c.Delete(context.TODO(), instance)
g.Eventually(requests, timeout).Should(gomega.Receive(gomega.Equal(expectedRequest)))
vs := &v1alpha3.VirtualService{}
g.Eventually(func() error { return c.Get(context.TODO(), depKey, vs) }, timeout).
Should(gomega.Succeed())
if str, err := json.Marshal(vs); err == nil {
t.Logf("Created virtual service %s\n", str)
}
// Delete the Deployment and expect Reconcile to be called for Deployment deletion
g.Expect(c.Delete(context.TODO(), vs)).NotTo(gomega.HaveOccurred())
//g.Eventually(requests, timeout).Should(gomega.Receive(gomega.Equal(expectedRequest)))
//g.Eventually(func() error { return c.Get(context.TODO(), depKey, vs) }, timeout).Should(gomega.Succeed())
// Manually delete Deployment since GC isn't enabled in the test control plane
g.Eventually(func() error { return c.Delete(context.TODO(), vs) }, timeout).
Should(gomega.MatchError("virtualservices.networking.istio.io \"details\" not found"))
}

View File

@@ -1,6 +1,8 @@
package util
import (
"github.com/knative/pkg/apis/istio/v1alpha3"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"strings"
)
@@ -47,9 +49,9 @@ func GetComponentVersion(meta *metav1.ObjectMeta) string {
func ExtractApplicationLabels(meta *metav1.ObjectMeta) map[string]string {
labels := make(map[string]string, 0)
labels := make(map[string]string, len(ApplicationLabels))
for _, label := range ApplicationLabels {
if len(meta.Labels[label]) == 0 {
if _, ok := meta.Labels[label]; !ok {
return nil
} else {
labels[label] = meta.Labels[label]
@@ -59,13 +61,38 @@ func ExtractApplicationLabels(meta *metav1.ObjectMeta) map[string]string {
return labels
}
func IsApplicationComponent(meta *metav1.ObjectMeta) bool {
func IsApplicationComponent(lbs map[string]string) bool {
for _, label := range ApplicationLabels {
if len(meta.Labels[label]) == 0 {
if _, ok := lbs[label]; !ok {
return false
}
}
return true
}
// if virtualservice not specified with port number, then fill with service first port
func FillDestinationPort(vs *v1alpha3.VirtualService, service *v1.Service) {
// fill http port
for i := range vs.Spec.Http {
for j := range vs.Spec.Http[i].Route {
if vs.Spec.Http[i].Route[j].Destination.Port.Number == 0 {
vs.Spec.Http[i].Route[j].Destination.Port.Number = uint32(service.Spec.Ports[0].Port)
}
}
if vs.Spec.Http[i].Mirror != nil && vs.Spec.Http[i].Mirror.Port.Number == 0 {
vs.Spec.Http[i].Mirror.Port.Number = uint32(service.Spec.Ports[0].Port)
}
}
// fill tcp port
for i := range vs.Spec.Tcp {
for j := range vs.Spec.Tcp[i].Route {
if vs.Spec.Tcp[i].Route[j].Destination.Port.Number == 0 {
vs.Spec.Tcp[i].Route[j].Destination.Port.Number = uint32(service.Spec.Ports[0].Port)
}
}
}
}

View File

@@ -6,14 +6,17 @@ import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/util/metrics"
"kubesphere.io/kubesphere/pkg/controller/virtualservice/util"
"reflect"
"strings"
logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
@@ -27,6 +30,7 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
servicemeshv1alpha2 "kubesphere.io/kubesphere/pkg/apis/servicemesh/v1alpha2"
servicemeshinformers "kubesphere.io/kubesphere/pkg/client/informers/externalversions/servicemesh/v1alpha2"
servicemeshlisters "kubesphere.io/kubesphere/pkg/client/listers/servicemesh/v1alpha2"
@@ -98,6 +102,7 @@ func NewVirtualServiceController(serviceInformer coreinformers.ServiceInformer,
AddFunc: v.enqueueService,
DeleteFunc: v.enqueueService,
UpdateFunc: func(old, cur interface{}) {
// TODO(jeff): need a more robust mechanism, because user may change labels
v.enqueueService(cur)
},
})
@@ -109,7 +114,11 @@ func NewVirtualServiceController(serviceInformer coreinformers.ServiceInformer,
v.strategySynced = strategyInformer.Informer().HasSynced
strategyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
DeleteFunc: v.deleteStrategy,
DeleteFunc: v.addStrategy,
AddFunc: v.addStrategy,
UpdateFunc: func(old, cur interface{}) {
v.addStrategy(cur)
},
})
v.destinationRuleLister = destinationRuleInformer.Lister()
@@ -185,48 +194,63 @@ func (v *VirtualServiceController) processNextWorkItem() bool {
return true
}
// created virtualservice's name are same as the service name, same
// as the destinationrule name
// labels:
// servicemesh.kubernetes.io/enabled: ""
// app.kubernetes.io/name: bookinfo
// app: reviews
// are used to bind them together.
// syncService are the main part of reconcile function body, it takes
// service, destinationrule, strategy as input to create a virtualservice
// for service.
func (v *VirtualServiceController) syncService(key string) error {
startTime := time.Now()
defer func() {
log.V(4).Info("Finished syncing service virtualservice. ", "service", key, "duration", time.Since(startTime))
}()
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
log.Error(err, "not a valid controller key", "key", key)
return err
}
// default component name to service name
appName := name
defer func() {
log.V(4).Info("Finished syncing service virtualservice.", "namespace", namespace, "name", name, "duration", time.Since(startTime))
}()
service, err := v.serviceLister.Services(namespace).Get(name)
if err != nil {
// Delete the corresponding virtualservice, as the service has been deleted.
err = v.virtualServiceClient.NetworkingV1alpha3().VirtualServices(namespace).Delete(name, nil)
if err != nil && !errors.IsNotFound(err) {
log.Error(err, "delete orphan virtualservice failed", "namespace", service.Namespace, "name", service.Name)
return err
if errors.IsNotFound(err) {
// Delete the corresponding virtualservice, as the service has been deleted.
err = v.virtualServiceClient.NetworkingV1alpha3().VirtualServices(namespace).Delete(name, nil)
if err != nil && !errors.IsNotFound(err) {
log.Error(err, "delete orphan virtualservice failed", "namespace", namespace, "name", service.Name)
return err
}
return nil
}
return nil
log.Error(err, "get service failed", "namespace", namespace, "name", name)
return err
}
if len(service.Labels) < len(util.ApplicationLabels) || !util.IsApplicationComponent(&service.ObjectMeta) ||
if len(service.Labels) < len(util.ApplicationLabels) || !util.IsApplicationComponent(service.Labels) ||
len(service.Spec.Ports) == 0 {
// services don't have enough labels to create a virtualservice
// or they don't have necessary labels
// or they don't have any ports defined
return nil
}
vs, err := v.virtualServiceLister.VirtualServices(namespace).Get(name)
if err == nil {
// there already is virtual service there, no need to create another one
return nil
}
// get real component name, i.e label app value
appName = util.GetComponentName(&service.ObjectMeta)
destinationRule, err := v.destinationRuleLister.DestinationRules(namespace).Get(name)
if err != nil {
if errors.IsNotFound(err) {
// there is no destinationrule for this service
// maybe corresponding workloads are not created yet
return nil
log.Info("destination rules for service not found, retrying.", "namespace", namespace, "name", name)
return fmt.Errorf("destination rule for service %s/%s not found", namespace, name)
}
log.Error(err, "Couldn't get destinationrule for service.", "service", types.NamespacedName{Name: service.Name, Namespace: service.Namespace}.String())
return err
@@ -235,20 +259,47 @@ func (v *VirtualServiceController) syncService(key string) error {
subsets := destinationRule.Spec.Subsets
if len(subsets) == 0 {
// destination rule with no subsets, not possibly
err = fmt.Errorf("find destinationrule with no subsets for service %s", name)
log.Error(err, "Find destinationrule with no subsets for service", "namespace", service.Namespace, "name", name)
err = fmt.Errorf("found destinationrule with no subsets for service %s", name)
log.Error(err, "found destinationrule with no subsets", "namespace", namespace, "name", appName)
return err
} else {
vs = &v1alpha3.VirtualService{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
Labels: util.ExtractApplicationLabels(&service.ObjectMeta),
},
Spec: v1alpha3.VirtualServiceSpec{
Hosts: []string{name},
},
}
// fetch all strategies applied to service
strategies, err := v.strategyLister.Strategies(namespace).List(labels.SelectorFromSet(map[string]string{util.AppLabel: appName}))
if err != nil {
log.Error(err, "list strategies for service failed", "namespace", namespace, "name", appName)
return err
} else if len(strategies) > 1 {
// more than one strategies are not allowed, it will cause collision
err = fmt.Errorf("more than one strategies applied to service %s/%s is forbbiden", namespace, appName)
log.Error(err, "")
return err
}
// get current virtual service
currentVirtualService, err := v.virtualServiceLister.VirtualServices(namespace).Get(appName)
if err != nil {
if errors.IsNotFound(err) {
currentVirtualService = &v1alpha3.VirtualService{
ObjectMeta: metav1.ObjectMeta{
Name: appName,
Namespace: namespace,
Labels: util.ExtractApplicationLabels(&service.ObjectMeta),
},
}
}
return nil
}
vs := currentVirtualService.DeepCopy()
if len(strategies) > 0 {
// apply strategy spec to virtualservice
vs.Spec = v.generateVirtualServiceSpec(strategies[0], service).Spec
} else {
// create a whole new virtualservice
// TODO(jeff): use FQDN to replace service name
vs.Spec.Hosts = []string{name}
// check if service has TCP protocol ports
for _, port := range service.Spec.Ports {
@@ -275,18 +326,45 @@ func (v *VirtualServiceController) syncService(key string) error {
vs.Spec.Tcp = []v1alpha3.TCPRoute{{Route: []v1alpha3.DestinationWeight{route}}}
}
}
}
if len(vs.Spec.Http) > 0 || len(vs.Spec.Tcp) > 0 {
_, err := v.virtualServiceClient.NetworkingV1alpha3().VirtualServices(namespace).Create(vs)
if err != nil {
v.eventRecorder.Event(vs, v1.EventTypeWarning, "FailedToCreateVirtualService", fmt.Sprintf("Failed to create virtualservice for service %v/%v: %v", service.Namespace, service.Name, err))
log.Error(err, "create virtualservice for service failed.", "service", service)
return err
}
createVirtualService := len(currentVirtualService.ResourceVersion) == 0
if !createVirtualService &&
reflect.DeepEqual(vs.Spec, currentVirtualService.Spec) &&
reflect.DeepEqual(service.Labels, currentVirtualService.Labels) {
log.V(4).Info("virtual service are equal, skipping update ")
return nil
}
newVirtualService := currentVirtualService.DeepCopy()
newVirtualService.Labels = service.Labels
newVirtualService.Spec = vs.Spec
if newVirtualService.Annotations == nil {
newVirtualService.Annotations = make(map[string]string)
}
if len(newVirtualService.Spec.Http) == 0 && len(newVirtualService.Spec.Tcp) == 0 && len(newVirtualService.Spec.Tls) == 0 {
err = fmt.Errorf("service %s/%s doesn't have a valid port spec", namespace, name)
log.Error(err, "")
return err
}
if createVirtualService {
_, err = v.virtualServiceClient.NetworkingV1alpha3().VirtualServices(namespace).Create(newVirtualService)
} else {
_, err = v.virtualServiceClient.NetworkingV1alpha3().VirtualServices(namespace).Update(newVirtualService)
}
if err != nil {
if createVirtualService {
v.eventRecorder.Event(newVirtualService, v1.EventTypeWarning, "FailedToCreateVirtualService", fmt.Sprintf("Failed to create virtualservice for service %v/%v: %v", namespace, name, err))
} else {
log.Info("service doesn't have a tcp port.")
return nil
v.eventRecorder.Event(newVirtualService, v1.EventTypeWarning, "FailedToUpdateVirtualService", fmt.Sprintf("Failed to update virtualservice for service %v/%v: %v", namespace, name, err))
}
return err
}
return nil
@@ -299,7 +377,7 @@ func (v *VirtualServiceController) addDestinationRule(obj interface{}) {
service, err := v.serviceLister.Services(dr.Namespace).Get(dr.Name)
if err != nil {
if errors.IsNotFound(err) {
log.V(0).Info("service not created yet", "namespace", dr.Namespace, "service", dr.Name)
log.V(3).Info("service not created yet", "namespace", dr.Namespace, "service", dr.Name)
return
}
utilruntime.HandleError(fmt.Errorf("unable to get service with name %s/%s", dr.Namespace, dr.Name))
@@ -324,8 +402,46 @@ func (v *VirtualServiceController) addDestinationRule(obj interface{}) {
return
}
func (v *VirtualServiceController) deleteStrategy(obj interface{}) {
// nothing to do right now
// when a strategy created
func (v *VirtualServiceController) addStrategy(obj interface{}) {
strategy := obj.(*servicemeshv1alpha2.Strategy)
lbs := util.ExtractApplicationLabels(&strategy.ObjectMeta)
if len(lbs) == 0 {
err := fmt.Errorf("invalid strategy %s/%s labels %s, not have required labels", strategy.Namespace, strategy.Name, strategy.Labels)
log.Error(err, "")
utilruntime.HandleError(err)
return
}
allServices, err := v.serviceLister.Services(strategy.Namespace).List(labels.SelectorFromSet(lbs))
if err != nil {
log.Error(err, "list services failed")
utilruntime.HandleError(err)
return
}
// avoid insert a key multiple times
set := sets.String{}
for i := range allServices {
service := allServices[i]
if service.Spec.Selector == nil || len(service.Spec.Ports) == 0 {
// services with nil selectors match nothing, not everything.
continue
}
key, err := controller.KeyFunc(service)
if err != nil {
utilruntime.HandleError(err)
return
}
set.Insert(key)
}
for key := range set {
v.queue.Add(key)
}
}
func (v *VirtualServiceController) handleErr(err error, key interface{}) {
@@ -344,3 +460,40 @@ func (v *VirtualServiceController) handleErr(err error, key interface{}) {
v.queue.Forget(key)
utilruntime.HandleError(err)
}
func (v *VirtualServiceController) generateVirtualServiceSpec(strategy *servicemeshv1alpha2.Strategy, service *v1.Service) *v1alpha3.VirtualService {
// Define VirtualService to be created
vs := &v1alpha3.VirtualService{
Spec: strategy.Spec.Template.Spec,
}
// one version rules them all
if len(strategy.Spec.GovernorVersion) > 0 {
governorDestinationWeight := v1alpha3.DestinationWeight{
Destination: v1alpha3.Destination{
Host: service.Name,
Subset: strategy.Spec.GovernorVersion,
},
Weight: 100,
}
if len(strategy.Spec.Template.Spec.Http) > 0 {
governorRoute := v1alpha3.HTTPRoute{
Route: []v1alpha3.DestinationWeight{governorDestinationWeight},
}
vs.Spec.Http = []v1alpha3.HTTPRoute{governorRoute}
} else if len(strategy.Spec.Template.Spec.Tcp) > 0 {
governorRoute := v1alpha3.TCPRoute{
Route: []v1alpha3.DestinationWeight{governorDestinationWeight},
}
vs.Spec.Tcp = []v1alpha3.TCPRoute{governorRoute}
}
}
util.FillDestinationPort(vs, service)
return vs
}