retrive pods and logs by gateway

Signed-off-by: Roland.Ma <rolandma@kubesphere.io>
This commit is contained in:
Roland.Ma
2021-09-15 07:55:58 +00:00
parent 81c19701ef
commit e3a14ca299
5 changed files with 247 additions and 13 deletions

View File

@@ -19,12 +19,14 @@ package gateway
import (
"context"
"fmt"
"io"
"strings"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/klog"
jsonpatch "github.com/evanphx/json-patch"
@@ -38,7 +40,9 @@ import (
"kubesphere.io/kubesphere/pkg/api"
"kubesphere.io/kubesphere/pkg/apiserver/query"
"kubesphere.io/kubesphere/pkg/informers"
"kubesphere.io/kubesphere/pkg/models/resources/v1alpha3"
"kubesphere.io/kubesphere/pkg/models/resources/v1alpha3/pod"
"kubesphere.io/kubesphere/pkg/simple/client/gateway"
)
@@ -58,19 +62,25 @@ type GatewayOperator interface {
UpdateGateway(namespace string, obj *v1alpha1.Gateway) (*v1alpha1.Gateway, error)
UpgradeGateway(namespace string) (*v1alpha1.Gateway, error)
ListGateways(query *query.Query) (*api.ListResult, error)
GetPods(namesapce string, query *query.Query) (*api.ListResult, error)
GetPodLogs(ctx context.Context, namespace string, podName string, logOptions *corev1.PodLogOptions, responseWriter io.Writer) error
}
type gatewayOperator struct {
client client.Client
cache cache.Cache
options *gateway.Options
k8sclient kubernetes.Interface
factory informers.InformerFactory
client client.Client
cache cache.Cache
options *gateway.Options
}
func NewGatewayOperator(client client.Client, cache cache.Cache, options *gateway.Options) GatewayOperator {
func NewGatewayOperator(client client.Client, cache cache.Cache, options *gateway.Options, factory informers.InformerFactory, k8sclient kubernetes.Interface) GatewayOperator {
return &gatewayOperator{
client: client,
cache: cache,
options: options,
client: client,
cache: cache,
options: options,
k8sclient: k8sclient,
factory: factory,
}
}
@@ -449,3 +459,50 @@ func (c *gatewayOperator) filter(object runtime.Object, filter query.Filter) boo
return v1alpha3.DefaultObjectMetaFilter(gateway.ObjectMeta, filter)
}
}
func (c *gatewayOperator) GetPods(namesapce string, query *query.Query) (*api.ListResult, error) {
podGetter := pod.New(c.factory.KubernetesSharedInformerFactory())
//TODO: move the selector string to options
selector, err := labels.Parse(fmt.Sprintf("app.kubernetes.io/name=ingress-nginx,app.kubernetes.io/instance=kubesphere-router-%s-ingress", namesapce))
if err != nil {
return nil, fmt.Errorf("invild selector config")
}
query.LabelSelector = selector.String()
return podGetter.List(c.getWorkingNamespace(namesapce), query)
}
func (c *gatewayOperator) GetPodLogs(ctx context.Context, namespace string, podName string, logOptions *corev1.PodLogOptions, responseWriter io.Writer) error {
workingNamespace := c.getWorkingNamespace(namespace)
pods, err := c.GetPods(namespace, query.New())
if err != nil {
return err
}
if !c.hasPod(pods.Items, types.NamespacedName{Namespace: workingNamespace, Name: podName}) {
return fmt.Errorf("pod does not exist")
}
podLogRequest := c.k8sclient.CoreV1().
Pods(workingNamespace).
GetLogs(podName, logOptions)
reader, err := podLogRequest.Stream(context.TODO())
if err != nil {
return err
}
_, err = io.Copy(responseWriter, reader)
if err != nil {
return err
}
return nil
}
func (c *gatewayOperator) hasPod(slice []interface{}, key types.NamespacedName) bool {
for _, s := range slice {
pod, ok := s.(*corev1.Pod)
if ok && client.ObjectKeyFromObject(pod) == key {
return true
}
}
return false
}