feat: allow to export logs

Signed-off-by: huanggze <loganhuang@yunify.com>
This commit is contained in:
huanggze
2019-09-19 23:25:18 +08:00
parent 49dacd3e70
commit a71b35db9c
11 changed files with 346 additions and 257 deletions

View File

@@ -21,8 +21,6 @@ import (
v5 "kubesphere.io/kubesphere/pkg/simple/client/elasticsearch/versions/v5"
v6 "kubesphere.io/kubesphere/pkg/simple/client/elasticsearch/versions/v6"
v7 "kubesphere.io/kubesphere/pkg/simple/client/elasticsearch/versions/v7"
"net/http"
"strconv"
"strings"
"time"
@@ -30,10 +28,6 @@ import (
)
const (
OperationQuery int = iota
OperationStatistics
OperationHistogram
matchPhrase = iota
matchPhrasePrefix
regexpQuery
@@ -137,7 +131,7 @@ func detectVersionMajor(host string) (string, error) {
return v, nil
}
func createQueryRequest(param v1alpha2.QueryParameters) (int, []byte, error) {
func createQueryRequest(param v1alpha2.QueryParameters) ([]byte, error) {
var request v1alpha2.Request
var mainBoolQuery v1alpha2.BoolFilter
@@ -190,16 +184,12 @@ func createQueryRequest(param v1alpha2.QueryParameters) (int, []byte, error) {
rangeQuery := v1alpha2.RangeQuery{RangeSpec: v1alpha2.RangeSpec{TimeRange: v1alpha2.TimeRange{Gte: param.StartTime, Lte: param.EndTime}}}
mainBoolQuery.Filter = append(mainBoolQuery.Filter, rangeQuery)
var operation int
if param.Operation == "statistics" {
operation = OperationStatistics
if param.Operation == v1alpha2.OperationStatistics {
containerAgg := v1alpha2.AggField{Field: "kubernetes.docker_id.keyword"}
statisticAggs := v1alpha2.StatisticsAggs{ContainerAgg: v1alpha2.ContainerAgg{Cardinality: containerAgg}}
request.Aggs = statisticAggs
request.Size = 0
} else if param.Operation == "histogram" {
operation = OperationHistogram
} else if param.Operation == v1alpha2.OperationHistogram {
var interval string
if param.Interval != "" {
interval = param.Interval
@@ -210,7 +200,6 @@ func createQueryRequest(param v1alpha2.QueryParameters) (int, []byte, error) {
request.Aggs = v1alpha2.HistogramAggs{HistogramAgg: v1alpha2.HistogramAgg{DateHistogram: v1alpha2.DateHistogram{Field: "time", Interval: interval}}}
request.Size = 0
} else {
operation = OperationQuery
request.From = param.From
request.Size = param.Size
var order string
@@ -232,9 +221,7 @@ func createQueryRequest(param v1alpha2.QueryParameters) (int, []byte, error) {
request.MainQuery = v1alpha2.BoolQuery{Bool: mainBoolQuery}
queryRequest, err := json.Marshal(request)
return operation, queryRequest, err
return json.Marshal(request)
}
func makeBoolShould(queryType int, field string, list []string) v1alpha2.BoolQuery {
@@ -288,46 +275,14 @@ func makePodNameRegexp(workloadName string) string {
return regexp
}
func calcTimestamp(input string) int64 {
var t time.Time
var err error
var ret int64
ret = 0
t, err = time.Parse(time.RFC3339, input)
if err != nil {
var i int64
i, err = strconv.ParseInt(input, 10, 64)
if err == nil {
ret = time.Unix(i/1000, (i%1000)*1000000).UnixNano() / 1000000
}
} else {
ret = t.UnixNano() / 1000000
}
return ret
}
func (c *ElasticSearchClient) parseQueryResult(operation int, param v1alpha2.QueryParameters, body []byte) *v1alpha2.QueryResult {
func (c *ElasticSearchClient) parseQueryResult(operation int, body []byte) (*v1alpha2.QueryResult, error) {
var queryResult v1alpha2.QueryResult
var response v1alpha2.Response
err := jsonIter.Unmarshal(body, &response)
if err != nil {
klog.Errorln(err)
queryResult.Status = http.StatusInternalServerError
queryResult.Error = err.Error()
return &queryResult
}
if response.Status != 0 {
//Elastic error, eg, es_rejected_execute_exception
err := "The query failed with no response"
queryResult.Status = response.Status
queryResult.Error = err
klog.Errorln(err)
return &queryResult
klog.Error(err)
return nil, err
}
if response.Shards.Successful != response.Shards.Total {
@@ -337,14 +292,12 @@ func (c *ElasticSearchClient) parseQueryResult(operation int, param v1alpha2.Que
}
switch operation {
case OperationQuery:
case v1alpha2.OperationQuery:
var readResult v1alpha2.ReadResult
readResult.Total = c.client.GetTotalHitCount(response.Hits.Total)
readResult.From = param.From
readResult.Size = param.Size
for _, hit := range response.Hits.Hits {
var logRecord v1alpha2.LogRecord
logRecord.Time = calcTimestamp(hit.Source.Time)
logRecord.Time = hit.Source.Time
logRecord.Log = hit.Source.Log
logRecord.Namespace = hit.Source.Kubernetes.Namespace
logRecord.Pod = hit.Source.Kubernetes.Pod
@@ -354,32 +307,23 @@ func (c *ElasticSearchClient) parseQueryResult(operation int, param v1alpha2.Que
readResult.Records = append(readResult.Records, logRecord)
}
queryResult.Read = &readResult
case OperationStatistics:
case v1alpha2.OperationStatistics:
var statisticsResponse v1alpha2.StatisticsResponseAggregations
err := jsonIter.Unmarshal(response.Aggregations, &statisticsResponse)
if err != nil && response.Aggregations != nil {
klog.Errorln(err)
queryResult.Status = http.StatusInternalServerError
queryResult.Error = err.Error()
return &queryResult
klog.Error(err)
return nil, err
}
queryResult.Statistics = &v1alpha2.StatisticsResult{Containers: statisticsResponse.ContainerCount.Value, Logs: c.client.GetTotalHitCount(response.Hits.Total)}
case OperationHistogram:
case v1alpha2.OperationHistogram:
var histogramResult v1alpha2.HistogramResult
histogramResult.Total = c.client.GetTotalHitCount(response.Hits.Total)
histogramResult.StartTime = calcTimestamp(param.StartTime)
histogramResult.EndTime = calcTimestamp(param.EndTime)
histogramResult.Interval = param.Interval
var histogramAggregations v1alpha2.HistogramAggregations
err = jsonIter.Unmarshal(response.Aggregations, &histogramAggregations)
if err != nil && response.Aggregations != nil {
klog.Errorln(err)
queryResult.Status = http.StatusInternalServerError
queryResult.Error = err.Error()
return &queryResult
klog.Error(err)
return nil, err
}
for _, histogram := range histogramAggregations.HistogramAggregation.Histograms {
var histogramRecord v1alpha2.HistogramRecord
@@ -390,58 +334,61 @@ func (c *ElasticSearchClient) parseQueryResult(operation int, param v1alpha2.Que
}
queryResult.Histogram = &histogramResult
case v1alpha2.OperationExport:
var readResult v1alpha2.ReadResult
readResult.ScrollID = response.ScrollId
for _, hit := range response.Hits.Hits {
var logRecord v1alpha2.LogRecord
logRecord.Log = hit.Source.Log
readResult.Records = append(readResult.Records, logRecord)
}
queryResult.Read = &readResult
}
queryResult.Status = http.StatusOK
return &queryResult
return &queryResult, nil
}
func (c *ElasticSearchClient) Query(param v1alpha2.QueryParameters) *v1alpha2.QueryResult {
func (c *ElasticSearchClient) Query(param v1alpha2.QueryParameters) (*v1alpha2.QueryResult, error) {
var queryResult = new(v1alpha2.QueryResult)
if param.NamespaceNotFound {
queryResult = new(v1alpha2.QueryResult)
queryResult.Status = http.StatusOK
switch param.Operation {
case "statistics":
case v1alpha2.OperationStatistics:
queryResult.Statistics = new(v1alpha2.StatisticsResult)
case "histogram":
queryResult.Histogram = &v1alpha2.HistogramResult{
StartTime: calcTimestamp(param.StartTime),
EndTime: calcTimestamp(param.EndTime),
Interval: param.Interval}
case v1alpha2.OperationHistogram:
queryResult.Histogram = new(v1alpha2.HistogramResult)
default:
queryResult.Read = new(v1alpha2.ReadResult)
}
return queryResult
return queryResult, nil
}
if c.client == nil {
queryResult.Status = http.StatusBadRequest
queryResult.Error = "can not create elastic search client"
return queryResult
}
operation, query, err := createQueryRequest(param)
query, err := createQueryRequest(param)
if err != nil {
klog.Errorln(err)
queryResult.Status = http.StatusInternalServerError
queryResult.Error = err.Error()
return queryResult
klog.Error(err)
return nil, err
}
body, err := c.client.Search(query)
body, err := c.client.Search(query, param.ScrollTimeout)
if err != nil {
klog.Errorln(err)
queryResult = new(v1alpha2.QueryResult)
queryResult.Status = http.StatusInternalServerError
queryResult.Error = err.Error()
return queryResult
klog.Error(err)
return nil, err
}
queryResult = c.parseQueryResult(operation, param, body)
return queryResult
return c.parseQueryResult(param.Operation, body)
}
func (c *ElasticSearchClient) Scroll(scrollId string) (*v1alpha2.QueryResult, error) {
body, err := c.client.Scroll(scrollId, time.Minute)
if err != nil {
klog.Error(err)
return nil, err
}
return c.parseQueryResult(v1alpha2.OperationExport, body)
}
func (c *ElasticSearchClient) ClearScroll(scrollId string) {
c.client.ClearScroll(scrollId)
}