refactor logging options

This commit is contained in:
Jeff
2019-09-17 15:52:35 +08:00
parent fa1e62f6ac
commit f61c5e09ba
20 changed files with 444 additions and 259 deletions

View File

@@ -13,13 +13,17 @@ limitations under the License.
package esclient
import (
"context"
"encoding/json"
"fmt"
"k8s.io/klog"
"kubesphere.io/kubesphere/pkg/api/logging/v1alpha2"
v5 "kubesphere.io/kubesphere/pkg/simple/client/elasticsearch/versions/v5"
v6 "kubesphere.io/kubesphere/pkg/simple/client/elasticsearch/versions/v6"
v7 "kubesphere.io/kubesphere/pkg/simple/client/elasticsearch/versions/v7"
"net/http"
"strconv"
"strings"
"sync"
"time"
"github.com/json-iterator/go"
@@ -49,48 +53,109 @@ const (
fieldContainerNameKeyword = "kubernetes.container_name.keyword"
)
var jsonIter = jsoniter.ConfigCompatibleWithStandardLibrary
var (
mu sync.Mutex
config *Config
client Client
const (
ElasticV5 = "5"
ElasticV6 = "6"
ElasticV7 = "7"
)
func (cfg *Config) WriteESConfigs() {
mu.Lock()
defer mu.Unlock()
var jsonIter = jsoniter.ConfigCompatibleWithStandardLibrary
config = cfg
if err := detectVersionMajor(config); err != nil {
klog.Errorln(err)
client = nil
return
}
client = NewForConfig(config)
type ElasticSearchClient struct {
client Client
}
func createQueryRequest(param QueryParameters) (int, []byte, error) {
var request Request
var mainBoolQuery BoolFilter
func NewLoggingClient(options *ElasticSearchOptions) (*ElasticSearchClient, error) {
version := "6"
esClient := &ElasticSearchClient{}
if options.Version == "" {
var err error
version, err = detectVersionMajor(options.Host)
if err != nil {
return nil, err
}
}
if options.LogstashFormat {
if options.LogstashPrefix != "" {
options.Index = options.LogstashPrefix
} else {
options.Index = "logstash"
}
}
switch version {
case ElasticV5:
esClient.client = v5.New(options.Host, options.Index)
case ElasticV6:
esClient.client = v6.New(options.Host, options.Index)
case ElasticV7:
esClient.client = v7.New(options.Host, options.Index)
default:
return nil, fmt.Errorf("unsupported elasticsearch version %s", version)
}
return esClient, nil
}
func (c *ElasticSearchClient) ES() *Client {
return &c.client
}
func detectVersionMajor(host string) (string, error) {
// Info APIs are backward compatible with versions of v5.x, v6.x and v7.x
es := v6.New(host, "")
res, err := es.Client.Info(
es.Client.Info.WithContext(context.Background()),
)
if err != nil {
return "", err
}
defer res.Body.Close()
var b map[string]interface{}
if err = json.NewDecoder(res.Body).Decode(&b); err != nil {
return "", err
}
if res.IsError() {
// Print the response status and error information.
e, _ := b["error"].(map[string]interface{})
return "", fmt.Errorf("[%s] %s: %s", res.Status(), e["type"], e["reason"])
}
// get the major version
version, _ := b["version"].(map[string]interface{})
number, _ := version["number"].(string)
if number == "" {
return "", fmt.Errorf("failed to detect elastic version number")
}
v := strings.Split(number, ".")[0]
return v, nil
}
func createQueryRequest(param v1alpha2.QueryParameters) (int, []byte, error) {
var request v1alpha2.Request
var mainBoolQuery v1alpha2.BoolFilter
if len(param.NamespaceWithCreationTime) != 0 {
var boolShould BoolShould
var boolShould v1alpha2.BoolShould
for namespace, creationTime := range param.NamespaceWithCreationTime {
var boolFilter BoolFilter
var boolFilter v1alpha2.BoolFilter
matchPhrase := MatchPhrase{MatchPhrase: map[string]string{fieldNamespaceNameKeyword: namespace}}
rangeQuery := RangeQuery{RangeSpec{TimeRange{creationTime, ""}}}
matchPhrase := v1alpha2.MatchPhrase{MatchPhrase: map[string]string{fieldNamespaceNameKeyword: namespace}}
rangeQuery := v1alpha2.RangeQuery{RangeSpec: v1alpha2.RangeSpec{TimeRange: v1alpha2.TimeRange{Gte: creationTime, Lte: ""}}}
boolFilter.Filter = append(boolFilter.Filter, matchPhrase)
boolFilter.Filter = append(boolFilter.Filter, rangeQuery)
boolShould.Should = append(boolShould.Should, BoolQuery{Bool: boolFilter})
boolShould.Should = append(boolShould.Should, v1alpha2.BoolQuery{Bool: boolFilter})
}
boolShould.MinimumShouldMatch = 1
mainBoolQuery.Filter = append(mainBoolQuery.Filter, BoolQuery{Bool: boolShould})
mainBoolQuery.Filter = append(mainBoolQuery.Filter, v1alpha2.BoolQuery{Bool: boolShould})
}
if param.WorkloadFilter != nil {
boolQuery := makeBoolShould(regexpQuery, fieldPodNameKeyword, param.WorkloadFilter)
@@ -122,15 +187,15 @@ func createQueryRequest(param QueryParameters) (int, []byte, error) {
mainBoolQuery.Filter = append(mainBoolQuery.Filter, boolQuery)
}
rangeQuery := RangeQuery{RangeSpec{TimeRange{param.StartTime, param.EndTime}}}
rangeQuery := v1alpha2.RangeQuery{RangeSpec: v1alpha2.RangeSpec{TimeRange: v1alpha2.TimeRange{Gte: param.StartTime, Lte: param.EndTime}}}
mainBoolQuery.Filter = append(mainBoolQuery.Filter, rangeQuery)
var operation int
if param.Operation == "statistics" {
operation = OperationStatistics
containerAgg := AggField{"kubernetes.docker_id.keyword"}
statisticAggs := StatisticsAggs{ContainerAgg{containerAgg}}
containerAgg := v1alpha2.AggField{Field: "kubernetes.docker_id.keyword"}
statisticAggs := v1alpha2.StatisticsAggs{ContainerAgg: v1alpha2.ContainerAgg{Cardinality: containerAgg}}
request.Aggs = statisticAggs
request.Size = 0
} else if param.Operation == "histogram" {
@@ -142,7 +207,7 @@ func createQueryRequest(param QueryParameters) (int, []byte, error) {
interval = "15m"
}
param.Interval = interval
request.Aggs = HistogramAggs{HistogramAgg{DateHistogram{"time", interval}}}
request.Aggs = v1alpha2.HistogramAggs{HistogramAgg: v1alpha2.HistogramAgg{DateHistogram: v1alpha2.DateHistogram{Field: "time", Interval: interval}}}
request.Size = 0
} else {
operation = OperationQuery
@@ -154,25 +219,25 @@ func createQueryRequest(param QueryParameters) (int, []byte, error) {
} else {
order = "desc"
}
request.Sorts = append(request.Sorts, Sort{Order{order}})
request.Sorts = append(request.Sorts, v1alpha2.Sort{Order: v1alpha2.Order{Order: order}})
var mainHighLight MainHighLight
mainHighLight.Fields = append(mainHighLight.Fields, LogHighLightField{})
mainHighLight.Fields = append(mainHighLight.Fields, NamespaceHighLightField{})
mainHighLight.Fields = append(mainHighLight.Fields, PodHighLightField{})
mainHighLight.Fields = append(mainHighLight.Fields, ContainerHighLightField{})
var mainHighLight v1alpha2.MainHighLight
mainHighLight.Fields = append(mainHighLight.Fields, v1alpha2.LogHighLightField{})
mainHighLight.Fields = append(mainHighLight.Fields, v1alpha2.NamespaceHighLightField{})
mainHighLight.Fields = append(mainHighLight.Fields, v1alpha2.PodHighLightField{})
mainHighLight.Fields = append(mainHighLight.Fields, v1alpha2.ContainerHighLightField{})
mainHighLight.FragmentSize = 0
request.MainHighLight = mainHighLight
}
request.MainQuery = BoolQuery{mainBoolQuery}
request.MainQuery = v1alpha2.BoolQuery{Bool: mainBoolQuery}
queryRequest, err := json.Marshal(request)
return operation, queryRequest, err
}
func makeBoolShould(queryType int, field string, list []string) BoolQuery {
func makeBoolShould(queryType int, field string, list []string) v1alpha2.BoolQuery {
var should []interface{}
for _, phrase := range list {
@@ -180,18 +245,18 @@ func makeBoolShould(queryType int, field string, list []string) BoolQuery {
switch queryType {
case matchPhrase:
q = MatchPhrase{MatchPhrase: map[string]string{field: phrase}}
q = v1alpha2.MatchPhrase{MatchPhrase: map[string]string{field: phrase}}
case matchPhrasePrefix:
q = MatchPhrasePrefix{MatchPhrasePrefix: map[string]string{field: phrase}}
q = v1alpha2.MatchPhrasePrefix{MatchPhrasePrefix: map[string]string{field: phrase}}
case regexpQuery:
q = RegexpQuery{Regexp: map[string]string{field: makePodNameRegexp(phrase)}}
q = v1alpha2.RegexpQuery{Regexp: map[string]string{field: makePodNameRegexp(phrase)}}
}
should = append(should, q)
}
return BoolQuery{
Bool: BoolShould{
return v1alpha2.BoolQuery{
Bool: v1alpha2.BoolShould{
Should: should,
MinimumShouldMatch: 1,
},
@@ -244,10 +309,10 @@ func calcTimestamp(input string) int64 {
return ret
}
func parseQueryResult(operation int, param QueryParameters, body []byte) *QueryResult {
var queryResult QueryResult
func (c *ElasticSearchClient) parseQueryResult(operation int, param v1alpha2.QueryParameters, body []byte) *v1alpha2.QueryResult {
var queryResult v1alpha2.QueryResult
var response Response
var response v1alpha2.Response
err := jsonIter.Unmarshal(body, &response)
if err != nil {
klog.Errorln(err)
@@ -273,12 +338,12 @@ func parseQueryResult(operation int, param QueryParameters, body []byte) *QueryR
switch operation {
case OperationQuery:
var readResult ReadResult
readResult.Total = client.GetTotalHitCount(response.Hits.Total)
var readResult v1alpha2.ReadResult
readResult.Total = c.client.GetTotalHitCount(response.Hits.Total)
readResult.From = param.From
readResult.Size = param.Size
for _, hit := range response.Hits.Hits {
var logRecord LogRecord
var logRecord v1alpha2.LogRecord
logRecord.Time = calcTimestamp(hit.Source.Time)
logRecord.Log = hit.Source.Log
logRecord.Namespace = hit.Source.Kubernetes.Namespace
@@ -291,7 +356,7 @@ func parseQueryResult(operation int, param QueryParameters, body []byte) *QueryR
queryResult.Read = &readResult
case OperationStatistics:
var statisticsResponse StatisticsResponseAggregations
var statisticsResponse v1alpha2.StatisticsResponseAggregations
err := jsonIter.Unmarshal(response.Aggregations, &statisticsResponse)
if err != nil && response.Aggregations != nil {
klog.Errorln(err)
@@ -299,17 +364,17 @@ func parseQueryResult(operation int, param QueryParameters, body []byte) *QueryR
queryResult.Error = err.Error()
return &queryResult
}
queryResult.Statistics = &StatisticsResult{Containers: statisticsResponse.ContainerCount.Value, Logs: client.GetTotalHitCount(response.Hits.Total)}
queryResult.Statistics = &v1alpha2.StatisticsResult{Containers: statisticsResponse.ContainerCount.Value, Logs: c.client.GetTotalHitCount(response.Hits.Total)}
case OperationHistogram:
var histogramResult HistogramResult
histogramResult.Total = client.GetTotalHitCount(response.Hits.Total)
var histogramResult v1alpha2.HistogramResult
histogramResult.Total = c.client.GetTotalHitCount(response.Hits.Total)
histogramResult.StartTime = calcTimestamp(param.StartTime)
histogramResult.EndTime = calcTimestamp(param.EndTime)
histogramResult.Interval = param.Interval
var histogramAggregations HistogramAggregations
err := jsonIter.Unmarshal(response.Aggregations, &histogramAggregations)
var histogramAggregations v1alpha2.HistogramAggregations
err = jsonIter.Unmarshal(response.Aggregations, &histogramAggregations)
if err != nil && response.Aggregations != nil {
klog.Errorln(err)
queryResult.Status = http.StatusInternalServerError
@@ -317,7 +382,7 @@ func parseQueryResult(operation int, param QueryParameters, body []byte) *QueryR
return &queryResult
}
for _, histogram := range histogramAggregations.HistogramAggregation.Histograms {
var histogramRecord HistogramRecord
var histogramRecord v1alpha2.HistogramRecord
histogramRecord.Time = histogram.Time
histogramRecord.Count = histogram.Count
@@ -332,30 +397,30 @@ func parseQueryResult(operation int, param QueryParameters, body []byte) *QueryR
return &queryResult
}
func Query(param QueryParameters) *QueryResult {
func (c *ElasticSearchClient) Query(param v1alpha2.QueryParameters) *v1alpha2.QueryResult {
var queryResult = new(QueryResult)
var queryResult = new(v1alpha2.QueryResult)
if param.NamespaceNotFound {
queryResult = new(QueryResult)
queryResult = new(v1alpha2.QueryResult)
queryResult.Status = http.StatusOK
switch param.Operation {
case "statistics":
queryResult.Statistics = new(StatisticsResult)
queryResult.Statistics = new(v1alpha2.StatisticsResult)
case "histogram":
queryResult.Histogram = &HistogramResult{
queryResult.Histogram = &v1alpha2.HistogramResult{
StartTime: calcTimestamp(param.StartTime),
EndTime: calcTimestamp(param.EndTime),
Interval: param.Interval}
default:
queryResult.Read = new(ReadResult)
queryResult.Read = new(v1alpha2.ReadResult)
}
return queryResult
}
if client == nil {
if c.client == nil {
queryResult.Status = http.StatusBadRequest
queryResult.Error = fmt.Sprintf("Invalid elasticsearch address: host=%s, port=%s", config.Host, config.Port)
queryResult.Error = "can not create elastic search client"
return queryResult
}
@@ -367,16 +432,16 @@ func Query(param QueryParameters) *QueryResult {
return queryResult
}
body, err := client.Search(query)
body, err := c.client.Search(query)
if err != nil {
klog.Errorln(err)
queryResult = new(QueryResult)
queryResult = new(v1alpha2.QueryResult)
queryResult.Status = http.StatusInternalServerError
queryResult.Error = err.Error()
return queryResult
}
queryResult = parseQueryResult(operation, param, body)
queryResult = c.parseQueryResult(operation, param, body)
return queryResult
}