This commit is contained in:
Jeff
2019-04-07 23:42:00 +08:00
committed by zryfish
parent f01b4bd4a4
commit 828de74cc5
12 changed files with 145 additions and 101 deletions

View File

@@ -51,7 +51,8 @@ func AddControllers(mgr manager.Manager, cfg *rest.Config, stopCh <-chan struct{
istioInformer.Networking().V1alpha3().DestinationRules(), istioInformer.Networking().V1alpha3().DestinationRules(),
servicemeshinformer.Servicemesh().V1alpha2().Strategies(), servicemeshinformer.Servicemesh().V1alpha2().Strategies(),
kubeClient, kubeClient,
istioclient) istioclient,
servicemeshclient)
drController := destinationrule.NewDestinationRuleController(informerFactory.Apps().V1().Deployments(), drController := destinationrule.NewDestinationRuleController(informerFactory.Apps().V1().Deployments(),
istioInformer.Networking().V1alpha3().DestinationRules(), istioInformer.Networking().V1alpha3().DestinationRules(),

View File

@@ -114,7 +114,7 @@ func initializeKialiConfig(s *options.ServerRunOptions) {
func initializeESClientConfig() { func initializeESClientConfig() {
// List all outputs // List all outputs
outputs,err := logging.GetFluentbitOutputFromConfigMap() outputs, err := logging.GetFluentbitOutputFromConfigMap()
if err != nil { if err != nil {
glog.Errorln(err) glog.Errorln(err)
return return

View File

@@ -49,10 +49,6 @@ spec:
description: Governor version, the version takes control of all incoming description: Governor version, the version takes control of all incoming
traffic label version value traffic label version value
type: string type: string
paused:
description: Indicates that the strategy is paused and will not be processed
by the strategy controller
type: boolean
principal: principal:
description: Principal version, the one as reference version label version description: Principal version, the one as reference version label version
value value
@@ -60,6 +56,10 @@ spec:
selector: selector:
description: Label selector for virtual services. description: Label selector for virtual services.
type: object type: object
strategyPolicy:
description: strategy policy, how the strategy will be applied by the
strategy controller
type: string
template: template:
description: Template describes the virtual service that will be created. description: Template describes the virtual service that will be created.
properties: properties:

View File

@@ -4,50 +4,6 @@ metadata:
creationTimestamp: null creationTimestamp: null
name: manager-role name: manager-role
rules: rules:
- apiGroups:
- networking.istio.io
resources:
- virtualservices
verbs:
- get
- list
- watch
- create
- update
- patch
- delete
- apiGroups:
- networking.istio.io
resources:
- virtualservices/status
verbs:
- get
- list
- watch
- create
- update
- patch
- delete
- apiGroups:
- servicemesh.kubesphere.io
resources:
- strategies
verbs:
- get
- list
- watch
- create
- update
- patch
- delete
- apiGroups:
- servicemesh.kubesphere.io
resources:
- strategies/status
verbs:
- get
- update
- patch
- apiGroups: - apiGroups:
- admissionregistration.k8s.io - admissionregistration.k8s.io
resources: resources:

View File

@@ -214,4 +214,4 @@ func addWebService(c *restful.Container) error {
c.Add(ws) c.Add(ws)
return nil return nil
} }

View File

@@ -28,7 +28,6 @@ import (
type StrategyType string type StrategyType string
const ( const (
// Canary strategy type // Canary strategy type
CanaryType StrategyType = "Canary" CanaryType StrategyType = "Canary"
@@ -39,9 +38,21 @@ const (
Mirror StrategyType = "Mirror" Mirror StrategyType = "Mirror"
) )
type StrategyPolicy string
const (
// apply strategy only until workload is ready
PolicyWaitForWorkloadReady StrategyPolicy = "WaitForWorkloadReady"
// apply strategy immediately no matter workload status is
PolicyImmediately StrategyPolicy = "Immediately"
// pause strategy
PolicyPause StrategyPolicy = "Paused"
)
// StrategySpec defines the desired state of Strategy // StrategySpec defines the desired state of Strategy
type StrategySpec struct { type StrategySpec struct {
// Strategy type // Strategy type
Type StrategyType `json:"type,omitempty"` Type StrategyType `json:"type,omitempty"`
@@ -62,9 +73,9 @@ type StrategySpec struct {
// Template describes the virtual service that will be created. // Template describes the virtual service that will be created.
Template VirtualServiceTemplateSpec `json:"template,omitempty"` Template VirtualServiceTemplateSpec `json:"template,omitempty"`
// Indicates that the strategy is paused and will not be processed // strategy policy, how the strategy will be applied
// by the strategy controller // by the strategy controller
Paused bool `json:"paused,omitempty"` StrategyPolicy StrategyPolicy `json:"strategyPolicy,omitempty"`
} }
// VirtualServiceTemplateSpec // VirtualServiceTemplateSpec

View File

@@ -220,10 +220,13 @@ func (v *DestinationRuleController) syncService(key string) error {
return nil return nil
} }
if len(service.Labels) < len(util.ApplicationLabels) || !util.IsApplicationComponent(service.Labels) || if len(service.Labels) < len(util.ApplicationLabels) ||
!util.IsApplicationComponent(service.Labels) ||
!util.IsServicemeshEnabled(service.Annotations) ||
len(service.Spec.Ports) == 0 { len(service.Spec.Ports) == 0 {
// services don't have enough labels to create a virtualservice // services don't have enough labels to create a virtualservice
// or they don't have necessary labels // or they don't have necessary labels
// or they don't have servicemesh enabled
// or they don't have any ports defined // or they don't have any ports defined
return nil return nil
} }
@@ -240,7 +243,10 @@ func (v *DestinationRuleController) syncService(key string) error {
for _, deployment := range deployments { for _, deployment := range deployments {
// not a valid deployment we required // not a valid deployment we required
if !util.IsApplicationComponent(deployment.Labels) || !util.IsApplicationComponent(deployment.Spec.Selector.MatchLabels) { if !util.IsApplicationComponent(deployment.Labels) ||
!util.IsApplicationComponent(deployment.Spec.Selector.MatchLabels) ||
deployment.Status.ReadyReplicas == 0 ||
!util.IsServicemeshEnabled(deployment.Annotations) {
continue continue
} }
@@ -406,7 +412,9 @@ func (v *DestinationRuleController) getDeploymentServiceMemberShip(deployment *a
for i := range allServices { for i := range allServices {
service := allServices[i] service := allServices[i]
if service.Spec.Selector == nil || !util.IsApplicationComponent(service.Labels) { if service.Spec.Selector == nil ||
!util.IsApplicationComponent(service.Labels) ||
!util.IsServicemeshEnabled(service.Annotations) {
// services with nil selectors match nothing, not everything. // services with nil selectors match nothing, not everything.
continue continue
} }

View File

@@ -8,17 +8,16 @@ import (
) )
const ( const (
AppLabel = "app" AppLabel = "app"
VersionLabel = "version" VersionLabel = "version"
ApplicationNameLabel = "app.kubernetes.io/name" ApplicationNameLabel = "app.kubernetes.io/name"
ApplicationVersionLabel = "app.kubernetes.io/version" ApplicationVersionLabel = "app.kubernetes.io/version"
ServiceMeshEnabledLabel = "servicemesh.kubesphere.io/enabled" ServiceMeshEnabledAnnotation = "servicemesh.kubesphere.io/enabled"
) )
// resource with these following labels considered as part of servicemesh // resource with these following labels considered as part of servicemesh
var ApplicationLabels = [...]string{ var ApplicationLabels = [...]string{
ApplicationNameLabel, ApplicationNameLabel,
ServiceMeshEnabledLabel,
AppLabel, AppLabel,
} }
@@ -40,6 +39,15 @@ func GetComponentName(meta *metav1.ObjectMeta) string {
return "" return ""
} }
func IsServicemeshEnabled(annotations map[string]string) bool {
if enabled, ok := annotations[ServiceMeshEnabledAnnotation]; ok {
if enabled == "true" {
return true
}
}
return false
}
func GetComponentVersion(meta *metav1.ObjectMeta) string { func GetComponentVersion(meta *metav1.ObjectMeta) string {
if len(meta.Labels[VersionLabel]) > 0 { if len(meta.Labels[VersionLabel]) > 0 {
return meta.Labels[VersionLabel] return meta.Labels[VersionLabel]

View File

@@ -17,9 +17,8 @@ import (
"k8s.io/kubernetes/pkg/util/metrics" "k8s.io/kubernetes/pkg/util/metrics"
"kubesphere.io/kubesphere/pkg/controller/virtualservice/util" "kubesphere.io/kubesphere/pkg/controller/virtualservice/util"
"reflect" "reflect"
"strings"
logf "sigs.k8s.io/controller-runtime/pkg/runtime/log" logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
"strings"
istioclient "github.com/knative/pkg/client/clientset/versioned" istioclient "github.com/knative/pkg/client/clientset/versioned"
istioinformers "github.com/knative/pkg/client/informers/externalversions/istio/v1alpha3" istioinformers "github.com/knative/pkg/client/informers/externalversions/istio/v1alpha3"
@@ -31,6 +30,7 @@ import (
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue" "k8s.io/client-go/util/workqueue"
servicemeshv1alpha2 "kubesphere.io/kubesphere/pkg/apis/servicemesh/v1alpha2" servicemeshv1alpha2 "kubesphere.io/kubesphere/pkg/apis/servicemesh/v1alpha2"
servicemeshclient "kubesphere.io/kubesphere/pkg/client/clientset/versioned"
servicemeshinformers "kubesphere.io/kubesphere/pkg/client/informers/externalversions/servicemesh/v1alpha2" servicemeshinformers "kubesphere.io/kubesphere/pkg/client/informers/externalversions/servicemesh/v1alpha2"
servicemeshlisters "kubesphere.io/kubesphere/pkg/client/listers/servicemesh/v1alpha2" servicemeshlisters "kubesphere.io/kubesphere/pkg/client/listers/servicemesh/v1alpha2"
@@ -52,6 +52,7 @@ type VirtualServiceController struct {
client clientset.Interface client clientset.Interface
virtualServiceClient istioclient.Interface virtualServiceClient istioclient.Interface
servicemeshClient servicemeshclient.Interface
eventBroadcaster record.EventBroadcaster eventBroadcaster record.EventBroadcaster
eventRecorder record.EventRecorder eventRecorder record.EventRecorder
@@ -78,7 +79,8 @@ func NewVirtualServiceController(serviceInformer coreinformers.ServiceInformer,
destinationRuleInformer istioinformers.DestinationRuleInformer, destinationRuleInformer istioinformers.DestinationRuleInformer,
strategyInformer servicemeshinformers.StrategyInformer, strategyInformer servicemeshinformers.StrategyInformer,
client clientset.Interface, client clientset.Interface,
virtualServiceClient istioclient.Interface) *VirtualServiceController { virtualServiceClient istioclient.Interface,
servicemeshClient servicemeshclient.Interface) *VirtualServiceController {
broadcaster := record.NewBroadcaster() broadcaster := record.NewBroadcaster()
broadcaster.StartLogging(func(format string, args ...interface{}) { broadcaster.StartLogging(func(format string, args ...interface{}) {
@@ -94,6 +96,7 @@ func NewVirtualServiceController(serviceInformer coreinformers.ServiceInformer,
v := &VirtualServiceController{ v := &VirtualServiceController{
client: client, client: client,
virtualServiceClient: virtualServiceClient, virtualServiceClient: virtualServiceClient,
servicemeshClient: servicemeshClient,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "virtualservice"), queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "virtualservice"),
workerLoopPeriod: time.Second, workerLoopPeriod: time.Second,
} }
@@ -234,7 +237,9 @@ func (v *VirtualServiceController) syncService(key string) error {
return err return err
} }
if len(service.Labels) < len(util.ApplicationLabels) || !util.IsApplicationComponent(service.Labels) || if len(service.Labels) < len(util.ApplicationLabels) ||
!util.IsApplicationComponent(service.Labels) ||
!util.IsServicemeshEnabled(service.Annotations) ||
len(service.Spec.Ports) == 0 { len(service.Spec.Ports) == 0 {
// services don't have enough labels to create a virtualservice // services don't have enough labels to create a virtualservice
// or they don't have necessary labels // or they don't have necessary labels
@@ -294,40 +299,69 @@ func (v *VirtualServiceController) syncService(key string) error {
} }
vs := currentVirtualService.DeepCopy() vs := currentVirtualService.DeepCopy()
// create a whole new virtualservice
// TODO(jeff): use FQDN to replace service name
vs.Spec.Hosts = []string{name}
// check if service has TCP protocol ports
for _, port := range service.Spec.Ports {
var route v1alpha3.DestinationWeight
if port.Protocol == v1.ProtocolTCP {
route = v1alpha3.DestinationWeight{
Destination: v1alpha3.Destination{
Host: name,
Subset: subsets[0].Name,
Port: v1alpha3.PortSelector{
Number: uint32(port.Port),
},
},
Weight: 100,
}
// a http port, add to HTTPRoute
if len(port.Name) > 0 && (port.Name == "http" || strings.HasPrefix(port.Name, "http-")) {
vs.Spec.Http = []v1alpha3.HTTPRoute{{Route: []v1alpha3.DestinationWeight{route}}}
break
}
// everything else treated as TCPRoute
vs.Spec.Tcp = []v1alpha3.TCPRoute{{Route: []v1alpha3.DestinationWeight{route}}}
}
}
if len(strategies) > 0 { if len(strategies) > 0 {
// apply strategy spec to virtualservice // apply strategy spec to virtualservice
vs.Spec = v.generateVirtualServiceSpec(strategies[0], service).Spec
} else {
// create a whole new virtualservice
// TODO(jeff): use FQDN to replace service name switch strategies[0].Spec.StrategyPolicy {
vs.Spec.Hosts = []string{name} case servicemeshv1alpha2.PolicyPause:
break
case servicemeshv1alpha2.PolicyWaitForWorkloadReady:
set := v.getSubsets(strategies[0])
// check if service has TCP protocol ports setNames := sets.String{}
for _, port := range service.Spec.Ports { for i := range subsets {
var route v1alpha3.DestinationWeight setNames.Insert(subsets[i].Name)
if port.Protocol == v1.ProtocolTCP {
route = v1alpha3.DestinationWeight{
Destination: v1alpha3.Destination{
Host: name,
Subset: subsets[0].Name,
Port: v1alpha3.PortSelector{
Number: uint32(port.Port),
},
},
Weight: 100,
}
// a http port, add to HTTPRoute
if len(port.Name) > 0 && (port.Name == "http" || strings.HasPrefix(port.Name, "http-")) {
vs.Spec.Http = []v1alpha3.HTTPRoute{{Route: []v1alpha3.DestinationWeight{route}}}
break
}
// everything else treated as TCPRoute
vs.Spec.Tcp = []v1alpha3.TCPRoute{{Route: []v1alpha3.DestinationWeight{route}}}
} }
nonExist := false
for k := range set {
if !setNames.Has(k) {
nonExist = true
}
}
// strategy has subset that are not ready
if nonExist {
break
} else {
vs.Spec = v.generateVirtualServiceSpec(strategies[0], service).Spec
}
case servicemeshv1alpha2.PolicyImmediately:
vs.Spec = v.generateVirtualServiceSpec(strategies[0], service).Spec
default:
vs.Spec = v.generateVirtualServiceSpec(strategies[0], service).Spec
} }
} }
createVirtualService := len(currentVirtualService.ResourceVersion) == 0 createVirtualService := len(currentVirtualService.ResourceVersion) == 0
@@ -359,7 +393,6 @@ func (v *VirtualServiceController) syncService(key string) error {
} }
if err != nil { if err != nil {
if createVirtualService { if createVirtualService {
v.eventRecorder.Event(newVirtualService, v1.EventTypeWarning, "FailedToCreateVirtualService", fmt.Sprintf("Failed to create virtualservice for service %v/%v: %v", namespace, name, err)) v.eventRecorder.Event(newVirtualService, v1.EventTypeWarning, "FailedToCreateVirtualService", fmt.Sprintf("Failed to create virtualservice for service %v/%v: %v", namespace, name, err))
} else { } else {
@@ -463,6 +496,34 @@ func (v *VirtualServiceController) handleErr(err error, key interface{}) {
utilruntime.HandleError(err) utilruntime.HandleError(err)
} }
func (v *VirtualServiceController) getSubsets(strategy *servicemeshv1alpha2.Strategy) sets.String {
set := sets.String{}
for _, httpRoute := range strategy.Spec.Template.Spec.Http {
for _, dw := range httpRoute.Route {
set.Insert(dw.Destination.Subset)
}
if httpRoute.Mirror != nil {
set.Insert(httpRoute.Mirror.Subset)
}
}
for _, tcpRoute := range strategy.Spec.Template.Spec.Tcp {
for _, dw := range tcpRoute.Route {
set.Insert(dw.Destination.Subset)
}
}
for _, tlsRoute := range strategy.Spec.Template.Spec.Tls {
for _, dw := range tlsRoute.Route {
set.Insert(dw.Destination.Subset)
}
}
return set
}
func (v *VirtualServiceController) generateVirtualServiceSpec(strategy *servicemeshv1alpha2.Strategy, service *v1.Service) *v1alpha3.VirtualService { func (v *VirtualServiceController) generateVirtualServiceSpec(strategy *servicemeshv1alpha2.Strategy, service *v1.Service) *v1alpha3.VirtualService {
// Define VirtualService to be created // Define VirtualService to be created
@@ -472,7 +533,6 @@ func (v *VirtualServiceController) generateVirtualServiceSpec(strategy *servicem
// one version rules them all // one version rules them all
if len(strategy.Spec.GovernorVersion) > 0 { if len(strategy.Spec.GovernorVersion) > 0 {
governorDestinationWeight := v1alpha3.DestinationWeight{ governorDestinationWeight := v1alpha3.DestinationWeight{
Destination: v1alpha3.Destination{ Destination: v1alpha3.Destination{
Host: service.Name, Host: service.Name,

View File

@@ -27,4 +27,4 @@ const (
QueryLevelWorkload QueryLevelWorkload
QueryLevelPod QueryLevelPod
QueryLevelContainer QueryLevelContainer
) )

View File

@@ -248,7 +248,7 @@ func FluentbitOutputInsert(output fb.OutputPlugin) *FluentbitOutputsResult {
// 1. Update ConfigMap // 1. Update ConfigMap
var outputs []fb.OutputPlugin var outputs []fb.OutputPlugin
outputs, err := GetFluentbitOutputFromConfigMap() outputs, err := GetFluentbitOutputFromConfigMap()
if err != nil { if err != nil {
// If the ConfigMap doesn't exist, a new one will be created later // If the ConfigMap doesn't exist, a new one will be created later
glog.Errorln(err) glog.Errorln(err)
} }

View File

@@ -42,7 +42,7 @@ func (*s2iRunSearcher) match(match map[string]string, item *v1alpha1.S2iRun) boo
return false return false
} }
case status: case status:
if string(item.Status.RunState) != v{ if string(item.Status.RunState) != v {
return false return false
} }
default: default: