Search gateway logs with ES
Signed-off-by: Roland.Ma <rolandma@kubesphere.io>
This commit is contained in:
@@ -256,7 +256,7 @@ func (s *APIServer) installKubeSphereAPIs() {
|
||||
urlruntime.Must(notificationkapisv2beta1.AddToContainer(s.container, s.InformerFactory, s.KubernetesClient.Kubernetes(),
|
||||
s.KubernetesClient.KubeSphere()))
|
||||
urlruntime.Must(notificationkapisv2beta2.AddToContainer(s.container, s.Config.NotificationOptions))
|
||||
urlruntime.Must(gatewayv1alpha1.AddToContainer(s.container, s.Config.GatewayOptions, s.RuntimeCache, s.RuntimeClient, s.InformerFactory, s.KubernetesClient.Kubernetes()))
|
||||
urlruntime.Must(gatewayv1alpha1.AddToContainer(s.container, s.Config.GatewayOptions, s.RuntimeCache, s.RuntimeClient, s.InformerFactory, s.KubernetesClient.Kubernetes(), s.LoggingClient))
|
||||
}
|
||||
|
||||
func (s *APIServer) Run(ctx context.Context) (err error) {
|
||||
|
||||
@@ -19,6 +19,7 @@ package v1alpha1
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/emicklei/go-restful"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
@@ -31,30 +32,39 @@ import (
|
||||
"kubesphere.io/api/gateway/v1alpha1"
|
||||
|
||||
"kubesphere.io/kubesphere/pkg/api"
|
||||
loggingv1alpha2 "kubesphere.io/kubesphere/pkg/api/logging/v1alpha2"
|
||||
"kubesphere.io/kubesphere/pkg/apiserver/query"
|
||||
"kubesphere.io/kubesphere/pkg/informers"
|
||||
operator "kubesphere.io/kubesphere/pkg/models/gateway"
|
||||
|
||||
"kubesphere.io/kubesphere/pkg/models/logging"
|
||||
servererr "kubesphere.io/kubesphere/pkg/server/errors"
|
||||
"kubesphere.io/kubesphere/pkg/simple/client/gateway"
|
||||
loggingclient "kubesphere.io/kubesphere/pkg/simple/client/logging"
|
||||
conversionsv1 "kubesphere.io/kubesphere/pkg/utils/conversions/core/v1"
|
||||
"kubesphere.io/kubesphere/pkg/utils/stringutils"
|
||||
)
|
||||
|
||||
type handler struct {
|
||||
options *gateway.Options
|
||||
gw operator.GatewayOperator
|
||||
factory informers.InformerFactory
|
||||
lo logging.LoggingOperator
|
||||
}
|
||||
|
||||
//newHandler create an instance of the handler
|
||||
func newHandler(options *gateway.Options, cache cache.Cache, client client.Client, factory informers.InformerFactory, k8sClient kubernetes.Interface) *handler {
|
||||
func newHandler(options *gateway.Options, cache cache.Cache, client client.Client, factory informers.InformerFactory, k8sClient kubernetes.Interface, loggingClient loggingclient.Client) *handler {
|
||||
conversionsv1.RegisterConversions(scheme.Scheme)
|
||||
// Do not register Gateway scheme globally. Which will cause conflict in ks-controller-manager.
|
||||
v1alpha1.AddToScheme(client.Scheme())
|
||||
var lo logging.LoggingOperator
|
||||
if loggingClient != nil {
|
||||
lo = logging.NewLoggingOperator(loggingClient)
|
||||
}
|
||||
return &handler{
|
||||
options: options,
|
||||
factory: factory,
|
||||
gw: operator.NewGatewayOperator(client, cache, options, factory, k8sClient),
|
||||
lo: lo,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -173,3 +183,59 @@ func (h *handler) PodLog(request *restful.Request, response *restful.Response) {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (h *handler) PodLogSearch(request *restful.Request, response *restful.Response) {
|
||||
if h.lo == nil {
|
||||
api.HandleError(response, request, fmt.Errorf("logging isn't enabled"))
|
||||
return
|
||||
}
|
||||
|
||||
ns := request.PathParameter("namespace")
|
||||
logQuery, err := loggingv1alpha2.ParseQueryParameter(request)
|
||||
if err != nil {
|
||||
api.HandleError(response, request, err)
|
||||
return
|
||||
}
|
||||
// ES log will be filted by pods and namespace by default.
|
||||
pods, err := h.gw.GetPods(ns, &query.Query{})
|
||||
if err != nil {
|
||||
api.HandleError(response, request, err)
|
||||
return
|
||||
}
|
||||
|
||||
var podfilter []string
|
||||
namespaceCreateTimeMap := make(map[string]*time.Time)
|
||||
var ar loggingv1alpha2.APIResponse
|
||||
|
||||
for _, p := range pods.Items {
|
||||
pod, ok := p.(*corev1.Pod)
|
||||
if ok {
|
||||
podfilter = append(podfilter, pod.Name)
|
||||
namespaceCreateTimeMap[pod.Namespace] = nil
|
||||
}
|
||||
}
|
||||
|
||||
sf := loggingclient.SearchFilter{
|
||||
NamespaceFilter: namespaceCreateTimeMap,
|
||||
PodFilter: podfilter,
|
||||
PodSearch: stringutils.Split(logQuery.PodSearch, ","),
|
||||
ContainerSearch: stringutils.Split(logQuery.ContainerSearch, ","),
|
||||
ContainerFilter: stringutils.Split(logQuery.ContainerFilter, ","),
|
||||
LogSearch: stringutils.Split(logQuery.LogSearch, ","),
|
||||
Starttime: logQuery.StartTime,
|
||||
Endtime: logQuery.EndTime,
|
||||
}
|
||||
|
||||
noHit := len(namespaceCreateTimeMap) == 0 || len(podfilter) == 0
|
||||
if noHit {
|
||||
ar.Logs = &loggingclient.Logs{}
|
||||
} else {
|
||||
ar, err = h.lo.SearchLogs(sf, logQuery.From, logQuery.Size, logQuery.Sort)
|
||||
if err != nil {
|
||||
api.HandleError(response, request, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
response.WriteEntity(ar)
|
||||
}
|
||||
|
||||
@@ -28,19 +28,21 @@ import (
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
||||
"kubesphere.io/kubesphere/pkg/api"
|
||||
loggingv1alpha2 "kubesphere.io/kubesphere/pkg/api/logging/v1alpha2"
|
||||
"kubesphere.io/kubesphere/pkg/apiserver/runtime"
|
||||
"kubesphere.io/kubesphere/pkg/constants"
|
||||
"kubesphere.io/kubesphere/pkg/informers"
|
||||
"kubesphere.io/kubesphere/pkg/server/errors"
|
||||
"kubesphere.io/kubesphere/pkg/simple/client/gateway"
|
||||
loggingclient "kubesphere.io/kubesphere/pkg/simple/client/logging"
|
||||
)
|
||||
|
||||
var GroupVersion = schema.GroupVersion{Group: "gateway.kubesphere.io", Version: "v1alpha1"}
|
||||
|
||||
func AddToContainer(container *restful.Container, options *gateway.Options, cache cache.Cache, client client.Client, factory informers.InformerFactory, k8sClient kubernetes.Interface) error {
|
||||
func AddToContainer(container *restful.Container, options *gateway.Options, cache cache.Cache, client client.Client, factory informers.InformerFactory, k8sClient kubernetes.Interface, loggingClient loggingclient.Client) error {
|
||||
ws := runtime.NewWebService(GroupVersion)
|
||||
|
||||
handler := newHandler(options, cache, client, factory, k8sClient)
|
||||
handler := newHandler(options, cache, client, factory, k8sClient, loggingClient)
|
||||
|
||||
// register gateway apis
|
||||
ws.Route(ws.POST("/namespaces/{namespace}/gateways").
|
||||
@@ -102,6 +104,14 @@ func AddToContainer(container *restful.Container, options *gateway.Options, cach
|
||||
Returns(http.StatusOK, api.StatusOK, v1alpha1.Gateway{}).
|
||||
Metadata(restfulspec.KeyOpenAPITags, []string{constants.GatewayTag}))
|
||||
|
||||
ws.Route(ws.GET("/namespaces/{namespace}/gateways/{gateway}/logs").
|
||||
To(handler.PodLogSearch).
|
||||
Doc("Retrieve log of the gateway's pod from ES").
|
||||
Param(ws.PathParameter("namespace", "the watching namespace of the gateway")).
|
||||
Param(ws.PathParameter("gateway", "the name of the gateway")).
|
||||
Returns(http.StatusOK, api.StatusOK, loggingv1alpha2.APIResponse{}).
|
||||
Metadata(restfulspec.KeyOpenAPITags, []string{constants.GatewayTag}))
|
||||
|
||||
container.Add(ws)
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user