diff --git a/pkg/api/logging/v1alpha2/types.go b/pkg/api/logging/v1alpha2/types.go index 7a16a0f6b..59e5cdfd1 100644 --- a/pkg/api/logging/v1alpha2/types.go +++ b/pkg/api/logging/v1alpha2/types.go @@ -2,6 +2,14 @@ package v1alpha2 import ( "encoding/json" + "time" +) + +const ( + OperationQuery int = iota + OperationStatistics + OperationHistogram + OperationExport ) // elasticsearch client config @@ -28,13 +36,14 @@ type QueryParameters struct { ContainerQuery []string LogQuery []string - Operation string - Interval string - StartTime string - EndTime string - Sort string - From int64 - Size int64 + Operation int + Interval string + StartTime string + EndTime string + Sort string + From int64 + Size int64 + ScrollTimeout time.Duration } // elasticsearch request body @@ -148,8 +157,7 @@ type DateHistogram struct { // Fore more info, refer to https://www.elastic.co/guide/en/elasticsearch/reference/current/getting-started-search-API.html // Response body from the elasticsearch engine type Response struct { - Status int `json:"status"` - Workspace string `json:"workspace,omitempty"` + ScrollId string `json:"_scroll_id"` Shards Shards `json:"_shards"` Hits Hits `json:"hits"` Aggregations json.RawMessage `json:"aggregations"` @@ -195,7 +203,7 @@ type HighLight struct { } type LogRecord struct { - Time int64 `json:"time,omitempty" description:"log timestamp"` + Time string `json:"time,omitempty" description:"log timestamp"` Log string `json:"log,omitempty" description:"log message"` Namespace string `json:"namespace,omitempty" description:"namespace"` Pod string `json:"pod,omitempty" description:"pod name"` @@ -205,10 +213,9 @@ type LogRecord struct { } type ReadResult struct { - Total int64 `json:"total" description:"total number of matched results"` - From int64 `json:"from" description:"the offset from the result set"` - Size int64 `json:"size" description:"the amount of hits to be returned"` - Records []LogRecord `json:"records,omitempty" description:"actual array of results"` + ScrollID string `json:"_scroll_id,omitempty"` + Total int64 `json:"total" description:"total number of matched results"` + Records []LogRecord `json:"records,omitempty" description:"actual array of results"` } // StatisticsResponseAggregations, the struct for `aggregations` of type Reponse, holds return results from the aggregation StatisticsAggs @@ -245,16 +252,11 @@ type StatisticsResult struct { type HistogramResult struct { Total int64 `json:"total" description:"total number of logs"` - StartTime int64 `json:"start_time" description:"start time"` - EndTime int64 `json:"end_time" description:"end time"` - Interval string `json:"interval" description:"interval"` Histograms []HistogramRecord `json:"histograms" description:"actual array of histogram results"` } // Wrap elasticsearch response type QueryResult struct { - Status int `json:"status,omitempty" description:"query status"` - Error string `json:"error,omitempty" description:"debugging information"` Read *ReadResult `json:"query,omitempty" description:"query results"` Statistics *StatisticsResult `json:"statistics,omitempty" description:"statistics results"` Histogram *HistogramResult `json:"histogram,omitempty" description:"histogram results"` diff --git a/pkg/apis/logging/v1alpha2/register.go b/pkg/apis/logging/v1alpha2/register.go index f8e9b2d8d..8e89b4632 100644 --- a/pkg/apis/logging/v1alpha2/register.go +++ b/pkg/apis/logging/v1alpha2/register.go @@ -47,7 +47,7 @@ func addWebService(c *restful.Container) error { ws.Route(ws.GET("/cluster").To(logging.LoggingQueryCluster). Doc("Query logs against the cluster."). - Param(ws.QueryParameter("operation", "Query type. This can be one of three types: query (for querying logs), statistics (for retrieving statistical data), and histogram (for displaying log count by time interval). Defaults to query.").DefaultValue("query").DataType("string").Required(false)). + Param(ws.QueryParameter("operation", "Operation type. This can be one of four types: query (for querying logs), statistics (for retrieving statistical data), histogram (for displaying log count by time interval) and export (for exporting logs). Defaults to query.").DefaultValue("query").DataType("string").Required(false)). Param(ws.QueryParameter("workspaces", "A comma-separated list of workspaces. This field restricts the query to specified workspaces. For example, the following filter matches the workspace my-ws and demo-ws: `my-ws,demo-ws`").DataType("string").Required(false)). Param(ws.QueryParameter("workspace_query", "A comma-separated list of keywords. Differing from **workspaces**, this field performs fuzzy matching on workspaces. For example, the following value limits the query to workspaces whose name contains the word my(My,MY,...) *OR* demo(Demo,DemO,...): `my,demo`.").DataType("string").Required(false)). Param(ws.QueryParameter("namespaces", "A comma-separated list of namespaces. This field restricts the query to specified namespaces. For example, the following filter matches the namespace my-ns and demo-ns: `my-ns,demo-ns`").DataType("string").Required(false)). @@ -69,7 +69,7 @@ func addWebService(c *restful.Container) error { Writes(v1alpha2.QueryResult{}). Returns(http.StatusOK, RespOK, v1alpha2.QueryResult{})). Consumes(restful.MIME_JSON, restful.MIME_XML). - Produces(restful.MIME_JSON) + Produces(restful.MIME_JSON, restful.MIME_OCTET) ws.Route(ws.GET("/workspaces/{workspace}").To(logging.LoggingQueryWorkspace). Doc("Query logs against the specific workspace."). @@ -166,7 +166,7 @@ func addWebService(c *restful.Container) error { Param(ws.PathParameter("namespace", "The name of the namespace.").DataType("string").Required(true)). Param(ws.PathParameter("pod", "Pod name.").DataType("string").Required(true)). Param(ws.PathParameter("container", "Container name.").DataType("string").Required(true)). - Param(ws.QueryParameter("operation", "Query type. This can be one of three types: query (for querying logs), statistics (for retrieving statistical data), and histogram (for displaying log count by time interval). Defaults to query.").DefaultValue("query").DataType("string").Required(false)). + Param(ws.QueryParameter("operation", "Operation type. This can be one of four types: query (for querying logs), statistics (for retrieving statistical data), histogram (for displaying log count by time interval) and export (for exporting logs). Defaults to query.").DefaultValue("query").DataType("string").Required(false)). Param(ws.QueryParameter("log_query", "A comma-separated list of keywords. The query returns logs which contain at least one keyword. Case-insensitive matching. For example, if the field is set to `err,INFO`, the query returns any log containing err(ERR,Err,...) *OR* INFO(info,InFo,...).").DataType("string").Required(false)). Param(ws.QueryParameter("interval", "Time interval. It requires **operation** is set to histogram. The format is [0-9]+[smhdwMqy]. Defaults to 15m (i.e. 15 min).").DefaultValue("15m").DataType("string").Required(false)). Param(ws.QueryParameter("start_time", "Start time of query. Default to 0. The format is a string representing milliseconds since the epoch, eg. 1559664000000.").DataType("string").Required(false)). @@ -178,7 +178,7 @@ func addWebService(c *restful.Container) error { Writes(v1alpha2.QueryResult{}). Returns(http.StatusOK, RespOK, v1alpha2.QueryResult{})). Consumes(restful.MIME_JSON, restful.MIME_XML). - Produces(restful.MIME_JSON) + Produces(restful.MIME_JSON, restful.MIME_OCTET) ws.Route(ws.GET("/fluentbit/outputs").To(logging.LoggingQueryFluentbitOutputs). Doc("List all Fluent bit output plugins."). diff --git a/pkg/apis/tenant/v1alpha2/register.go b/pkg/apis/tenant/v1alpha2/register.go index 6454c8664..df75d8b8d 100644 --- a/pkg/apis/tenant/v1alpha2/register.go +++ b/pkg/apis/tenant/v1alpha2/register.go @@ -157,7 +157,7 @@ func addWebService(c *restful.Container) error { ws.Route(ws.GET("/logs"). To(tenant.LogQuery). Doc("Query cluster-level logs in a multi-tenants environment"). - Param(ws.QueryParameter("operation", "Query type. This can be one of three types: query (for querying logs), statistics (for retrieving statistical data), and histogram (for displaying log count by time interval). Defaults to query.").DefaultValue("query").DataType("string").Required(false)). + Param(ws.QueryParameter("operation", "Operation type. This can be one of four types: query (for querying logs), statistics (for retrieving statistical data), histogram (for displaying log count by time interval) and export (for exporting logs). Defaults to query.").DefaultValue("query").DataType("string").Required(false)). Param(ws.QueryParameter("workspaces", "A comma-separated list of workspaces. This field restricts the query to specified workspaces. For example, the following filter matches the workspace my-ws and demo-ws: `my-ws,demo-ws`").DataType("string").Required(false)). Param(ws.QueryParameter("workspace_query", "A comma-separated list of keywords. Differing from **workspaces**, this field performs fuzzy matching on workspaces. For example, the following value limits the query to workspaces whose name contains the word my(My,MY,...) *OR* demo(Demo,DemO,...): `my,demo`.").DataType("string").Required(false)). Param(ws.QueryParameter("namespaces", "A comma-separated list of namespaces. This field restricts the query to specified namespaces. For example, the following filter matches the namespace my-ns and demo-ns: `my-ns,demo-ns`").DataType("string").Required(false)). @@ -179,7 +179,7 @@ func addWebService(c *restful.Container) error { Writes(v1alpha2.Response{}). Returns(http.StatusOK, RespOK, v1alpha2.Response{})). Consumes(restful.MIME_JSON, restful.MIME_XML). - Produces(restful.MIME_JSON) + Produces(restful.MIME_JSON, restful.MIME_OCTET) c.Add(ws) return nil diff --git a/pkg/apiserver/logging/logging.go b/pkg/apiserver/logging/logging.go index 50348be7d..f824adbea 100644 --- a/pkg/apiserver/logging/logging.go +++ b/pkg/apiserver/logging/logging.go @@ -19,7 +19,10 @@ package logging import ( + "bytes" + "fmt" "github.com/emicklei/go-restful" + "io" "k8s.io/klog" "kubesphere.io/kubesphere/pkg/api/logging/v1alpha2" "kubesphere.io/kubesphere/pkg/models/log" @@ -30,94 +33,45 @@ import ( "net/http" "strconv" "strings" + "time" ) func LoggingQueryCluster(request *restful.Request, response *restful.Response) { - res, err := logQuery(log.QueryLevelCluster, request) - if err != nil { - response.WriteHeaderAndEntity(http.StatusServiceUnavailable, err) - return + param := parseRequest(log.QueryLevelCluster, request) + if param.Operation == v1alpha2.OperationExport { + logExport(param, request, response) + } else { + logQuery(param, response) } - - if res.Status != http.StatusOK { - response.WriteHeaderAndEntity(res.Status, errors.New(res.Error)) - return - } - - response.WriteAsJson(res) } func LoggingQueryWorkspace(request *restful.Request, response *restful.Response) { - res, err := logQuery(log.QueryLevelWorkspace, request) - if err != nil { - response.WriteHeaderAndEntity(http.StatusServiceUnavailable, err) - return - } - - if res.Status != http.StatusOK { - response.WriteHeaderAndEntity(res.Status, errors.New(res.Error)) - return - } - - response.WriteAsJson(res) + param := parseRequest(log.QueryLevelWorkspace, request) + logQuery(param, response) } func LoggingQueryNamespace(request *restful.Request, response *restful.Response) { - res, err := logQuery(log.QueryLevelNamespace, request) - if err != nil { - response.WriteHeaderAndEntity(http.StatusServiceUnavailable, err) - return - } - - if res.Status != http.StatusOK { - response.WriteHeaderAndEntity(res.Status, errors.New(res.Error)) - return - } - - response.WriteAsJson(res) + param := parseRequest(log.QueryLevelNamespace, request) + logQuery(param, response) } func LoggingQueryWorkload(request *restful.Request, response *restful.Response) { - res, err := logQuery(log.QueryLevelWorkload, request) - if err != nil { - response.WriteHeaderAndEntity(http.StatusServiceUnavailable, err) - return - } - - if res.Status != http.StatusOK { - response.WriteHeaderAndEntity(res.Status, errors.New(res.Error)) - return - } - - response.WriteAsJson(res) + param := parseRequest(log.QueryLevelWorkload, request) + logQuery(param, response) } func LoggingQueryPod(request *restful.Request, response *restful.Response) { - res, err := logQuery(log.QueryLevelPod, request) - if err != nil { - response.WriteHeaderAndEntity(http.StatusServiceUnavailable, err) - return - } - - if res.Status != http.StatusOK { - response.WriteHeaderAndEntity(res.Status, errors.New(res.Error)) - return - } - response.WriteAsJson(res) + param := parseRequest(log.QueryLevelPod, request) + logQuery(param, response) } func LoggingQueryContainer(request *restful.Request, response *restful.Response) { - res, err := logQuery(log.QueryLevelContainer, request) - if err != nil { - response.WriteHeaderAndEntity(http.StatusServiceUnavailable, err) - return + param := parseRequest(log.QueryLevelContainer, request) + if param.Operation == v1alpha2.OperationExport { + logExport(param, request, response) + } else { + logQuery(param, response) } - - if res.Status != http.StatusOK { - response.WriteHeaderAndEntity(res.Status, errors.New(res.Error)) - return - } - response.WriteAsJson(res) } func LoggingQueryFluentbitOutputs(request *restful.Request, response *restful.Response) { @@ -130,7 +84,6 @@ func LoggingQueryFluentbitOutputs(request *restful.Request, response *restful.Re } func LoggingInsertFluentbitOutput(request *restful.Request, response *restful.Response) { - var output fb.OutputPlugin var res *log.FluentbitOutputsResult @@ -151,7 +104,6 @@ func LoggingInsertFluentbitOutput(request *restful.Request, response *restful.Re } func LoggingUpdateFluentbitOutput(request *restful.Request, response *restful.Response) { - var output fb.OutputPlugin id := request.PathParameter("output") @@ -174,7 +126,6 @@ func LoggingUpdateFluentbitOutput(request *restful.Request, response *restful.Re } func LoggingDeleteFluentbitOutput(request *restful.Request, response *restful.Response) { - var res *log.FluentbitOutputsResult id := request.PathParameter("output") @@ -188,22 +139,92 @@ func LoggingDeleteFluentbitOutput(request *restful.Request, response *restful.Re response.WriteAsJson(res) } -func logQuery(level log.LogQueryLevel, request *restful.Request) (*v1alpha2.QueryResult, error) { +func logQuery(param v1alpha2.QueryParameters, response *restful.Response) { es, err := cs.ClientSets().ElasticSearch() if err != nil { - klog.Error(err) - return nil, err + response.WriteHeaderAndEntity(http.StatusServiceUnavailable, errors.Wrap(err)) + return } + res, err := es.Query(param) + if err != nil { + response.WriteHeaderAndEntity(http.StatusInternalServerError, errors.Wrap(err)) + return + } + + response.WriteAsJson(res) +} + +func logExport(param v1alpha2.QueryParameters, request *restful.Request, response *restful.Response) { + es, err := cs.ClientSets().ElasticSearch() + if err != nil { + response.WriteHeaderAndEntity(http.StatusServiceUnavailable, errors.Wrap(err)) + return + } + + response.Header().Set("Content-Type", restful.MIME_OCTET) + + // keep search context alive for 1m + param.ScrollTimeout = time.Minute + // export 1000 records in every iteration + param.Size = 1000 + // from is not allowed in a scroll context + param.From = 0 + + var scrollId string + // limit to retrieve max 100k records + for i := 0; i < 100; i++ { + var res *v1alpha2.QueryResult + if scrollId == "" { + res, err = es.Query(param) + if err != nil { + response.WriteHeaderAndEntity(http.StatusInternalServerError, errors.Wrap(err)) + return + } + } else { + res, err = es.Scroll(scrollId) + if err != nil { + break + } + } + + if res.Read == nil || len(res.Read.Records) == 0 { + break + } + output := new(bytes.Buffer) + for _, r := range res.Read.Records { + output.WriteString(fmt.Sprintf(`%s`, stringutils.StripAnsi(r.Log))) + } + _, err = io.Copy(response, output) + if err != nil { + klog.Error(err) + break + } + + scrollId = res.Read.ScrollID + + select { + case <-request.Request.Context().Done(): + break + default: + } + } + + if scrollId != "" { + es.ClearScroll(scrollId) + } +} + +func parseRequest(level log.LogQueryLevel, request *restful.Request) v1alpha2.QueryParameters { var param v1alpha2.QueryParameters switch level { case log.QueryLevelCluster: var namespaces []string param.NamespaceNotFound, namespaces = log.MatchNamespace(stringutils.Split(request.QueryParameter("namespaces"), ","), - stringutils.Split(strings.ToLower(request.QueryParameter("namespace_query")), ","), // case-insensitive + stringutils.Split(strings.ToLower(request.QueryParameter("namespace_query")), ","), stringutils.Split(request.QueryParameter("workspaces"), ","), - stringutils.Split(strings.ToLower(request.QueryParameter("workspace_query")), ",")) // case-insensitive + stringutils.Split(strings.ToLower(request.QueryParameter("workspace_query")), ",")) param.NamespaceWithCreationTime = log.MakeNamespaceCreationTimeMap(namespaces) param.WorkloadFilter = stringutils.Split(request.QueryParameter("workloads"), ",") param.WorkloadQuery = stringutils.Split(request.QueryParameter("workload_query"), ",") @@ -214,8 +235,8 @@ func logQuery(level log.LogQueryLevel, request *restful.Request) (*v1alpha2.Quer case log.QueryLevelWorkspace: var namespaces []string param.NamespaceNotFound, namespaces = log.MatchNamespace(stringutils.Split(request.QueryParameter("namespaces"), ","), - stringutils.Split(strings.ToLower(request.QueryParameter("namespace_query")), ","), // case-insensitive - stringutils.Split(request.PathParameter("workspace"), ","), nil) // case-insensitive + stringutils.Split(strings.ToLower(request.QueryParameter("namespace_query")), ","), + stringutils.Split(request.PathParameter("workspace"), ","), nil) param.NamespaceWithCreationTime = log.MakeNamespaceCreationTimeMap(namespaces) param.WorkloadFilter = stringutils.Split(request.QueryParameter("workloads"), ",") param.WorkloadQuery = stringutils.Split(request.QueryParameter("workload_query"), ",") @@ -254,21 +275,31 @@ func logQuery(level log.LogQueryLevel, request *restful.Request) (*v1alpha2.Quer } param.LogQuery = stringutils.Split(request.QueryParameter("log_query"), ",") - - param.Operation = request.QueryParameter("operation") param.Interval = request.QueryParameter("interval") param.StartTime = request.QueryParameter("start_time") param.EndTime = request.QueryParameter("end_time") param.Sort = request.QueryParameter("sort") + switch request.QueryParameter("operation") { + case "statistics": + param.Operation = v1alpha2.OperationStatistics + case "histogram": + param.Operation = v1alpha2.OperationHistogram + case "export": + param.Operation = v1alpha2.OperationExport + default: + param.Operation = v1alpha2.OperationQuery + } + var err error param.From, err = strconv.ParseInt(request.QueryParameter("from"), 10, 64) if err != nil { param.From = 0 } + param.Size, err = strconv.ParseInt(request.QueryParameter("size"), 10, 64) if err != nil { param.Size = 10 } - return es.Query(param), nil + return param } diff --git a/pkg/apiserver/tenant/tenant.go b/pkg/apiserver/tenant/tenant.go index a82d57679..a8e7de52d 100644 --- a/pkg/apiserver/tenant/tenant.go +++ b/pkg/apiserver/tenant/tenant.go @@ -339,6 +339,23 @@ func ListDevopsRules(req *restful.Request, resp *restful.Response) { } func LogQuery(req *restful.Request, resp *restful.Response) { + req, err := regenerateLoggingRequest(req) + switch { + case err != nil: + resp.WriteHeaderAndEntity(http.StatusInternalServerError, errors.Wrap(err)) + case req != nil: + logging.LoggingQueryCluster(req, resp) + default: + if req.QueryParameter("operation") == "export" { + resp.Write(nil) + } else { + resp.WriteAsJson(loggingv1alpha2.QueryResult{}) + } + } +} + +// override namespace query conditions +func regenerateLoggingRequest(req *restful.Request) (*restful.Request, error) { username := req.HeaderParameter(constants.UserNameHeader) @@ -348,9 +365,8 @@ func LogQuery(req *restful.Request, resp *restful.Response) { clusterRules, err := iam.GetUserClusterRules(username) if err != nil { - resp.WriteHeaderAndEntity(http.StatusInternalServerError, errors.Wrap(err)) klog.Errorln(err) - return + return nil, err } hasClusterLogAccess := iam.RulesMatchesRequired(clusterRules, rbacv1.PolicyRule{Verbs: []string{"get"}, Resources: []string{"*"}, APIGroups: []string{"logging.kubesphere.io"}}) @@ -361,9 +377,8 @@ func LogQuery(req *restful.Request, resp *restful.Response) { namespaces := make([]string, 0) roles, err := iam.GetUserRoles("", username) if err != nil { - resp.WriteHeaderAndEntity(http.StatusInternalServerError, errors.Wrap(err)) klog.Errorln(err) - return + return nil, err } for _, role := range roles { if !sliceutil.HasString(namespaces, role.Namespace) && iam.RulesMatchesRequired(role.Rules, rbacv1.PolicyRule{Verbs: []string{"get"}, Resources: []string{"*"}, APIGroups: []string{"logging.kubesphere.io"}}) { @@ -374,17 +389,13 @@ func LogQuery(req *restful.Request, resp *restful.Response) { // if the user belongs to no namespace // then no log visible if len(namespaces) == 0 { - res := loggingv1alpha2.QueryResult{Status: http.StatusOK} - resp.WriteAsJson(res) - return + return nil, nil } else if len(queryNamespaces) == 1 && queryNamespaces[0] == "" { values.Set("namespaces", strings.Join(namespaces, ",")) } else { inter := intersection(queryNamespaces, namespaces) if len(inter) == 0 { - res := loggingv1alpha2.QueryResult{Status: http.StatusOK} - resp.WriteAsJson(res) - return + return nil, nil } values.Set("namespaces", strings.Join(inter, ",")) } @@ -394,7 +405,7 @@ func LogQuery(req *restful.Request, resp *restful.Response) { // forward the request to logging model newHttpRequest, _ := http.NewRequest(http.MethodGet, newUrl.String(), nil) - logging.LoggingQueryCluster(restful.NewRequest(newHttpRequest), resp) + return restful.NewRequest(newHttpRequest), nil } func intersection(s1, s2 []string) (inter []string) { diff --git a/pkg/simple/client/elasticsearch/esclient.go b/pkg/simple/client/elasticsearch/esclient.go index 2d9af100e..59dad211f 100644 --- a/pkg/simple/client/elasticsearch/esclient.go +++ b/pkg/simple/client/elasticsearch/esclient.go @@ -21,8 +21,6 @@ import ( v5 "kubesphere.io/kubesphere/pkg/simple/client/elasticsearch/versions/v5" v6 "kubesphere.io/kubesphere/pkg/simple/client/elasticsearch/versions/v6" v7 "kubesphere.io/kubesphere/pkg/simple/client/elasticsearch/versions/v7" - "net/http" - "strconv" "strings" "time" @@ -30,10 +28,6 @@ import ( ) const ( - OperationQuery int = iota - OperationStatistics - OperationHistogram - matchPhrase = iota matchPhrasePrefix regexpQuery @@ -137,7 +131,7 @@ func detectVersionMajor(host string) (string, error) { return v, nil } -func createQueryRequest(param v1alpha2.QueryParameters) (int, []byte, error) { +func createQueryRequest(param v1alpha2.QueryParameters) ([]byte, error) { var request v1alpha2.Request var mainBoolQuery v1alpha2.BoolFilter @@ -190,16 +184,12 @@ func createQueryRequest(param v1alpha2.QueryParameters) (int, []byte, error) { rangeQuery := v1alpha2.RangeQuery{RangeSpec: v1alpha2.RangeSpec{TimeRange: v1alpha2.TimeRange{Gte: param.StartTime, Lte: param.EndTime}}} mainBoolQuery.Filter = append(mainBoolQuery.Filter, rangeQuery) - var operation int - - if param.Operation == "statistics" { - operation = OperationStatistics + if param.Operation == v1alpha2.OperationStatistics { containerAgg := v1alpha2.AggField{Field: "kubernetes.docker_id.keyword"} statisticAggs := v1alpha2.StatisticsAggs{ContainerAgg: v1alpha2.ContainerAgg{Cardinality: containerAgg}} request.Aggs = statisticAggs request.Size = 0 - } else if param.Operation == "histogram" { - operation = OperationHistogram + } else if param.Operation == v1alpha2.OperationHistogram { var interval string if param.Interval != "" { interval = param.Interval @@ -210,7 +200,6 @@ func createQueryRequest(param v1alpha2.QueryParameters) (int, []byte, error) { request.Aggs = v1alpha2.HistogramAggs{HistogramAgg: v1alpha2.HistogramAgg{DateHistogram: v1alpha2.DateHistogram{Field: "time", Interval: interval}}} request.Size = 0 } else { - operation = OperationQuery request.From = param.From request.Size = param.Size var order string @@ -232,9 +221,7 @@ func createQueryRequest(param v1alpha2.QueryParameters) (int, []byte, error) { request.MainQuery = v1alpha2.BoolQuery{Bool: mainBoolQuery} - queryRequest, err := json.Marshal(request) - - return operation, queryRequest, err + return json.Marshal(request) } func makeBoolShould(queryType int, field string, list []string) v1alpha2.BoolQuery { @@ -288,46 +275,14 @@ func makePodNameRegexp(workloadName string) string { return regexp } -func calcTimestamp(input string) int64 { - var t time.Time - var err error - var ret int64 - - ret = 0 - - t, err = time.Parse(time.RFC3339, input) - if err != nil { - var i int64 - i, err = strconv.ParseInt(input, 10, 64) - if err == nil { - ret = time.Unix(i/1000, (i%1000)*1000000).UnixNano() / 1000000 - } - } else { - ret = t.UnixNano() / 1000000 - } - - return ret -} - -func (c *ElasticSearchClient) parseQueryResult(operation int, param v1alpha2.QueryParameters, body []byte) *v1alpha2.QueryResult { +func (c *ElasticSearchClient) parseQueryResult(operation int, body []byte) (*v1alpha2.QueryResult, error) { var queryResult v1alpha2.QueryResult var response v1alpha2.Response err := jsonIter.Unmarshal(body, &response) if err != nil { - klog.Errorln(err) - queryResult.Status = http.StatusInternalServerError - queryResult.Error = err.Error() - return &queryResult - } - - if response.Status != 0 { - //Elastic error, eg, es_rejected_execute_exception - err := "The query failed with no response" - queryResult.Status = response.Status - queryResult.Error = err - klog.Errorln(err) - return &queryResult + klog.Error(err) + return nil, err } if response.Shards.Successful != response.Shards.Total { @@ -337,14 +292,12 @@ func (c *ElasticSearchClient) parseQueryResult(operation int, param v1alpha2.Que } switch operation { - case OperationQuery: + case v1alpha2.OperationQuery: var readResult v1alpha2.ReadResult readResult.Total = c.client.GetTotalHitCount(response.Hits.Total) - readResult.From = param.From - readResult.Size = param.Size for _, hit := range response.Hits.Hits { var logRecord v1alpha2.LogRecord - logRecord.Time = calcTimestamp(hit.Source.Time) + logRecord.Time = hit.Source.Time logRecord.Log = hit.Source.Log logRecord.Namespace = hit.Source.Kubernetes.Namespace logRecord.Pod = hit.Source.Kubernetes.Pod @@ -354,32 +307,23 @@ func (c *ElasticSearchClient) parseQueryResult(operation int, param v1alpha2.Que readResult.Records = append(readResult.Records, logRecord) } queryResult.Read = &readResult - - case OperationStatistics: + case v1alpha2.OperationStatistics: var statisticsResponse v1alpha2.StatisticsResponseAggregations err := jsonIter.Unmarshal(response.Aggregations, &statisticsResponse) if err != nil && response.Aggregations != nil { - klog.Errorln(err) - queryResult.Status = http.StatusInternalServerError - queryResult.Error = err.Error() - return &queryResult + klog.Error(err) + return nil, err } queryResult.Statistics = &v1alpha2.StatisticsResult{Containers: statisticsResponse.ContainerCount.Value, Logs: c.client.GetTotalHitCount(response.Hits.Total)} - - case OperationHistogram: + case v1alpha2.OperationHistogram: var histogramResult v1alpha2.HistogramResult histogramResult.Total = c.client.GetTotalHitCount(response.Hits.Total) - histogramResult.StartTime = calcTimestamp(param.StartTime) - histogramResult.EndTime = calcTimestamp(param.EndTime) - histogramResult.Interval = param.Interval var histogramAggregations v1alpha2.HistogramAggregations err = jsonIter.Unmarshal(response.Aggregations, &histogramAggregations) if err != nil && response.Aggregations != nil { - klog.Errorln(err) - queryResult.Status = http.StatusInternalServerError - queryResult.Error = err.Error() - return &queryResult + klog.Error(err) + return nil, err } for _, histogram := range histogramAggregations.HistogramAggregation.Histograms { var histogramRecord v1alpha2.HistogramRecord @@ -390,58 +334,61 @@ func (c *ElasticSearchClient) parseQueryResult(operation int, param v1alpha2.Que } queryResult.Histogram = &histogramResult + case v1alpha2.OperationExport: + var readResult v1alpha2.ReadResult + readResult.ScrollID = response.ScrollId + for _, hit := range response.Hits.Hits { + var logRecord v1alpha2.LogRecord + logRecord.Log = hit.Source.Log + readResult.Records = append(readResult.Records, logRecord) + } + queryResult.Read = &readResult } - queryResult.Status = http.StatusOK - - return &queryResult + return &queryResult, nil } -func (c *ElasticSearchClient) Query(param v1alpha2.QueryParameters) *v1alpha2.QueryResult { +func (c *ElasticSearchClient) Query(param v1alpha2.QueryParameters) (*v1alpha2.QueryResult, error) { var queryResult = new(v1alpha2.QueryResult) if param.NamespaceNotFound { queryResult = new(v1alpha2.QueryResult) - queryResult.Status = http.StatusOK switch param.Operation { - case "statistics": + case v1alpha2.OperationStatistics: queryResult.Statistics = new(v1alpha2.StatisticsResult) - case "histogram": - queryResult.Histogram = &v1alpha2.HistogramResult{ - StartTime: calcTimestamp(param.StartTime), - EndTime: calcTimestamp(param.EndTime), - Interval: param.Interval} + case v1alpha2.OperationHistogram: + queryResult.Histogram = new(v1alpha2.HistogramResult) default: queryResult.Read = new(v1alpha2.ReadResult) } - return queryResult + return queryResult, nil } - if c.client == nil { - queryResult.Status = http.StatusBadRequest - queryResult.Error = "can not create elastic search client" - return queryResult - } - - operation, query, err := createQueryRequest(param) + query, err := createQueryRequest(param) if err != nil { - klog.Errorln(err) - queryResult.Status = http.StatusInternalServerError - queryResult.Error = err.Error() - return queryResult + klog.Error(err) + return nil, err } - body, err := c.client.Search(query) + body, err := c.client.Search(query, param.ScrollTimeout) if err != nil { - klog.Errorln(err) - queryResult = new(v1alpha2.QueryResult) - queryResult.Status = http.StatusInternalServerError - queryResult.Error = err.Error() - return queryResult + klog.Error(err) + return nil, err } - queryResult = c.parseQueryResult(operation, param, body) - - return queryResult + return c.parseQueryResult(param.Operation, body) +} + +func (c *ElasticSearchClient) Scroll(scrollId string) (*v1alpha2.QueryResult, error) { + body, err := c.client.Scroll(scrollId, time.Minute) + if err != nil { + klog.Error(err) + return nil, err + } + return c.parseQueryResult(v1alpha2.OperationExport, body) +} + +func (c *ElasticSearchClient) ClearScroll(scrollId string) { + c.client.ClearScroll(scrollId) } diff --git a/pkg/simple/client/elasticsearch/interface.go b/pkg/simple/client/elasticsearch/interface.go index ddbf9b3d8..de278c8ae 100644 --- a/pkg/simple/client/elasticsearch/interface.go +++ b/pkg/simple/client/elasticsearch/interface.go @@ -1,7 +1,11 @@ package esclient +import "time" + type Client interface { // Perform Search API - Search(body []byte) ([]byte, error) + Search(body []byte, scrollTimeout time.Duration) ([]byte, error) + Scroll(scrollId string, scrollTimeout time.Duration) ([]byte, error) + ClearScroll(scrollId string) GetTotalHitCount(v interface{}) int64 } diff --git a/pkg/simple/client/elasticsearch/versions/v5/elasticsearch.go b/pkg/simple/client/elasticsearch/versions/v5/elasticsearch.go index 6fd96fc4e..45cad63b2 100644 --- a/pkg/simple/client/elasticsearch/versions/v5/elasticsearch.go +++ b/pkg/simple/client/elasticsearch/versions/v5/elasticsearch.go @@ -6,7 +6,9 @@ import ( "encoding/json" "fmt" "github.com/elastic/go-elasticsearch/v5" + "github.com/elastic/go-elasticsearch/v5/esapi" "io/ioutil" + "time" ) type Elastic struct { @@ -15,7 +17,6 @@ type Elastic struct { } func New(address string, index string) *Elastic { - client, _ := elasticsearch.NewClient(elasticsearch.Config{ Addresses: []string{address}, }) @@ -23,33 +24,60 @@ func New(address string, index string) *Elastic { return &Elastic{client: client, index: index} } -func (e *Elastic) Search(body []byte) ([]byte, error) { - +func (e *Elastic) Search(body []byte, scrollTimeout time.Duration) ([]byte, error) { response, err := e.client.Search( e.client.Search.WithContext(context.Background()), e.client.Search.WithIndex(fmt.Sprintf("%s*", e.index)), e.client.Search.WithBody(bytes.NewBuffer(body)), - ) + e.client.Search.WithScroll(scrollTimeout)) if err != nil { return nil, err } defer response.Body.Close() if response.IsError() { - var e map[string]interface{} - if err := json.NewDecoder(response.Body).Decode(&e); err != nil { - return nil, err - } else { - // Print the response status and error information. - e, _ := e["error"].(map[string]interface{}) - return nil, fmt.Errorf("[%s] %s: %s", response.Status(), e["type"], e["reason"]) - } + return nil, parseError(response) } return ioutil.ReadAll(response.Body) } +func (e *Elastic) Scroll(scrollId string, scrollTimeout time.Duration) ([]byte, error) { + response, err := e.client.Scroll( + e.client.Scroll.WithContext(context.Background()), + e.client.Scroll.WithScrollID(scrollId), + e.client.Scroll.WithScroll(scrollTimeout)) + if err != nil { + return nil, err + } + defer response.Body.Close() + + if response.IsError() { + return nil, parseError(response) + } + + return ioutil.ReadAll(response.Body) +} + +func (e *Elastic) ClearScroll(scrollId string) { + response, _ := e.client.ClearScroll( + e.client.ClearScroll.WithContext(context.Background()), + e.client.ClearScroll.WithScrollID(scrollId)) + defer response.Body.Close() +} + func (e *Elastic) GetTotalHitCount(v interface{}) int64 { f, _ := v.(float64) return int64(f) } + +func parseError(response *esapi.Response) error { + var e map[string]interface{} + if err := json.NewDecoder(response.Body).Decode(&e); err != nil { + return err + } else { + // Print the response status and error information. + e, _ := e["error"].(map[string]interface{}) + return fmt.Errorf("%s: %s", e["type"], e["reason"]) + } +} diff --git a/pkg/simple/client/elasticsearch/versions/v6/elasticsearch.go b/pkg/simple/client/elasticsearch/versions/v6/elasticsearch.go index 577447747..80184497a 100644 --- a/pkg/simple/client/elasticsearch/versions/v6/elasticsearch.go +++ b/pkg/simple/client/elasticsearch/versions/v6/elasticsearch.go @@ -6,7 +6,9 @@ import ( "encoding/json" "fmt" "github.com/elastic/go-elasticsearch/v6" + "github.com/elastic/go-elasticsearch/v6/esapi" "io/ioutil" + "time" ) type Elastic struct { @@ -15,7 +17,6 @@ type Elastic struct { } func New(address string, index string) *Elastic { - client, _ := elasticsearch.NewClient(elasticsearch.Config{ Addresses: []string{address}, }) @@ -23,33 +24,60 @@ func New(address string, index string) *Elastic { return &Elastic{Client: client, index: index} } -func (e *Elastic) Search(body []byte) ([]byte, error) { - +func (e *Elastic) Search(body []byte, scrollTimeout time.Duration) ([]byte, error) { response, err := e.Client.Search( e.Client.Search.WithContext(context.Background()), e.Client.Search.WithIndex(fmt.Sprintf("%s*", e.index)), e.Client.Search.WithBody(bytes.NewBuffer(body)), - ) + e.Client.Search.WithScroll(scrollTimeout)) if err != nil { return nil, err } defer response.Body.Close() if response.IsError() { - var e map[string]interface{} - if err := json.NewDecoder(response.Body).Decode(&e); err != nil { - return nil, err - } else { - // Print the response status and error information. - e, _ := e["error"].(map[string]interface{}) - return nil, fmt.Errorf("[%s] %s: %s", response.Status(), e["type"], e["reason"]) - } + return nil, parseError(response) } return ioutil.ReadAll(response.Body) } +func (e *Elastic) Scroll(scrollId string, scrollTimeout time.Duration) ([]byte, error) { + response, err := e.Client.Scroll( + e.Client.Scroll.WithContext(context.Background()), + e.Client.Scroll.WithScrollID(scrollId), + e.Client.Scroll.WithScroll(scrollTimeout)) + if err != nil { + return nil, err + } + defer response.Body.Close() + + if response.IsError() { + return nil, parseError(response) + } + + return ioutil.ReadAll(response.Body) +} + +func (e *Elastic) ClearScroll(scrollId string) { + response, _ := e.Client.ClearScroll( + e.Client.ClearScroll.WithContext(context.Background()), + e.Client.ClearScroll.WithScrollID(scrollId)) + defer response.Body.Close() +} + func (e *Elastic) GetTotalHitCount(v interface{}) int64 { f, _ := v.(float64) return int64(f) } + +func parseError(response *esapi.Response) error { + var e map[string]interface{} + if err := json.NewDecoder(response.Body).Decode(&e); err != nil { + return err + } else { + // Print the response status and error information. + e, _ := e["error"].(map[string]interface{}) + return fmt.Errorf("%s: %s", e["type"], e["reason"]) + } +} diff --git a/pkg/simple/client/elasticsearch/versions/v7/elasticsearch.go b/pkg/simple/client/elasticsearch/versions/v7/elasticsearch.go index bc1293baf..7777d046d 100644 --- a/pkg/simple/client/elasticsearch/versions/v7/elasticsearch.go +++ b/pkg/simple/client/elasticsearch/versions/v7/elasticsearch.go @@ -6,7 +6,9 @@ import ( "encoding/json" "fmt" "github.com/elastic/go-elasticsearch/v7" + "github.com/elastic/go-elasticsearch/v7/esapi" "io/ioutil" + "time" ) type Elastic struct { @@ -15,7 +17,6 @@ type Elastic struct { } func New(address string, index string) *Elastic { - client, _ := elasticsearch.NewClient(elasticsearch.Config{ Addresses: []string{address}, }) @@ -23,35 +24,63 @@ func New(address string, index string) *Elastic { return &Elastic{client: client, index: index} } -func (e *Elastic) Search(body []byte) ([]byte, error) { - +func (e *Elastic) Search(body []byte, scrollTimeout time.Duration) ([]byte, error) { response, err := e.client.Search( e.client.Search.WithContext(context.Background()), e.client.Search.WithIndex(fmt.Sprintf("%s*", e.index)), e.client.Search.WithTrackTotalHits(true), e.client.Search.WithBody(bytes.NewBuffer(body)), - ) + e.client.Search.WithScroll(scrollTimeout)) if err != nil { return nil, err } defer response.Body.Close() if response.IsError() { - var e map[string]interface{} - if err := json.NewDecoder(response.Body).Decode(&e); err != nil { - return nil, err - } else { - // Print the response status and error information. - e, _ := e["error"].(map[string]interface{}) - return nil, fmt.Errorf("[%s] %s: %s", response.Status(), e["type"], e["reason"]) - } + return nil, parseError(response) } return ioutil.ReadAll(response.Body) } +func (e *Elastic) Scroll(scrollId string, scrollTimeout time.Duration) ([]byte, error) { + response, err := e.client.Scroll( + e.client.Scroll.WithContext(context.Background()), + e.client.Scroll.WithScrollID(scrollId), + e.client.Scroll.WithScroll(scrollTimeout)) + if err != nil { + return nil, err + } + defer response.Body.Close() + + if response.IsError() { + return nil, parseError(response) + } + + b, err := ioutil.ReadAll(response.Body) + return b, err +} + +func (e *Elastic) ClearScroll(scrollId string) { + response, _ := e.client.ClearScroll( + e.client.ClearScroll.WithContext(context.Background()), + e.client.ClearScroll.WithScrollID(scrollId)) + defer response.Body.Close() +} + func (e *Elastic) GetTotalHitCount(v interface{}) int64 { m, _ := v.(map[string]interface{}) f, _ := m["value"].(float64) return int64(f) } + +func parseError(response *esapi.Response) error { + var e map[string]interface{} + if err := json.NewDecoder(response.Body).Decode(&e); err != nil { + return err + } else { + // Print the response status and error information. + e, _ := e["error"].(map[string]interface{}) + return fmt.Errorf("%s: %s", e["type"], e["reason"]) + } +} diff --git a/pkg/utils/stringutils/string.go b/pkg/utils/stringutils/string.go index ef68b33bd..acc7fb1ea 100644 --- a/pkg/utils/stringutils/string.go +++ b/pkg/utils/stringutils/string.go @@ -14,12 +14,17 @@ limitations under the License. package stringutils import ( + "regexp" "strings" "unicode/utf8" "github.com/asaskevich/govalidator" ) +const ansi = "[\u001B\u009B][[\\]()#;?]*(?:(?:(?:[a-zA-Z\\d]*(?:;[a-zA-Z\\d]*)*)?\u0007)|(?:(?:\\d{1,4}(?:;\\d{0,4})*)?[\\dA-PRZcf-ntqry=><~]))" + +var re = regexp.MustCompile(ansi) + // Creates an slice of slice values not included in the other given slice. func Diff(base, exclude []string) (result []string) { excludeMap := make(map[string]bool) @@ -83,3 +88,7 @@ func Split(str string, sep string) []string { } return strings.Split(str, sep) } + +func StripAnsi(str string) string { + return re.ReplaceAllString(str, "") +}