From 4c111533b154b45e3a15a3f92c9a60540fa1ec83 Mon Sep 17 00:00:00 2001 From: huanggze Date: Mon, 13 May 2019 18:40:38 +0800 Subject: [PATCH] fix: log statitiscs Signed-off-by: huanggze --- pkg/simple/client/elasticsearch/esclient.go | 136 +++++++++----------- 1 file changed, 62 insertions(+), 74 deletions(-) diff --git a/pkg/simple/client/elasticsearch/esclient.go b/pkg/simple/client/elasticsearch/esclient.go index c5075459c..896b1a2da 100644 --- a/pkg/simple/client/elasticsearch/esclient.go +++ b/pkg/simple/client/elasticsearch/esclient.go @@ -114,7 +114,8 @@ type QueryWord struct { } type MainHighLight struct { - Fields []interface{} `json:"fields,omitempty"` + Fields []interface{} `json:"fields,omitempty"` + FragmentSize int `json:"fragment_size"` } type LogHighLightField struct { @@ -136,26 +137,37 @@ type ContainerHighLightField struct { type EmptyField struct { } +// The aggs object holds two aggregations to be computed by Elasticsearch +// ContainterAgg is a cardinality aggregation to calculate the count of distinct containers +// StartTimeAgg is a top hits aggregation to retrieve the first record type StatisticsAggs struct { - NamespaceAgg NamespaceAgg `json:"Namespace"` -} - -type NamespaceAgg struct { - Terms StatisticsAggTerm `json:"terms"` - ContainerAggs ContainerAggs `json:"aggs"` -} - -type ContainerAggs struct { - ContainerAgg ContainerAgg `json:"Container"` + ContainerAgg ContainerAgg `json:"containers"` + StartTimeAgg StartTimeAgg `json:"starttime"` } type ContainerAgg struct { - Terms StatisticsAggTerm `json:"terms"` + Cardinality AggField `json:"cardinality"` } -type StatisticsAggTerm struct { +type AggField struct { Field string `json:"field"` - Size int64 `json:"size"` +} + +type StartTimeAgg struct { + TopHits TopHits `json:"top_hits"` +} + +type TopHits struct { + Sort []TopHitsSort `json:"sort"` + Size int `json:"size"` +} + +type TopHitsSort struct { + Order TopHitsSortOrder `json:"time"` +} + +type TopHitsSortOrder struct { + Type string `json:"order"` } type HistogramAggs struct { @@ -250,9 +262,10 @@ func createQueryRequest(param QueryParameters) (int, []byte, error) { if param.Operation == "statistics" { operation = OperationStatistics - containerAggs := ContainerAggs{ContainerAgg{StatisticsAggTerm{"kubernetes.container_name.keyword", 2147483647}}} - namespaceAgg := NamespaceAgg{StatisticsAggTerm{"kubernetes.namespace_name.keyword", 2147483647}, containerAggs} - request.Aggs = StatisticsAggs{namespaceAgg} + containerAgg := AggField{"kubernetes.docker_id.keyword"} + startTimeAgg := TopHits{[]TopHitsSort{{TopHitsSortOrder{"asc"}}}, 1} + statisticAggs := StatisticsAggs{ContainerAgg{containerAgg}, StartTimeAgg{startTimeAgg}} + request.Aggs = statisticAggs request.Size = 0 } else if param.Operation == "histogram" { operation = OperationHistogram @@ -282,6 +295,7 @@ func createQueryRequest(param QueryParameters) (int, []byte, error) { mainHighLight.Fields = append(mainHighLight.Fields, NamespaceHighLightField{}) mainHighLight.Fields = append(mainHighLight.Fields, PodHighLightField{}) mainHighLight.Fields = append(mainHighLight.Fields, ContainerHighLightField{}) + mainHighLight.FragmentSize = 0 request.MainHighLight = mainHighLight } @@ -315,6 +329,7 @@ type Hits struct { type Hit struct { Source Source `json:"_source"` HighLight HighLight `json:"highlight"` + Sort []int64 `json:"sort"` } type Source struct { @@ -354,43 +369,18 @@ type ReadResult struct { Records []LogRecord `json:"records,omitempty"` } -type NamespaceAggregations struct { - NamespaceAggregation NamespaceAggregation `json:"Namespace"` +// The aggregations object represents the return from an aggregation (see StatisticsAggs type) +type StatisticsResponseAggregations struct { + ContainerCount ContainerCount `json:"containers"` + StartTime StartTimeTopHit `json:"starttime"` } -type NamespaceAggregation struct { - Namespaces []NamespaceStatistics `json:"buckets"` +type ContainerCount struct { + Value int64 `json:"value"` } -type NamespaceStatistics struct { - Namespace string `json:"Key"` - Count int64 `json:"doc_count"` - ContainerAggregation ContainerAggregation `json:"Container"` -} - -type ContainerAggregation struct { - Containers []ContainerStatistics `json:"buckets"` -} - -type ContainerStatistics struct { - Container string `json:"Key"` - Count int64 `json:"doc_count"` -} - -type NamespaceResult struct { - Namespace string `json:"namespace"` - Count int64 `json:"count"` - Containers []ContainerResult `json:"containers"` -} - -type ContainerResult struct { - Container string `json:"container"` - Count int64 `json:"count"` -} - -type StatisticsResult struct { - Total int64 `json:"total"` - Namespaces []NamespaceResult `json:"namespaces"` +type StartTimeTopHit struct { + Hits Hits `json:"hits"` } type HistogramAggregations struct { @@ -411,6 +401,12 @@ type HistogramRecord struct { Count int64 `json:"count"` } +type StatisticsResult struct { + Containers int64 `json:"containers"` + Logs int64 `json:"logs"` + StartTime int64 `json:"starttime"` +} + type HistogramResult struct { Total int64 `json:"total"` StartTime int64 `json:"start_time"` @@ -464,8 +460,8 @@ func parseQueryResult(operation int, param QueryParameters, body []byte, query [ var response Response err := jsonIter.Unmarshal(body, &response) if err != nil { - //fmt.Println("Parse response error ", err.Error()) - queryResult.Status = http.StatusNotFound + glog.Errorln(err) + queryResult.Status = http.StatusInternalServerError return &queryResult } @@ -501,28 +497,15 @@ func parseQueryResult(operation int, param QueryParameters, body []byte, query [ queryResult.Read = &readResult case OperationStatistics: - var statisticsResult StatisticsResult - statisticsResult.Total = response.Hits.Total - - var namespaceAggregations NamespaceAggregations - jsonIter.Unmarshal(response.Aggregations, &namespaceAggregations) - - for _, namespace := range namespaceAggregations.NamespaceAggregation.Namespaces { - var namespaceResult NamespaceResult - namespaceResult.Namespace = namespace.Namespace - namespaceResult.Count = namespace.Count - - for _, container := range namespace.ContainerAggregation.Containers { - var containerResult ContainerResult - containerResult.Container = container.Container - containerResult.Count = container.Count - namespaceResult.Containers = append(namespaceResult.Containers, containerResult) - } - - statisticsResult.Namespaces = append(statisticsResult.Namespaces, namespaceResult) + var statisticsResponse StatisticsResponseAggregations + err := jsonIter.Unmarshal(response.Aggregations, &statisticsResponse) + if err != nil { + glog.Errorln(err) + queryResult.Status = http.StatusInternalServerError + return &queryResult } - - queryResult.Statistics = &statisticsResult + queryResult.Statistics = &StatisticsResult{Containers: statisticsResponse.ContainerCount.Value, + Logs: statisticsResponse.StartTime.Hits.Total, StartTime: statisticsResponse.StartTime.Hits.Hits[0].Sort[0]} case OperationHistogram: var histogramResult HistogramResult @@ -532,7 +515,12 @@ func parseQueryResult(operation int, param QueryParameters, body []byte, query [ histogramResult.Interval = param.Interval var histogramAggregations HistogramAggregations - jsonIter.Unmarshal(response.Aggregations, &histogramAggregations) + err := jsonIter.Unmarshal(response.Aggregations, &histogramAggregations) + if err != nil { + glog.Errorln(err) + queryResult.Status = http.StatusInternalServerError + return &queryResult + } for _, histogram := range histogramAggregations.HistogramAggregation.Histograms { var histogramRecord HistogramRecord histogramRecord.Time = histogram.Time