append service status of the gateway

Signed-off-by: Roland.Ma <rolandma@kubesphere.io>
This commit is contained in:
Roland.Ma
2021-09-15 07:41:15 +00:00
parent e44f4ec81d
commit 81c19701ef
2 changed files with 277 additions and 4 deletions

View File

@@ -25,8 +25,11 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog"
jsonpatch "github.com/evanphx/json-patch"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"kubesphere.io/api/gateway/v1alpha1"
@@ -40,6 +43,7 @@ import (
)
const (
MasterLabel = "node-role.kubernetes.io/master"
SidecarInject = "sidecar.istio.io/inject"
gatewayPrefix = "kubesphere-router-"
workingNamespace = "kubesphere-controls-system"
@@ -163,6 +167,69 @@ func (c *gatewayOperator) convert(namespace string, svc *corev1.Service, deploy
return &legacy
}
func (c *gatewayOperator) getMasterNodeIp() []string {
internalIps := []string{}
masters := &corev1.NodeList{}
err := c.cache.List(context.TODO(), masters, &client.ListOptions{LabelSelector: labels.SelectorFromSet(
labels.Set{
MasterLabel: "",
})})
if err != nil {
klog.Info(err)
return internalIps
}
for _, node := range masters.Items {
for _, address := range node.Status.Addresses {
if address.Type == corev1.NodeInternalIP {
internalIps = append(internalIps, address.Address)
}
}
}
return internalIps
}
func (c *gatewayOperator) updateStatus(gateway *v1alpha1.Gateway, svc *corev1.Service) (*v1alpha1.Gateway, error) {
// append selected node ip as loadbalancer ingress ip
if svc.Spec.Type != corev1.ServiceTypeLoadBalancer && len(svc.Status.LoadBalancer.Ingress) == 0 {
rips := c.getMasterNodeIp()
for _, rip := range rips {
gIngress := corev1.LoadBalancerIngress{
IP: rip,
}
svc.Status.LoadBalancer.Ingress = append(svc.Status.LoadBalancer.Ingress, gIngress)
}
}
status := unstructured.Unstructured{
Object: map[string]interface{}{
"loadBalancer": svc.Status.LoadBalancer,
"service": svc.Spec.Ports,
},
}
target, err := status.MarshalJSON()
if err != nil {
return gateway, err
}
if gateway.Status.Raw != nil {
//merge with origin status
patch, err := jsonpatch.CreateMergePatch([]byte(`{}`), target)
if err != nil {
return gateway, err
}
modified, err := jsonpatch.MergePatch(gateway.Status.Raw, patch)
if err != nil {
return gateway, err
}
gateway.Status.Raw = modified
return gateway, err
}
gateway.Status.Raw = target
return gateway, nil
}
// GetGateways returns all Gateways from the project. There are at most 2 gatways exists in a project,
// a Glabal Gateway and a Project Gateway or a Legacy Project Gateway.
func (c *gatewayOperator) GetGateways(namespace string) ([]*v1alpha1.Gateway, error) {
@@ -188,6 +255,22 @@ func (c *gatewayOperator) GetGateways(namespace string) ([]*v1alpha1.Gateway, er
return nil, err
}
gateways = append(gateways, obj)
for _, g := range gateways {
s := &corev1.Service{}
// We supports the Service name always as same as gateway name.
// TODO: We need a mapping relation between the service and the gateway. Label Selector should be a good option.
err := c.client.Get(context.TODO(), client.ObjectKeyFromObject(g), s)
if err != nil {
klog.Info(err)
continue
}
_, err = c.updateStatus(g, s)
if err != nil {
klog.Info(err)
}
}
return gateways, err
}
@@ -312,12 +395,28 @@ func (c *gatewayOperator) ListGateways(query *query.Query) (*api.ListResult, err
func (c *gatewayOperator) transform(obj runtime.Object) runtime.Object {
if g, ok := obj.(*v1alpha1.Gateway); ok {
svc := &corev1.Service{}
// We supports the Service name always same as gateway name.
err := c.client.Get(context.TODO(), client.ObjectKeyFromObject(g), svc)
if err != nil {
klog.Info(err)
return g
}
g, err := c.updateStatus(g, svc)
if err != nil {
klog.Info(err)
}
return g
}
if s, ok := obj.(*corev1.Service); ok {
if svc, ok := obj.(*corev1.Service); ok {
d := &appsv1.Deployment{}
c.client.Get(context.TODO(), client.ObjectKeyFromObject(s), d)
return c.convert(s.Labels["project"], s, d)
c.client.Get(context.TODO(), client.ObjectKeyFromObject(svc), d)
g, err := c.updateStatus(c.convert(svc.Labels["project"], svc, d), svc)
if err != nil {
klog.Info(err)
}
return g
}
return nil
}

View File

@@ -42,6 +42,7 @@ func Test_gatewayOperator_GetGateways(t *testing.T) {
type fields struct {
client client.Client
cache cache.Cache
options *gateway.Options
}
type args struct {
@@ -50,6 +51,8 @@ func Test_gatewayOperator_GetGateways(t *testing.T) {
var Scheme = runtime.NewScheme()
v1alpha1.AddToScheme(Scheme)
corev1.AddToScheme(Scheme)
client := fake.NewFakeClientWithScheme(Scheme)
client.Create(context.TODO(), &v1alpha1.Gateway{
@@ -69,7 +72,6 @@ func Test_gatewayOperator_GetGateways(t *testing.T) {
client2 := fake.NewFakeClientWithScheme(Scheme)
create_GlobalGateway(client2)
corev1.AddToScheme(Scheme)
client3 := fake.NewFakeClientWithScheme(Scheme)
create_LegacyGateway(client3, "project6")
@@ -84,6 +86,7 @@ func Test_gatewayOperator_GetGateways(t *testing.T) {
name: "return empty gateway list from watching namespace",
fields: fields{
client: client,
cache: &fakeClient{Client: client},
options: &gateway.Options{
Namespace: "",
},
@@ -96,6 +99,7 @@ func Test_gatewayOperator_GetGateways(t *testing.T) {
name: "return empty gateway list from working namespace",
fields: fields{
client: client,
cache: &fakeClient{Client: client},
options: &gateway.Options{
Namespace: "kubesphere-controls-system",
},
@@ -108,6 +112,7 @@ func Test_gatewayOperator_GetGateways(t *testing.T) {
name: "get gateway from watching namespace",
fields: fields{
client: client,
cache: &fakeClient{Client: client},
options: &gateway.Options{
Namespace: "",
},
@@ -121,6 +126,7 @@ func Test_gatewayOperator_GetGateways(t *testing.T) {
name: "get gateway from working namespace",
fields: fields{
client: client,
cache: &fakeClient{Client: client},
options: &gateway.Options{
Namespace: "kubesphere-controls-system",
},
@@ -134,6 +140,7 @@ func Test_gatewayOperator_GetGateways(t *testing.T) {
name: "get global gateway",
fields: fields{
client: client2,
cache: &fakeClient{Client: client2},
options: &gateway.Options{
Namespace: "",
},
@@ -147,6 +154,7 @@ func Test_gatewayOperator_GetGateways(t *testing.T) {
name: "get Legacy gateway",
fields: fields{
client: client3,
cache: &fakeClient{Client: client3},
options: &gateway.Options{
Namespace: "kubesphere-controls-system",
},
@@ -182,6 +190,7 @@ func Test_gatewayOperator_GetGateways(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
c := &gatewayOperator{
client: tt.fields.client,
cache: tt.fields.cache,
options: tt.fields.options,
}
got, err := c.GetGateways(tt.args.namespace)
@@ -239,6 +248,11 @@ func create_LegacyGateway(c client.Client, namespace string) {
},
},
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{
Name: "http", Protocol: corev1.ProtocolTCP, Port: 80,
},
},
Type: corev1.ServiceTypeNodePort,
},
}
@@ -288,6 +302,7 @@ func Test_gatewayOperator_CreateGateway(t *testing.T) {
type fields struct {
client client.Client
options *gateway.Options
cache cache.Cache
}
type args struct {
namespace string
@@ -296,6 +311,9 @@ func Test_gatewayOperator_CreateGateway(t *testing.T) {
var Scheme = runtime.NewScheme()
v1alpha1.AddToScheme(Scheme)
corev1.AddToScheme(Scheme)
appsv1.AddToScheme(Scheme)
client := fake.NewFakeClientWithScheme(Scheme)
tests := []struct {
@@ -309,6 +327,7 @@ func Test_gatewayOperator_CreateGateway(t *testing.T) {
name: "creates gateway in watching namespace",
fields: fields{
client: client,
cache: &fakeClient{Client: client},
options: &gateway.Options{
Namespace: "",
},
@@ -339,6 +358,7 @@ func Test_gatewayOperator_CreateGateway(t *testing.T) {
name: "creates gateway in working namespace",
fields: fields{
client: client,
cache: &fakeClient{Client: client},
options: &gateway.Options{
Namespace: "kubesphere-controls-system",
},
@@ -370,6 +390,7 @@ func Test_gatewayOperator_CreateGateway(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
c := &gatewayOperator{
client: tt.fields.client,
cache: tt.fields.cache,
options: tt.fields.options,
}
got, err := c.CreateGateway(tt.args.namespace, tt.args.obj)
@@ -686,6 +707,9 @@ func Test_gatewayOperator_ListGateways(t *testing.T) {
},
},
},
Status: runtime.RawExtension{
Raw: []byte("{\"loadBalancer\":{},\"service\":[{\"name\":\"http\",\"protocol\":\"TCP\",\"port\":80,\"targetPort\":0}]}\n"),
},
},
{
ObjectMeta: v1.ObjectMeta{
@@ -790,3 +814,153 @@ func (f *fakeClient) WaitForCacheSync(ctx context.Context) bool {
func (f *fakeClient) IndexField(ctx context.Context, obj client.Object, field string, extractValue client.IndexerFunc) error {
return nil
}
func Test_gatewayOperator_status(t *testing.T) {
type fields struct {
client client.Client
cache cache.Cache
options *gateway.Options
}
var Scheme = runtime.NewScheme()
v1alpha1.AddToScheme(Scheme)
corev1.AddToScheme(Scheme)
appsv1.AddToScheme(Scheme)
client := fake.NewFakeClientWithScheme(Scheme)
client2 := fake.NewFakeClientWithScheme(Scheme)
fake := &corev1.Node{
ObjectMeta: v1.ObjectMeta{
Name: "fake-node",
Labels: map[string]string{
MasterLabel: "",
},
},
Status: corev1.NodeStatus{
Addresses: []corev1.NodeAddress{
{
Type: corev1.NodeInternalIP,
Address: "192.168.1.1",
},
},
},
}
client2.Create(context.TODO(), fake)
type args struct {
gateway *v1alpha1.Gateway
svc *corev1.Service
}
tests := []struct {
name string
fields fields
args args
want *v1alpha1.Gateway
wantErr bool
}{
{
name: "default",
fields: fields{
client: client,
cache: &fakeClient{Client: client},
options: &gateway.Options{
Namespace: "kubesphere-controls-system",
},
},
args: args{
gateway: &v1alpha1.Gateway{},
svc: &corev1.Service{
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{
Name: "http", Protocol: corev1.ProtocolTCP, Port: 80,
},
},
},
},
},
want: &v1alpha1.Gateway{
Status: runtime.RawExtension{
Raw: []byte("{\"loadBalancer\":{},\"service\":[{\"name\":\"http\",\"protocol\":\"TCP\",\"port\":80,\"targetPort\":0}]}\n"),
},
},
},
{
name: "default",
fields: fields{
client: client,
cache: &fakeClient{Client: client},
options: &gateway.Options{
Namespace: "kubesphere-controls-system",
},
},
args: args{
gateway: &v1alpha1.Gateway{
Status: runtime.RawExtension{
Raw: []byte("{\"fake\":{}}"),
},
},
svc: &corev1.Service{
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{
Name: "http", Protocol: corev1.ProtocolTCP, Port: 80,
},
},
},
},
},
want: &v1alpha1.Gateway{
Status: runtime.RawExtension{
Raw: []byte("{\"fake\":{},\"loadBalancer\":{},\"service\":[{\"name\":\"http\",\"port\":80,\"protocol\":\"TCP\",\"targetPort\":0}]}"),
},
},
},
{
name: "Master Node IP",
fields: fields{
client: client2,
cache: &fakeClient{Client: client2},
options: &gateway.Options{
Namespace: "kubesphere-controls-system",
},
},
args: args{
gateway: &v1alpha1.Gateway{},
svc: &corev1.Service{
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{
Name: "http", Protocol: corev1.ProtocolTCP, Port: 80,
},
},
},
},
},
want: &v1alpha1.Gateway{
Status: runtime.RawExtension{
Raw: []byte("{\"loadBalancer\":{\"ingress\":[{\"ip\":\"192.168.1.1\"}]},\"service\":[{\"name\":\"http\",\"protocol\":\"TCP\",\"port\":80,\"targetPort\":0}]}\n"),
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := &gatewayOperator{
client: tt.fields.client,
cache: tt.fields.cache,
options: tt.fields.options,
}
got, err := c.updateStatus(tt.args.gateway, tt.args.svc)
if (err != nil) != tt.wantErr {
t.Errorf("gatewayOperator.status() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("gatewayOperator.status() has wrong object\nDiff:\n %s", diff.ObjectGoPrintSideBySide(tt.want, got))
}
})
}
}