devlopment branch (#1736)
This commit is contained in:
@@ -22,22 +22,17 @@ import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
v1 "k8s.io/api/apps/v1"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/klog"
|
||||
"kubesphere.io/kubesphere/pkg/simple/client"
|
||||
"sort"
|
||||
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"kubesphere.io/kubesphere/pkg/informers"
|
||||
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/kubernetes/scheme"
|
||||
|
||||
"k8s.io/klog"
|
||||
"sort"
|
||||
"strings"
|
||||
|
||||
"kubesphere.io/kubesphere/pkg/constants"
|
||||
)
|
||||
|
||||
// choose router node ip by labels, currently select master node
|
||||
@@ -46,20 +41,32 @@ var routerNodeIPLabelSelector = map[string]string{
|
||||
}
|
||||
|
||||
const (
|
||||
servicemeshEnabled = "servicemesh.kubesphere.io/enabled"
|
||||
sidecarInject = "sidecar.istio.io/inject"
|
||||
servicemeshEnabled = "servicemesh.kubesphere.io/enabled"
|
||||
sidecarInject = "sidecar.istio.io/inject"
|
||||
ingressControllerFolder = "/etc/kubesphere/ingress-controller"
|
||||
ingressControllerPrefix = "kubesphere-router-"
|
||||
ingressControllerNamespace = "kubesphere-controls-system"
|
||||
)
|
||||
|
||||
var routerTemplates map[string]runtime.Object
|
||||
type RouterOperator interface {
|
||||
GetRouter(namespace string) (*corev1.Service, error)
|
||||
CreateRouter(namespace string, serviceType corev1.ServiceType, annotations map[string]string) (*corev1.Service, error)
|
||||
DeleteRouter(namespace string) (*corev1.Service, error)
|
||||
UpdateRouter(namespace string, serviceType corev1.ServiceType, annotations map[string]string) (*corev1.Service, error)
|
||||
}
|
||||
|
||||
// Load yamls
|
||||
func init() {
|
||||
type routerOperator struct {
|
||||
routerTemplates map[string]runtime.Object
|
||||
client kubernetes.Interface
|
||||
informers informers.SharedInformerFactory
|
||||
}
|
||||
|
||||
func NewRouterOperator(client kubernetes.Interface, informers informers.SharedInformerFactory) RouterOperator {
|
||||
yamls, err := loadYamls()
|
||||
routerTemplates = make(map[string]runtime.Object, 2)
|
||||
routerTemplates := make(map[string]runtime.Object, 2)
|
||||
|
||||
if err != nil {
|
||||
klog.Warning("error happened during loading external yamls", err)
|
||||
return
|
||||
klog.Fatalf("error happened during loading external yamls, %v", err)
|
||||
}
|
||||
|
||||
for _, f := range yamls {
|
||||
@@ -79,13 +86,18 @@ func init() {
|
||||
}
|
||||
}
|
||||
|
||||
return &routerOperator{
|
||||
client: client,
|
||||
informers: informers,
|
||||
routerTemplates: routerTemplates,
|
||||
}
|
||||
}
|
||||
|
||||
// get master node ip, if there are multiple master nodes,
|
||||
// choose first one according by their names alphabetically
|
||||
func getMasterNodeIp() string {
|
||||
func (c *routerOperator) getMasterNodeIp() string {
|
||||
|
||||
nodeLister := informers.SharedInformerFactory().Core().V1().Nodes().Lister()
|
||||
nodeLister := c.informers.Core().V1().Nodes().Lister()
|
||||
selector := labels.SelectorFromSet(routerNodeIPLabelSelector)
|
||||
|
||||
masters, err := nodeLister.List(selector)
|
||||
@@ -110,7 +122,7 @@ func getMasterNodeIp() string {
|
||||
return ""
|
||||
}
|
||||
|
||||
func addLoadBalancerIp(service *corev1.Service) {
|
||||
func (c *routerOperator) addLoadBalancerIp(service *corev1.Service) {
|
||||
|
||||
if service == nil {
|
||||
return
|
||||
@@ -118,7 +130,7 @@ func addLoadBalancerIp(service *corev1.Service) {
|
||||
|
||||
// append selected node ip as loadbalancer ingress ip
|
||||
if service.Spec.Type != corev1.ServiceTypeLoadBalancer && len(service.Status.LoadBalancer.Ingress) == 0 {
|
||||
rip := getMasterNodeIp()
|
||||
rip := c.getMasterNodeIp()
|
||||
if len(rip) == 0 {
|
||||
klog.Info("can not get node ip")
|
||||
return
|
||||
@@ -132,36 +144,17 @@ func addLoadBalancerIp(service *corev1.Service) {
|
||||
}
|
||||
}
|
||||
|
||||
func GetAllRouters() ([]*corev1.Service, error) {
|
||||
|
||||
selector := labels.SelectorFromSet(labels.Set{"app": "kubesphere", "component": "ks-router", "tier": "backend"})
|
||||
serviceLister := informers.SharedInformerFactory().Core().V1().Services().Lister()
|
||||
services, err := serviceLister.Services(constants.IngressControllerNamespace).List(selector)
|
||||
|
||||
for i := range services {
|
||||
addLoadBalancerIp(services[i])
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
klog.Error(err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return services, nil
|
||||
}
|
||||
|
||||
// Get router from a namespace
|
||||
func GetRouter(namespace string) (*corev1.Service, error) {
|
||||
service, err := getRouterService(namespace)
|
||||
addLoadBalancerIp(service)
|
||||
func (c *routerOperator) GetRouter(namespace string) (*corev1.Service, error) {
|
||||
service, err := c.getRouterService(namespace)
|
||||
c.addLoadBalancerIp(service)
|
||||
return service, err
|
||||
}
|
||||
|
||||
func getRouterService(namespace string) (*corev1.Service, error) {
|
||||
serviceName := constants.IngressControllerPrefix + namespace
|
||||
|
||||
serviceLister := informers.SharedInformerFactory().Core().V1().Services().Lister()
|
||||
service, err := serviceLister.Services(constants.IngressControllerNamespace).Get(serviceName)
|
||||
func (c *routerOperator) getRouterService(namespace string) (*corev1.Service, error) {
|
||||
serviceName := ingressControllerPrefix + namespace
|
||||
serviceLister := c.informers.Core().V1().Services().Lister()
|
||||
service, err := serviceLister.Services(ingressControllerNamespace).Get(serviceName)
|
||||
|
||||
if err != nil {
|
||||
if errors.IsNotFound(err) {
|
||||
@@ -175,10 +168,8 @@ func getRouterService(namespace string) (*corev1.Service, error) {
|
||||
|
||||
// Load all resource yamls
|
||||
func loadYamls() ([]string, error) {
|
||||
|
||||
var yamls []string
|
||||
|
||||
files, err := ioutil.ReadDir(constants.IngressControllerFolder)
|
||||
files, err := ioutil.ReadDir(ingressControllerFolder)
|
||||
if err != nil {
|
||||
klog.Warning(err)
|
||||
return nil, err
|
||||
@@ -188,7 +179,7 @@ func loadYamls() ([]string, error) {
|
||||
if file.IsDir() || !strings.HasSuffix(file.Name(), ".yaml") {
|
||||
continue
|
||||
}
|
||||
content, err := ioutil.ReadFile(constants.IngressControllerFolder + "/" + file.Name())
|
||||
content, err := ioutil.ReadFile(ingressControllerFolder + "/" + file.Name())
|
||||
|
||||
if err != nil {
|
||||
klog.Error(err)
|
||||
@@ -202,7 +193,7 @@ func loadYamls() ([]string, error) {
|
||||
}
|
||||
|
||||
// Create a ingress controller in a namespace
|
||||
func CreateRouter(namespace string, routerType corev1.ServiceType, annotations map[string]string) (*corev1.Service, error) {
|
||||
func (c *routerOperator) CreateRouter(namespace string, routerType corev1.ServiceType, annotations map[string]string) (*corev1.Service, error) {
|
||||
|
||||
injectSidecar := false
|
||||
if enabled, ok := annotations[servicemeshEnabled]; ok {
|
||||
@@ -211,32 +202,32 @@ func CreateRouter(namespace string, routerType corev1.ServiceType, annotations m
|
||||
}
|
||||
}
|
||||
|
||||
err := createOrUpdateRouterWorkload(namespace, routerType == corev1.ServiceTypeLoadBalancer, injectSidecar)
|
||||
err := c.createOrUpdateRouterWorkload(namespace, routerType == corev1.ServiceTypeLoadBalancer, injectSidecar)
|
||||
if err != nil {
|
||||
klog.Error(err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
router, err := createRouterService(namespace, routerType, annotations)
|
||||
router, err := c.createRouterService(namespace, routerType, annotations)
|
||||
if err != nil {
|
||||
klog.Error(err)
|
||||
_ = deleteRouterWorkload(namespace)
|
||||
_ = c.deleteRouterWorkload(namespace)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
addLoadBalancerIp(router)
|
||||
c.addLoadBalancerIp(router)
|
||||
return router, nil
|
||||
}
|
||||
|
||||
// DeleteRouter is used to delete ingress controller related resources in namespace
|
||||
// It will not delete ClusterRole resource cause it maybe used by other controllers
|
||||
func DeleteRouter(namespace string) (*corev1.Service, error) {
|
||||
err := deleteRouterWorkload(namespace)
|
||||
func (c *routerOperator) DeleteRouter(namespace string) (*corev1.Service, error) {
|
||||
err := c.deleteRouterWorkload(namespace)
|
||||
if err != nil {
|
||||
klog.Error(err)
|
||||
}
|
||||
|
||||
router, err := deleteRouterService(namespace)
|
||||
router, err := c.deleteRouterService(namespace)
|
||||
|
||||
if err != nil {
|
||||
klog.Error(err)
|
||||
@@ -245,28 +236,23 @@ func DeleteRouter(namespace string) (*corev1.Service, error) {
|
||||
return router, nil
|
||||
}
|
||||
|
||||
func createRouterService(namespace string, routerType corev1.ServiceType, annotations map[string]string) (*corev1.Service, error) {
|
||||
func (c *routerOperator) createRouterService(namespace string, routerType corev1.ServiceType, annotations map[string]string) (*corev1.Service, error) {
|
||||
|
||||
obj, ok := routerTemplates["SERVICE"]
|
||||
obj, ok := c.routerTemplates["SERVICE"]
|
||||
if !ok {
|
||||
klog.Error("service template not loaded")
|
||||
return nil, fmt.Errorf("service template not loaded")
|
||||
}
|
||||
|
||||
k8sClient := client.ClientSets().K8s().Kubernetes()
|
||||
|
||||
service := obj.(*corev1.Service)
|
||||
|
||||
service.SetAnnotations(annotations)
|
||||
service.Spec.Type = routerType
|
||||
service.Name = constants.IngressControllerPrefix + namespace
|
||||
service.Name = ingressControllerPrefix + namespace
|
||||
|
||||
// Add project selector
|
||||
service.Labels["project"] = namespace
|
||||
|
||||
service.Spec.Selector["project"] = namespace
|
||||
|
||||
service, err := k8sClient.CoreV1().Services(constants.IngressControllerNamespace).Create(service)
|
||||
service, err := c.client.CoreV1().Services(ingressControllerNamespace).Create(service)
|
||||
if err != nil {
|
||||
klog.Error(err)
|
||||
return nil, err
|
||||
@@ -275,41 +261,32 @@ func createRouterService(namespace string, routerType corev1.ServiceType, annota
|
||||
return service, nil
|
||||
}
|
||||
|
||||
func updateRouterService(namespace string, routerType corev1.ServiceType, annotations map[string]string) (*corev1.Service, error) {
|
||||
|
||||
k8sClient := client.ClientSets().K8s().Kubernetes()
|
||||
|
||||
service, err := getRouterService(namespace)
|
||||
func (c *routerOperator) updateRouterService(namespace string, routerType corev1.ServiceType, annotations map[string]string) (*corev1.Service, error) {
|
||||
service, err := c.getRouterService(namespace)
|
||||
if err != nil {
|
||||
klog.Error(err, "get router failed")
|
||||
return service, err
|
||||
}
|
||||
|
||||
service.Spec.Type = routerType
|
||||
|
||||
service.SetAnnotations(annotations)
|
||||
|
||||
service, err = k8sClient.CoreV1().Services(constants.IngressControllerNamespace).Update(service)
|
||||
|
||||
service, err = c.client.CoreV1().Services(ingressControllerNamespace).Update(service)
|
||||
return service, err
|
||||
}
|
||||
|
||||
func deleteRouterService(namespace string) (*corev1.Service, error) {
|
||||
|
||||
service, err := getRouterService(namespace)
|
||||
func (c *routerOperator) deleteRouterService(namespace string) (*corev1.Service, error) {
|
||||
|
||||
service, err := c.getRouterService(namespace)
|
||||
if err != nil {
|
||||
klog.Error(err)
|
||||
return service, err
|
||||
}
|
||||
|
||||
k8sClient := client.ClientSets().K8s().Kubernetes()
|
||||
|
||||
// delete controller service
|
||||
serviceName := constants.IngressControllerPrefix + namespace
|
||||
serviceName := ingressControllerPrefix + namespace
|
||||
deleteOptions := metav1.DeleteOptions{}
|
||||
|
||||
err = k8sClient.CoreV1().Services(constants.IngressControllerNamespace).Delete(serviceName, &deleteOptions)
|
||||
err = c.client.CoreV1().Services(ingressControllerNamespace).Delete(serviceName, &deleteOptions)
|
||||
if err != nil {
|
||||
klog.Error(err)
|
||||
return service, err
|
||||
@@ -318,17 +295,16 @@ func deleteRouterService(namespace string) (*corev1.Service, error) {
|
||||
return service, nil
|
||||
}
|
||||
|
||||
func createOrUpdateRouterWorkload(namespace string, publishService bool, servicemeshEnabled bool) error {
|
||||
obj, ok := routerTemplates["DEPLOYMENT"]
|
||||
func (c *routerOperator) createOrUpdateRouterWorkload(namespace string, publishService bool, servicemeshEnabled bool) error {
|
||||
obj, ok := c.routerTemplates["DEPLOYMENT"]
|
||||
if !ok {
|
||||
klog.Error("Deployment template file not loaded")
|
||||
return fmt.Errorf("deployment template file not loaded")
|
||||
}
|
||||
|
||||
deployName := constants.IngressControllerPrefix + namespace
|
||||
deployName := ingressControllerPrefix + namespace
|
||||
|
||||
k8sClient := client.ClientSets().K8s().Kubernetes()
|
||||
deployment, err := k8sClient.AppsV1().Deployments(constants.IngressControllerNamespace).Get(deployName, metav1.GetOptions{})
|
||||
deployment, err := c.client.AppsV1().Deployments(ingressControllerNamespace).Get(deployName, metav1.GetOptions{})
|
||||
|
||||
createDeployment := true
|
||||
|
||||
@@ -336,7 +312,7 @@ func createOrUpdateRouterWorkload(namespace string, publishService bool, service
|
||||
if errors.IsNotFound(err) {
|
||||
deployment = obj.(*v1.Deployment)
|
||||
|
||||
deployment.Name = constants.IngressControllerPrefix + namespace
|
||||
deployment.Name = ingressControllerPrefix + namespace
|
||||
|
||||
// Add project label
|
||||
deployment.Spec.Selector.MatchLabels["project"] = namespace
|
||||
@@ -382,15 +358,15 @@ func createOrUpdateRouterWorkload(namespace string, publishService bool, service
|
||||
}
|
||||
|
||||
if publishService {
|
||||
deployment.Spec.Template.Spec.Containers[0].Args = append(deployment.Spec.Template.Spec.Containers[0].Args, "--publish-service="+constants.IngressControllerNamespace+"/"+constants.IngressControllerPrefix+namespace)
|
||||
deployment.Spec.Template.Spec.Containers[0].Args = append(deployment.Spec.Template.Spec.Containers[0].Args, "--publish-service="+ingressControllerNamespace+"/"+ingressControllerPrefix+namespace)
|
||||
} else {
|
||||
deployment.Spec.Template.Spec.Containers[0].Args = append(deployment.Spec.Template.Spec.Containers[0].Args, "--report-node-internal-ip-address")
|
||||
}
|
||||
|
||||
if createDeployment {
|
||||
deployment, err = k8sClient.AppsV1().Deployments(constants.IngressControllerNamespace).Create(deployment)
|
||||
deployment, err = c.client.AppsV1().Deployments(ingressControllerNamespace).Create(deployment)
|
||||
} else {
|
||||
deployment, err = k8sClient.AppsV1().Deployments(constants.IngressControllerNamespace).Update(deployment)
|
||||
deployment, err = c.client.AppsV1().Deployments(ingressControllerNamespace).Update(deployment)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
@@ -401,13 +377,11 @@ func createOrUpdateRouterWorkload(namespace string, publishService bool, service
|
||||
return nil
|
||||
}
|
||||
|
||||
func deleteRouterWorkload(namespace string) error {
|
||||
k8sClient := client.ClientSets().K8s().Kubernetes()
|
||||
|
||||
func (c *routerOperator) deleteRouterWorkload(namespace string) error {
|
||||
deleteOptions := metav1.DeleteOptions{}
|
||||
// delete controller deployment
|
||||
deploymentName := constants.IngressControllerPrefix + namespace
|
||||
err := k8sClient.AppsV1().Deployments(constants.IngressControllerNamespace).Delete(deploymentName, &deleteOptions)
|
||||
deploymentName := ingressControllerPrefix + namespace
|
||||
err := c.client.AppsV1().Deployments(ingressControllerNamespace).Delete(deploymentName, &deleteOptions)
|
||||
if err != nil {
|
||||
klog.Error(err)
|
||||
}
|
||||
@@ -420,15 +394,14 @@ func deleteRouterWorkload(namespace string) error {
|
||||
"tier": "backend",
|
||||
"project": namespace,
|
||||
})
|
||||
replicaSetLister := informers.SharedInformerFactory().Apps().V1().ReplicaSets().Lister()
|
||||
replicaSets, err := replicaSetLister.ReplicaSets(constants.IngressControllerNamespace).List(selector)
|
||||
|
||||
replicaSetLister := c.informers.Apps().V1().ReplicaSets().Lister()
|
||||
replicaSets, err := replicaSetLister.ReplicaSets(ingressControllerNamespace).List(selector)
|
||||
if err != nil {
|
||||
klog.Error(err)
|
||||
}
|
||||
|
||||
for i := range replicaSets {
|
||||
err = k8sClient.AppsV1().ReplicaSets(constants.IngressControllerNamespace).Delete(replicaSets[i].Name, &deleteOptions)
|
||||
err = c.client.AppsV1().ReplicaSets(ingressControllerNamespace).Delete(replicaSets[i].Name, &deleteOptions)
|
||||
if err != nil {
|
||||
klog.Error(err)
|
||||
}
|
||||
@@ -438,10 +411,10 @@ func deleteRouterWorkload(namespace string) error {
|
||||
}
|
||||
|
||||
// Update Ingress Controller Service, change type from NodePort to loadbalancer or vice versa.
|
||||
func UpdateRouter(namespace string, routerType corev1.ServiceType, annotations map[string]string) (*corev1.Service, error) {
|
||||
func (c *routerOperator) UpdateRouter(namespace string, routerType corev1.ServiceType, annotations map[string]string) (*corev1.Service, error) {
|
||||
var router *corev1.Service
|
||||
|
||||
router, err := getRouterService(namespace)
|
||||
router, err := c.getRouterService(namespace)
|
||||
|
||||
if err != nil {
|
||||
klog.Error(err)
|
||||
@@ -450,14 +423,13 @@ func UpdateRouter(namespace string, routerType corev1.ServiceType, annotations m
|
||||
|
||||
enableServicemesh := annotations[servicemeshEnabled] == "true"
|
||||
|
||||
err = createOrUpdateRouterWorkload(namespace, routerType == corev1.ServiceTypeLoadBalancer, enableServicemesh)
|
||||
err = c.createOrUpdateRouterWorkload(namespace, routerType == corev1.ServiceTypeLoadBalancer, enableServicemesh)
|
||||
if err != nil {
|
||||
klog.Error(err)
|
||||
return router, err
|
||||
}
|
||||
|
||||
newRouter, err := updateRouterService(namespace, routerType, annotations)
|
||||
|
||||
newRouter, err := c.updateRouterService(namespace, routerType, annotations)
|
||||
if err != nil {
|
||||
klog.Error(err)
|
||||
return newRouter, err
|
||||
|
||||
Reference in New Issue
Block a user