From 28f6784aff596babc31215ad0ed3fa84d68a1764 Mon Sep 17 00:00:00 2001 From: "Roland.Ma" Date: Sun, 26 Sep 2021 10:33:34 +0000 Subject: [PATCH] Search gateway logs with ES Signed-off-by: Roland.Ma --- pkg/apiserver/apiserver.go | 2 +- pkg/kapis/gateway/v1alpha1/handler.go | 70 +++++++++++++++++++++++++- pkg/kapis/gateway/v1alpha1/register.go | 14 +++++- 3 files changed, 81 insertions(+), 5 deletions(-) diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index e5e591e37..50599e2d8 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -256,7 +256,7 @@ func (s *APIServer) installKubeSphereAPIs() { urlruntime.Must(notificationkapisv2beta1.AddToContainer(s.container, s.InformerFactory, s.KubernetesClient.Kubernetes(), s.KubernetesClient.KubeSphere())) urlruntime.Must(notificationkapisv2beta2.AddToContainer(s.container, s.Config.NotificationOptions)) - urlruntime.Must(gatewayv1alpha1.AddToContainer(s.container, s.Config.GatewayOptions, s.RuntimeCache, s.RuntimeClient, s.InformerFactory, s.KubernetesClient.Kubernetes())) + urlruntime.Must(gatewayv1alpha1.AddToContainer(s.container, s.Config.GatewayOptions, s.RuntimeCache, s.RuntimeClient, s.InformerFactory, s.KubernetesClient.Kubernetes(), s.LoggingClient)) } func (s *APIServer) Run(ctx context.Context) (err error) { diff --git a/pkg/kapis/gateway/v1alpha1/handler.go b/pkg/kapis/gateway/v1alpha1/handler.go index 6e2c0974c..323b0228f 100644 --- a/pkg/kapis/gateway/v1alpha1/handler.go +++ b/pkg/kapis/gateway/v1alpha1/handler.go @@ -19,6 +19,7 @@ package v1alpha1 import ( "context" "fmt" + "time" "github.com/emicklei/go-restful" corev1 "k8s.io/api/core/v1" @@ -31,30 +32,39 @@ import ( "kubesphere.io/api/gateway/v1alpha1" "kubesphere.io/kubesphere/pkg/api" + loggingv1alpha2 "kubesphere.io/kubesphere/pkg/api/logging/v1alpha2" "kubesphere.io/kubesphere/pkg/apiserver/query" "kubesphere.io/kubesphere/pkg/informers" operator "kubesphere.io/kubesphere/pkg/models/gateway" - + "kubesphere.io/kubesphere/pkg/models/logging" servererr "kubesphere.io/kubesphere/pkg/server/errors" "kubesphere.io/kubesphere/pkg/simple/client/gateway" + loggingclient "kubesphere.io/kubesphere/pkg/simple/client/logging" conversionsv1 "kubesphere.io/kubesphere/pkg/utils/conversions/core/v1" + "kubesphere.io/kubesphere/pkg/utils/stringutils" ) type handler struct { options *gateway.Options gw operator.GatewayOperator factory informers.InformerFactory + lo logging.LoggingOperator } //newHandler create an instance of the handler -func newHandler(options *gateway.Options, cache cache.Cache, client client.Client, factory informers.InformerFactory, k8sClient kubernetes.Interface) *handler { +func newHandler(options *gateway.Options, cache cache.Cache, client client.Client, factory informers.InformerFactory, k8sClient kubernetes.Interface, loggingClient loggingclient.Client) *handler { conversionsv1.RegisterConversions(scheme.Scheme) // Do not register Gateway scheme globally. Which will cause conflict in ks-controller-manager. v1alpha1.AddToScheme(client.Scheme()) + var lo logging.LoggingOperator + if loggingClient != nil { + lo = logging.NewLoggingOperator(loggingClient) + } return &handler{ options: options, factory: factory, gw: operator.NewGatewayOperator(client, cache, options, factory, k8sClient), + lo: lo, } } @@ -173,3 +183,59 @@ func (h *handler) PodLog(request *restful.Request, response *restful.Response) { return } } + +func (h *handler) PodLogSearch(request *restful.Request, response *restful.Response) { + if h.lo == nil { + api.HandleError(response, request, fmt.Errorf("logging isn't enabled")) + return + } + + ns := request.PathParameter("namespace") + logQuery, err := loggingv1alpha2.ParseQueryParameter(request) + if err != nil { + api.HandleError(response, request, err) + return + } + // ES log will be filted by pods and namespace by default. + pods, err := h.gw.GetPods(ns, &query.Query{}) + if err != nil { + api.HandleError(response, request, err) + return + } + + var podfilter []string + namespaceCreateTimeMap := make(map[string]*time.Time) + var ar loggingv1alpha2.APIResponse + + for _, p := range pods.Items { + pod, ok := p.(*corev1.Pod) + if ok { + podfilter = append(podfilter, pod.Name) + namespaceCreateTimeMap[pod.Namespace] = nil + } + } + + sf := loggingclient.SearchFilter{ + NamespaceFilter: namespaceCreateTimeMap, + PodFilter: podfilter, + PodSearch: stringutils.Split(logQuery.PodSearch, ","), + ContainerSearch: stringutils.Split(logQuery.ContainerSearch, ","), + ContainerFilter: stringutils.Split(logQuery.ContainerFilter, ","), + LogSearch: stringutils.Split(logQuery.LogSearch, ","), + Starttime: logQuery.StartTime, + Endtime: logQuery.EndTime, + } + + noHit := len(namespaceCreateTimeMap) == 0 || len(podfilter) == 0 + if noHit { + ar.Logs = &loggingclient.Logs{} + } else { + ar, err = h.lo.SearchLogs(sf, logQuery.From, logQuery.Size, logQuery.Sort) + if err != nil { + api.HandleError(response, request, err) + return + } + } + + response.WriteEntity(ar) +} diff --git a/pkg/kapis/gateway/v1alpha1/register.go b/pkg/kapis/gateway/v1alpha1/register.go index bbcf715af..46900d9aa 100644 --- a/pkg/kapis/gateway/v1alpha1/register.go +++ b/pkg/kapis/gateway/v1alpha1/register.go @@ -28,19 +28,21 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "kubesphere.io/kubesphere/pkg/api" + loggingv1alpha2 "kubesphere.io/kubesphere/pkg/api/logging/v1alpha2" "kubesphere.io/kubesphere/pkg/apiserver/runtime" "kubesphere.io/kubesphere/pkg/constants" "kubesphere.io/kubesphere/pkg/informers" "kubesphere.io/kubesphere/pkg/server/errors" "kubesphere.io/kubesphere/pkg/simple/client/gateway" + loggingclient "kubesphere.io/kubesphere/pkg/simple/client/logging" ) var GroupVersion = schema.GroupVersion{Group: "gateway.kubesphere.io", Version: "v1alpha1"} -func AddToContainer(container *restful.Container, options *gateway.Options, cache cache.Cache, client client.Client, factory informers.InformerFactory, k8sClient kubernetes.Interface) error { +func AddToContainer(container *restful.Container, options *gateway.Options, cache cache.Cache, client client.Client, factory informers.InformerFactory, k8sClient kubernetes.Interface, loggingClient loggingclient.Client) error { ws := runtime.NewWebService(GroupVersion) - handler := newHandler(options, cache, client, factory, k8sClient) + handler := newHandler(options, cache, client, factory, k8sClient, loggingClient) // register gateway apis ws.Route(ws.POST("/namespaces/{namespace}/gateways"). @@ -102,6 +104,14 @@ func AddToContainer(container *restful.Container, options *gateway.Options, cach Returns(http.StatusOK, api.StatusOK, v1alpha1.Gateway{}). Metadata(restfulspec.KeyOpenAPITags, []string{constants.GatewayTag})) + ws.Route(ws.GET("/namespaces/{namespace}/gateways/{gateway}/logs"). + To(handler.PodLogSearch). + Doc("Retrieve log of the gateway's pod from ES"). + Param(ws.PathParameter("namespace", "the watching namespace of the gateway")). + Param(ws.PathParameter("gateway", "the name of the gateway")). + Returns(http.StatusOK, api.StatusOK, loggingv1alpha2.APIResponse{}). + Metadata(restfulspec.KeyOpenAPITags, []string{constants.GatewayTag})) + container.Add(ws) return nil }