[WIP] logging refactor (#1794)

* refactor logging

Signed-off-by: huanggze <loganhuang@yunify.com>

* refactor logging

Signed-off-by: huanggze <loganhuang@yunify.com>
This commit is contained in:
Guangzhe Huang
2020-03-02 10:53:43 +08:00
committed by GitHub
parent a9e1183f3c
commit 6c6bfb2677
60 changed files with 1582 additions and 2966 deletions

View File

@@ -0,0 +1,277 @@
package elasticsearch
import (
"fmt"
"k8s.io/klog"
"kubesphere.io/kubesphere/pkg/simple/client/logging"
"time"
)
// Pod name length limits, used by podNameRegexp to derive how much of a pod
// name can be occupied by the workload name vs. generated suffixes.
const (
	podNameMaxLength          = 63
	podNameSuffixLength       = 6  // 5 characters + 1 hyphen
	replicaSetSuffixMaxLength = 11 // max 10 characters + 1 hyphen
)
// bodyBuilder incrementally assembles an Elasticsearch request Body via
// chained helpers (mainBool, cardinalityAggregation, from, size, sort, ...).
type bodyBuilder struct {
	Body
}
// newBodyBuilder returns an empty builder ready for method chaining.
func newBodyBuilder() *bodyBuilder {
	var bb bodyBuilder
	return &bb
}
// bytes serializes the accumulated request Body to JSON.
// Note: json is the package-level jsoniter instance, not encoding/json.
func (bb *bodyBuilder) bytes() ([]byte, error) {
	return json.Marshal(bb.Body)
}
// The mainBool func builds the api body for a query.
// TODO: Should use an elegant package for building the query body, but `elastic/go-elasticsearch` doesn't provide it currently.
//
// Example:
// GET kapis/logging.kubesphere.io/v1alpha2/cluster?start_time=0&end_time=156576063993&namespaces=kubesphere-system&pod_query=ks-apiserver
// -----
//{
// "from":0,
// "size":10,
// "sort":[
// {
// "time": "desc"
// }
// ],
// "query":{
// "bool":{
// "filter":[
// {
// "bool":{
// "should":[
// {
// "bool":{
// "filter":[
// {
// "match_phrase":{
// "kubernetes.namespace_name.keyword":"kubesphere-system"
// }
// },
// {
// "range":{
// "time":{
// "gte":"1572315987000"
// }
// }
// }
// ]
// }
// }
// ],
// "minimum_should_match":1
// }
// },
// {
// "bool":{
// "should":[
// {
// "match_phrase_prefix":{
// "kubernetes.pod_name":"ks-apiserver"
// }
// }
// ],
// "minimum_should_match":1
// }
// },
// {
// "range":{
// "time":{
// "gte":"0",
// "lte":"156576063993"
// }
// }
// }
// ]
// }
// }
//}
func (bb *bodyBuilder) mainBool(sf logging.SearchFilter) *bodyBuilder {
	var ms []Match

	// appendShould wraps a list of should-clauses into a bool query with
	// minimum_should_match=1 (OR semantics among the clauses) and appends it
	// to the top-level filter list (AND semantics between filter groups).
	// It deduplicates the wrapping that was previously repeated per filter.
	appendShould := func(should []Match) {
		ms = append(ms, Match{Bool: &Bool{Should: should, MinimumShouldMatch: 1}})
	}
	// phraseShould builds OR-ed exact (match_phrase) clauses on field.
	phraseShould := func(field string, values []string) {
		var should []Match
		for _, v := range values {
			should = append(should, Match{MatchPhrase: map[string]string{field: v}})
		}
		appendShould(should)
	}
	// prefixShould builds OR-ed fuzzy (match_phrase_prefix) clauses on field.
	prefixShould := func(field string, values []string) {
		var should []Match
		for _, v := range values {
			should = append(should, Match{MatchPhrasePrefix: map[string]string{field: v}})
		}
		appendShould(should)
	}

	// literal matching
	if len(sf.NamespaceFilter) != 0 {
		var should []Match
		for ns, creation := range sf.NamespaceFilter {
			// Pin the loop variable before taking its address; the map value
			// would otherwise be aliased across iterations.
			creation := creation
			should = append(should, Match{
				Bool: &Bool{
					Filter: []Match{
						{MatchPhrase: map[string]string{"kubernetes.namespace_name.keyword": ns}},
						// Only logs newer than the namespace creation time
						// are visible for this namespace.
						{Range: &Range{Time: &Time{Gte: &creation}}},
					},
				},
			})
		}
		appendShould(should)
	}
	if sf.WorkloadFilter != nil {
		var should []Match
		for _, wk := range sf.WorkloadFilter {
			// Workloads are matched through the pod-name patterns they own.
			should = append(should, Match{Regexp: map[string]string{"kubernetes.pod_name.keyword": podNameRegexp(wk)}})
		}
		appendShould(should)
	}
	if sf.PodFilter != nil {
		phraseShould("kubernetes.pod_name.keyword", sf.PodFilter)
	}
	if sf.ContainerFilter != nil {
		phraseShould("kubernetes.container_name.keyword", sf.ContainerFilter)
	}

	// fuzzy matching
	if sf.WorkloadSearch != nil {
		prefixShould("kubernetes.pod_name", sf.WorkloadSearch)
	}
	if sf.PodSearch != nil {
		prefixShould("kubernetes.pod_name", sf.PodSearch)
	}
	if sf.ContainerSearch != nil {
		prefixShould("kubernetes.container_name", sf.ContainerSearch)
	}
	if sf.LogSearch != nil {
		prefixShould("log", sf.LogSearch)
	}

	// Overall time window; zero bounds are dropped via omitempty.
	if !sf.Starttime.IsZero() || !sf.Endtime.IsZero() {
		ms = append(ms, Match{
			Range: &Range{&Time{
				Gte: &sf.Starttime,
				Lte: &sf.Endtime,
			}},
		})
	}

	bb.Body.Query = &Query{Bool{Filter: ms}}
	return bb
}
// cardinalityAggregation adds a "container_count" aggregation counting
// distinct docker container ids among the matched log entries.
func (bb *bodyBuilder) cardinalityAggregation() *bodyBuilder {
	agg := &CardinalityAggregation{
		&Cardinality{Field: "kubernetes.docker_id.keyword"},
	}
	bb.Body.Aggs = &Aggs{CardinalityAggregation: agg}
	return bb
}
// dateHistogramAggregation adds a "log_count_over_time" date histogram on
// the time field. An empty interval defaults to 15 minutes.
func (bb *bodyBuilder) dateHistogramAggregation(interval string) *bodyBuilder {
	const defaultInterval = "15m"
	if interval == "" {
		interval = defaultInterval
	}
	histogram := &DateHistogram{Field: "time", Interval: interval}
	bb.Body.Aggs = &Aggs{
		DateHistogramAggregation: &DateHistogramAggregation{histogram},
	}
	return bb
}
// from sets the offset of the first hit to return (pagination).
func (bb *bodyBuilder) from(n int64) *bodyBuilder {
	bb.Body.From = n
	return bb
}
// size sets the maximum number of hits to return.
func (bb *bodyBuilder) size(n int64) *bodyBuilder {
	bb.Body.Size = n
	return bb
}
// sort orders hits by time; any value other than "asc" means descending.
func (bb *bodyBuilder) sort(o string) *bodyBuilder {
	order := "desc"
	if o == "asc" {
		order = "asc"
	}
	bb.Sorts = []map[string]string{{"time": order}}
	return bb
}
// podNameRegexp translates a workload name into a regexp matching the names
// of pods the workload may own (Deployment, StatefulSet, DaemonSet, Job).
func podNameRegexp(workloadName string) string {
	const (
		// 5-char random pod suffix, eg. -24znw.
		randSuffix = "[a-z0-9]{5}"
		// ReplicaSet hash alphabet: kubernetes rand strings omit vowels
		// (and 0/1/o/l), see
		// https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/apimachinery/pkg/util/rand/rand.go#L83
		rsCharset = "[bcdfghjklmnpqrstvwxz2456789]"
	)
	switch {
	case len(workloadName) <= podNameMaxLength-replicaSetSuffixMaxLength-podNameSuffixLength:
		// Short names: the full replicaset hash (1-10 chars) fits.
		// Matches deployment pods (<deploy>-579dfbcddd-24znw),
		// statefulset pods (<sts>-0), daemonset/job pods (<ds>-29tdk).
		return fmt.Sprintf("%s-%s{1,10}-%s|%s-[0-9]+|%s-%s",
			workloadName, rsCharset, randSuffix,
			workloadName, workloadName, randSuffix)
	case len(workloadName) <= podNameMaxLength-podNameSuffixLength:
		// Medium names: the replicaset hash is truncated to whatever room is
		// left within the 63-char pod name limit.
		replicaSetSuffixLength := podNameMaxLength - podNameSuffixLength - len(workloadName)
		return fmt.Sprintf("%s-%s{%d}%s|%s-[0-9]+|%s-%s",
			workloadName, rsCharset, replicaSetSuffixLength, randSuffix,
			workloadName, workloadName, randSuffix)
	default:
		// Long names: the rand suffix may overwrite the tail of the workload
		// name. This won't happen for StatefulSet because a long name will
		// already have failed ReplicaSet creation.
		return fmt.Sprintf("%s%s|%s-[0-9]+",
			workloadName[:podNameMaxLength-podNameSuffixLength+1], randSuffix, workloadName)
	}
}
// parseResponse decodes a raw Elasticsearch response payload, logging and
// returning any decode error.
func parseResponse(body []byte) (Response, error) {
	var res Response
	if err := json.Unmarshal(body, &res); err != nil {
		klog.Error(err)
		return Response{}, err
	}
	return res, nil
}

View File

@@ -0,0 +1,122 @@
package elasticsearch
import (
"github.com/google/go-cmp/cmp"
"kubesphere.io/kubesphere/pkg/simple/client/logging"
"testing"
"time"
)
// TestMainBool verifies that mainBool translates a SearchFilter into the
// expected Elasticsearch bool query structure.
//
// NOTE(review): NamespaceFilter is a map and mainBool appends its should
// clauses in map-iteration order, which is randomized in Go. With two
// namespaces the generated clause order may not match the fixed order
// expected below, so this test can fail intermittently — confirm and either
// sort the clauses in mainBool or compare order-insensitively.
func TestMainBool(t *testing.T) {
	var tests = []struct {
		description  string
		searchFilter logging.SearchFilter
		expected     *bodyBuilder
	}{
		{
			description: "filter 2 namespaces",
			searchFilter: logging.SearchFilter{
				NamespaceFilter: map[string]time.Time{
					"kubesphere-system":         time.Unix(1582000000, 0),
					"kubesphere-logging-system": time.Unix(1582969999, 0),
				},
			},
			expected: &bodyBuilder{Body{
				Query: &Query{
					Bool: Bool{
						Filter: []Match{
							{
								Bool: &Bool{
									Should: []Match{
										{
											Bool: &Bool{
												Filter: []Match{
													{
														MatchPhrase: map[string]string{"kubernetes.namespace_name.keyword": "kubesphere-system"},
													},
													{
														Range: &Range{&Time{Gte: func() *time.Time { t := time.Unix(1582000000, 0); return &t }()}},
													},
												},
											},
										},
										{
											Bool: &Bool{
												Filter: []Match{
													{
														MatchPhrase: map[string]string{"kubernetes.namespace_name.keyword": "kubesphere-logging-system"},
													},
													{
														Range: &Range{&Time{Gte: func() *time.Time { t := time.Unix(1582969999, 0); return &t }()}},
													},
												},
											},
										},
									},
									MinimumShouldMatch: 1,
								},
							},
						},
					},
				},
			}},
		},
	}
	for _, test := range tests {
		t.Run(test.description, func(t *testing.T) {
			// Compare the marshaled JSON bytes rather than the structs.
			body, err := newBodyBuilder().mainBool(test.searchFilter).bytes()
			expected, _ := test.expected.bytes()
			if err != nil {
				t.Fatal(err)
			}
			if diff := cmp.Diff(body, expected); diff != "" {
				t.Fatalf("%T differ (-got, +want): %s", expected, diff)
			}
		})
	}
}
// TestCardinalityAggregation checks that the container_count cardinality
// aggregation is attached on top of the main bool query.
func TestCardinalityAggregation(t *testing.T) {
	searchFilter := logging.SearchFilter{
		LogSearch: []string{"info"},
	}
	expected := &bodyBuilder{Body{
		Query: &Query{
			Bool: Bool{
				Filter: []Match{
					{
						Bool: &Bool{
							Should: []Match{
								{MatchPhrasePrefix: map[string]string{"log": "info"}},
							},
							MinimumShouldMatch: 1,
						},
					},
				},
			},
		},
		Aggs: &Aggs{
			CardinalityAggregation: &CardinalityAggregation{
				Cardinality: &Cardinality{Field: "kubernetes.docker_id.keyword"},
			},
		},
	}}

	t.Run("add cardinality aggregation", func(t *testing.T) {
		got := newBodyBuilder().mainBool(searchFilter).cardinalityAggregation()
		if diff := cmp.Diff(got, expected); diff != "" {
			t.Fatalf("%T differ (-got, +want): %s", expected, diff)
		}
	})
}

View File

@@ -0,0 +1,127 @@
package elasticsearch
import "time"
// --------------------------------------------- Request Body ---------------------------------------------
// More info: https://www.elastic.co/guide/en/elasticsearch/reference/current/getting-started-search-API.html

// Body is the top-level Elasticsearch search request.
type Body struct {
	From   int64               `json:"from,omitempty"`
	Size   int64               `json:"size,omitempty"`
	Sorts  []map[string]string `json:"sort,omitempty"`
	*Query `json:"query,omitempty"`
	*Aggs  `json:"aggs,omitempty"`
}

// Query wraps the single top-level bool query.
type Query struct {
	Bool `json:"bool,omitempty"`
}

// Bool is a compound query clause.
// Example:
// {bool: {filter: <[]Match>}}
// {bool: {should: <[]Match>, minimum_should_match: 1}}
type Bool struct {
	Filter             []Match `json:"filter,omitempty"`
	Should             []Match `json:"should,omitempty"`
	MinimumShouldMatch int32   `json:"minimum_should_match,omitempty"`
}

// Match is a single query clause; exactly one field is expected to be set.
// Example: []Match
// [
//  {
//   bool: <Bool>
//  },
//  {
//   match_phrase: {
//    <string>: <string>
//   }
//  },
//  ...
// ]
type Match struct {
	*Bool             `json:"bool,omitempty"`
	MatchPhrase       map[string]string `json:"match_phrase,omitempty"`
	MatchPhrasePrefix map[string]string `json:"match_phrase_prefix,omitempty"`
	Regexp            map[string]string `json:"regexp,omitempty"`
	*Range            `json:"range,omitempty"`
}

// Range restricts matches to a window on the time field.
type Range struct {
	*Time `json:"time,omitempty"`
}

// Time holds the inclusive bounds of a range query; nil bounds are omitted.
type Time struct {
	Gte *time.Time `json:"gte,omitempty"`
	Lte *time.Time `json:"lte,omitempty"`
}

// Aggs names the aggregations this client requests.
type Aggs struct {
	*CardinalityAggregation   `json:"container_count,omitempty"`
	*DateHistogramAggregation `json:"log_count_over_time,omitempty"`
}

// CardinalityAggregation counts distinct values of a field.
type CardinalityAggregation struct {
	*Cardinality `json:"cardinality,omitempty"`
}

// Cardinality selects the field whose distinct values are counted.
type Cardinality struct {
	Field string `json:"field,omitempty"`
}

// DateHistogramAggregation buckets matched documents over time.
type DateHistogramAggregation struct {
	*DateHistogram `json:"date_histogram,omitempty"`
}

// DateHistogram configures the bucketing field and interval.
type DateHistogram struct {
	Field    string `json:"field,omitempty"`
	Interval string `json:"interval,omitempty"`
}
// --------------------------------------------- Response Body ---------------------------------------------

// Response is the subset of an Elasticsearch search/scroll response this
// client consumes.
type Response struct {
	ScrollId     string `json:"_scroll_id,omitempty"`
	Hits         `json:"hits,omitempty"`
	Aggregations `json:"aggregations,omitempty"`
}

// Hits carries the matched documents.
type Hits struct {
	// Total is left untyped: as of Elasticsearch v7.x, hits.total changed
	// incompatibly (number in v5/v6, object in v7). The versioned clients
	// normalize it via GetTotalHitCount.
	Total   interface{} `json:"total"`
	AllHits []Hit       `json:"hits"`
}

// Hit is a single matched document.
type Hit struct {
	Source `json:"_source"`
	Sort   []int64 `json:"sort"`
}

// Source is the fluent-bit log document body.
type Source struct {
	Log        string `json:"log"`
	Time       string `json:"time"`
	Kubernetes `json:"kubernetes"`
}

// Kubernetes is the metadata attached to each log line.
type Kubernetes struct {
	Namespace string `json:"namespace_name"`
	Pod       string `json:"pod_name"`
	Container string `json:"container_name"`
	Host      string `json:"host"`
}

// Aggregations mirrors the aggregation names requested in Aggs.
type Aggregations struct {
	ContainerCount   `json:"container_count"`
	LogCountOverTime `json:"log_count_over_time"`
}

// ContainerCount is the result of the cardinality aggregation.
type ContainerCount struct {
	Value int64 `json:"value"`
}

// LogCountOverTime is the result of the date histogram aggregation.
type LogCountOverTime struct {
	Buckets []Bucket `json:"buckets"`
}

// Bucket is one histogram interval.
type Bucket struct {
	Time  int64 `json:"key"`       // bucket key (start of the interval)
	Count int64 `json:"doc_count"` // number of log entries in the interval
}

View File

@@ -0,0 +1,263 @@
package elasticsearch
import (
"bytes"
"context"
"fmt"
jsoniter "github.com/json-iterator/go"
"io"
"kubesphere.io/kubesphere/pkg/simple/client/logging"
v5 "kubesphere.io/kubesphere/pkg/simple/client/logging/elasticsearch/versions/v5"
v6 "kubesphere.io/kubesphere/pkg/simple/client/logging/elasticsearch/versions/v6"
v7 "kubesphere.io/kubesphere/pkg/simple/client/logging/elasticsearch/versions/v7"
"kubesphere.io/kubesphere/pkg/utils/stringutils"
"strings"
)
// Supported Elasticsearch major versions.
const (
	ElasticV5 = "5"
	ElasticV6 = "6"
	ElasticV7 = "7"
)

// json is a drop-in replacement for encoding/json backed by jsoniter.
var json = jsoniter.ConfigCompatibleWithStandardLibrary
// Elasticsearch implements the logging interface on top of a
// version-specific Elasticsearch client.
type Elasticsearch struct {
	c client
}

// client is the versioned es client interface implemented by the
// v5/v6/v7 packages.
type client interface {
	// Search performs the Search API with the given request body and
	// returns the raw response payload.
	Search(body []byte) ([]byte, error)
	// Scroll fetches the next page of results for the given scroll id.
	Scroll(id string) ([]byte, error)
	// ClearScroll releases the server-side scroll context.
	ClearScroll(id string)
	// GetTotalHitCount normalizes hits.total, whose JSON shape differs
	// across versions (number in v5/v6, object in v7).
	GetTotalHitCount(v interface{}) int64
}
// NewElasticsearch builds a logging client for the configured host,
// auto-detecting the server's major version when none is specified and
// defaulting the index prefix to "logstash".
func NewElasticsearch(options *Options) (*Elasticsearch, error) {
	version := options.Version
	if version == "" {
		detected, err := detectVersionMajor(options.Host)
		if err != nil {
			return nil, err
		}
		version = detected
	}

	index := options.IndexPrefix
	if index == "" {
		index = "logstash"
	}

	es := &Elasticsearch{}
	switch version {
	case ElasticV5:
		es.c = v5.New(options.Host, index)
	case ElasticV6:
		es.c = v6.New(options.Host, index)
	case ElasticV7:
		es.c = v7.New(options.Host, index)
	default:
		return nil, fmt.Errorf("unsupported elasticsearch version %s", version)
	}
	return es, nil
}
// ES exposes the underlying versioned client.
// NOTE(review): returning a pointer to an interface (*client) is unusual in
// Go; callers almost always want the interface value itself — confirm intent.
func (es *Elasticsearch) ES() *client {
	return &es.c
}
// detectVersionMajor queries the Info API and returns the server's major
// version ("5", "6", "7", ...). The Info API is backward compatible across
// v5.x, v6.x and v7.x, so the v6 client is used for the probe.
func detectVersionMajor(host string) (string, error) {
	es := v6.New(host, "")
	res, err := es.Client.Info(
		es.Client.Info.WithContext(context.Background()),
	)
	if err != nil {
		return "", err
	}
	defer res.Body.Close()

	var info map[string]interface{}
	if err = json.NewDecoder(res.Body).Decode(&info); err != nil {
		return "", err
	}

	if res.IsError() {
		// Surface the response status and error information.
		e, _ := info["error"].(map[string]interface{})
		return "", fmt.Errorf("[%s] type: %v, reason: %v", res.Status(), e["type"], e["reason"])
	}

	// Extract version.number and keep only the major component.
	version, _ := info["version"].(map[string]interface{})
	number, _ := version["number"].(string)
	if number == "" {
		return "", fmt.Errorf("failed to detect elastic version number")
	}
	return strings.SplitN(number, ".", 2)[0], nil
}
// GetCurrentStats returns the number of distinct containers and the total
// count of log entries matching the search filter.
func (es Elasticsearch) GetCurrentStats(sf logging.SearchFilter) (logging.Statistics, error) {
	var empty logging.Statistics

	body, err := newBodyBuilder().mainBool(sf).cardinalityAggregation().bytes()
	if err != nil {
		return empty, err
	}
	raw, err := es.c.Search(body)
	if err != nil {
		return empty, err
	}
	res, err := parseResponse(raw)
	if err != nil {
		return empty, err
	}

	return logging.Statistics{
		Containers: res.Value,
		Logs:       es.c.GetTotalHitCount(res.Total),
	}, nil
}
// CountLogsByInterval returns a histogram of matching log counts bucketed by
// the given interval (defaulted inside dateHistogramAggregation).
func (es Elasticsearch) CountLogsByInterval(sf logging.SearchFilter, interval string) (logging.Histogram, error) {
	var empty logging.Histogram

	body, err := newBodyBuilder().mainBool(sf).dateHistogramAggregation(interval).bytes()
	if err != nil {
		return empty, err
	}
	raw, err := es.c.Search(body)
	if err != nil {
		return empty, err
	}
	res, err := parseResponse(raw)
	if err != nil {
		return empty, err
	}

	histogram := logging.Histogram{
		Total: es.c.GetTotalHitCount(res.Total),
	}
	for _, bucket := range res.Buckets {
		histogram.Buckets = append(histogram.Buckets, logging.Bucket{
			Time:  bucket.Time,
			Count: bucket.Count,
		})
	}
	return histogram, nil
}
// SearchLogs returns one page of log records matching the filter, offset f,
// page size s, ordered by time according to o ("asc" or "desc").
func (es Elasticsearch) SearchLogs(sf logging.SearchFilter, f, s int64, o string) (logging.Logs, error) {
	var empty logging.Logs

	body, err := newBodyBuilder().mainBool(sf).from(f).size(s).sort(o).bytes()
	if err != nil {
		return empty, err
	}
	raw, err := es.c.Search(body)
	if err != nil {
		return empty, err
	}
	res, err := parseResponse(raw)
	if err != nil {
		return empty, err
	}

	logs := logging.Logs{
		Total: es.c.GetTotalHitCount(res.Total),
	}
	for _, hit := range res.AllHits {
		logs.Records = append(logs.Records, logging.Record{
			Log:       hit.Log,
			Time:      hit.Time,
			Namespace: hit.Namespace,
			Pod:       hit.Pod,
			Container: hit.Container,
		})
	}
	return logs, nil
}
// ExportLogs streams all matching log lines (ANSI escapes stripped) to w,
// paging through results with the Elasticsearch scroll API. At most the
// initial page plus 100 scroll pages of 1000 records each are exported.
//
// Fixes over the previous version:
//   - `defer es.ClearScroll(id)` evaluated id at defer time, when it was
//     still "", so the scroll context was never cleared; the deferred call
//     is now a closure that reads the latest id.
//   - the scroll id was never captured (SearchLogs discards it), so the
//     first Scroll call was issued with an empty id; the raw Search response
//     is now parsed directly to obtain ScrollId.
//   - the first page of hits was fetched but never written to w.
func (es Elasticsearch) ExportLogs(sf logging.SearchFilter, w io.Writer) error {
	body, err := newBodyBuilder().
		mainBool(sf).
		size(1000).
		sort("desc").
		bytes()
	if err != nil {
		return err
	}

	// The initial search returns both the first page of hits and the scroll
	// id (the versioned clients issue Search with a scroll keep-alive).
	raw, err := es.c.Search(body)
	if err != nil {
		return err
	}
	res, err := parseResponse(raw)
	if err != nil {
		return err
	}

	id := res.ScrollId
	// Closure so the most recent scroll id is cleared at function exit.
	defer func() { es.ClearScroll(id) }()

	// writeHits strips ANSI escape sequences and copies the batch to w.
	writeHits := func(hits []Hit) error {
		output := new(bytes.Buffer)
		for _, hit := range hits {
			output.WriteString(stringutils.StripAnsi(hit.Log))
		}
		_, err := io.Copy(w, output)
		return err
	}

	if len(res.AllHits) == 0 {
		return nil
	}
	if err := writeHits(res.AllHits); err != nil {
		return err
	}

	// Limit to retrieving max 100k further records.
	for i := 0; i < 100; i++ {
		if id == "" {
			return nil
		}
		raw, err := es.c.Scroll(id)
		if err != nil {
			return err
		}
		res, err := parseResponse(raw)
		if err != nil {
			return err
		}
		id = res.ScrollId
		if len(res.AllHits) == 0 {
			return nil
		}
		if err := writeHits(res.AllHits); err != nil {
			return err
		}
	}
	return nil
}
// scroll fetches the next batch for the given scroll id, returning the
// records (log text only) and the id to use for the following call.
func (es *Elasticsearch) scroll(id string) (logging.Logs, string, error) {
	raw, err := es.c.Scroll(id)
	if err != nil {
		return logging.Logs{}, id, err
	}
	res, err := parseResponse(raw)
	if err != nil {
		return logging.Logs{}, id, err
	}

	var logs logging.Logs
	for _, hit := range res.AllHits {
		logs.Records = append(logs.Records, logging.Record{Log: hit.Log})
	}
	return logs, res.ScrollId, nil
}
// ClearScroll releases the server-side scroll context; a blank id is a no-op.
func (es *Elasticsearch) ClearScroll(id string) {
	if id == "" {
		return
	}
	es.c.ClearScroll(id)
}

View File

@@ -0,0 +1,368 @@
package elasticsearch
import (
"github.com/google/go-cmp/cmp"
"kubesphere.io/kubesphere/pkg/simple/client/logging"
v5 "kubesphere.io/kubesphere/pkg/simple/client/logging/elasticsearch/versions/v5"
v6 "kubesphere.io/kubesphere/pkg/simple/client/logging/elasticsearch/versions/v6"
v7 "kubesphere.io/kubesphere/pkg/simple/client/logging/elasticsearch/versions/v7"
"net/http"
"net/http/httptest"
"testing"
"time"
)
func MockElasticsearchService(pattern string, fakeResp string) *httptest.Server {
mux := http.NewServeMux()
mux.HandleFunc(pattern, func(res http.ResponseWriter, req *http.Request) {
res.Write([]byte(fakeResp))
})
return httptest.NewServer(mux)
}
// TestDetectVersionMajor verifies the major-version probe against a mocked
// Info API response.
func TestDetectVersionMajor(t *testing.T) {
	cases := []struct {
		description   string
		fakeResp      string
		expected      string
		expectedError bool
	}{
		{
			description: "detect es 6.x version number",
			fakeResp: `{
"name" : "elasticsearch-logging-data-0",
"cluster_name" : "elasticsearch",
"cluster_uuid" : "uLm0838MSd60T1XEh5P2Qg",
"version" : {
"number" : "6.7.0",
"build_flavor" : "oss",
"build_type" : "docker",
"build_hash" : "8453f77",
"build_date" : "2019-03-21T15:32:29.844721Z",
"build_snapshot" : false,
"lucene_version" : "7.7.0",
"minimum_wire_compatibility_version" : "5.6.0",
"minimum_index_compatibility_version" : "5.0.0"
},
"tagline" : "You Know, for Search"
}`,
			expected:      ElasticV6,
			expectedError: false,
		},
	}
	for _, c := range cases {
		t.Run(c.description, func(t *testing.T) {
			srv := MockElasticsearchService("/", c.fakeResp)
			defer srv.Close()

			got, err := detectVersionMajor(srv.URL)
			switch {
			case err == nil && c.expectedError:
				t.Fatalf("expected error while got nothing")
			case err != nil && !c.expectedError:
				t.Fatal(err)
			}
			if got != c.expected {
				t.Fatalf("expected get version %s, but got %s", c.expected, got)
			}
		})
	}
}
// TestGetCurrentStats exercises GetCurrentStats end-to-end against a mocked
// Elasticsearch Search API: a successful aggregation response and an
// index_not_found error response. The same canned body is served to the
// version-specific client selected by fakeVersion.
func TestGetCurrentStats(t *testing.T) {
	var tests = []struct {
		description   string
		searchFilter  logging.SearchFilter
		fakeVersion   string
		fakeResp      string
		expected      logging.Statistics
		expectedError bool
	}{
		{
			// Happy path: hits.total and the container_count cardinality
			// aggregation are extracted from the response.
			description:  "[es 6.x] run as admin",
			searchFilter: logging.SearchFilter{},
			fakeVersion:  ElasticV6,
			fakeResp: `{
"took": 171,
"timed_out": false,
"_shards": {
"total": 10,
"successful": 10,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 241222,
"max_score": 1.0,
"hits": [
{
"_index": "ks-logstash-log-2020.02.28",
"_type": "flb_type",
"_id": "Hn1GjXABMO5aQxyNsyxy",
"_score": 1.0,
"_source": {
"@timestamp": "2020-02-28T19:25:29.015Z",
"log": " value: \"hostpath\"\n",
"time": "2020-02-28T19:25:29.015492329Z",
"kubernetes": {
"pod_name": "openebs-localpv-provisioner-55c66b57b4-jgtjc",
"namespace_name": "kube-system",
"host": "ks-allinone",
"container_name": "openebs-localpv-provisioner",
"docker_id": "cac01cd01cc79d8a8903ddbe6fbde9ac7497919a3f33c61861443703a9e08b39",
"container_hash": "25d789bcd3d12a4ba50bbb56eed1de33279d04352adbba8fd7e3b7b938aec806"
}
}
},
{
"_index": "ks-logstash-log-2020.02.28",
"_type": "flb_type",
"_id": "I31GjXABMO5aQxyNsyxy",
"_score": 1.0,
"_source": {
"@timestamp": "2020-02-28T19:25:33.103Z",
"log": "I0228 19:25:33.102631 1 controller.go:1040] provision \"kubesphere-system/redis-pvc\" class \"local\": trying to save persistentvolume \"pvc-be6d127d-9366-4ea8-b1ce-f30c1b3a447b\"\n",
"time": "2020-02-28T19:25:33.103075891Z",
"kubernetes": {
"pod_name": "openebs-localpv-provisioner-55c66b57b4-jgtjc",
"namespace_name": "kube-system",
"host": "ks-allinone",
"container_name": "openebs-localpv-provisioner",
"docker_id": "cac01cd01cc79d8a8903ddbe6fbde9ac7497919a3f33c61861443703a9e08b39",
"container_hash": "25d789bcd3d12a4ba50bbb56eed1de33279d04352adbba8fd7e3b7b938aec806"
}
}
},
{
"_index": "ks-logstash-log-2020.02.28",
"_type": "flb_type",
"_id": "JX1GjXABMO5aQxyNsyxy",
"_score": 1.0,
"_source": {
"@timestamp": "2020-02-28T19:25:33.113Z",
"log": "I0228 19:25:33.112200 1 controller.go:1088] provision \"kubesphere-system/redis-pvc\" class \"local\": succeeded\n",
"time": "2020-02-28T19:25:33.113110332Z",
"kubernetes": {
"pod_name": "openebs-localpv-provisioner-55c66b57b4-jgtjc",
"namespace_name": "kube-system",
"host": "ks-allinone",
"container_name": "openebs-localpv-provisioner",
"docker_id": "cac01cd01cc79d8a8903ddbe6fbde9ac7497919a3f33c61861443703a9e08b39",
"container_hash": "25d789bcd3d12a4ba50bbb56eed1de33279d04352adbba8fd7e3b7b938aec806"
}
}
},
{
"_index": "ks-logstash-log-2020.02.28",
"_type": "flb_type",
"_id": "Kn1GjXABMO5aQxyNsyxy",
"_score": 1.0,
"_source": {
"@timestamp": "2020-02-28T19:25:34.168Z",
"log": " value: \"hostpath\"\n",
"time": "2020-02-28T19:25:34.168983384Z",
"kubernetes": {
"pod_name": "openebs-localpv-provisioner-55c66b57b4-jgtjc",
"namespace_name": "kube-system",
"host": "ks-allinone",
"container_name": "openebs-localpv-provisioner",
"docker_id": "cac01cd01cc79d8a8903ddbe6fbde9ac7497919a3f33c61861443703a9e08b39",
"container_hash": "25d789bcd3d12a4ba50bbb56eed1de33279d04352adbba8fd7e3b7b938aec806"
}
}
},
{
"_index": "ks-logstash-log-2020.02.28",
"_type": "flb_type",
"_id": "LH1GjXABMO5aQxyNsyxy",
"_score": 1.0,
"_source": {
"@timestamp": "2020-02-28T19:25:34.168Z",
"log": " value: \"/var/openebs/local/\"\n",
"time": "2020-02-28T19:25:34.168997393Z",
"kubernetes": {
"pod_name": "openebs-localpv-provisioner-55c66b57b4-jgtjc",
"namespace_name": "kube-system",
"host": "ks-allinone",
"container_name": "openebs-localpv-provisioner",
"docker_id": "cac01cd01cc79d8a8903ddbe6fbde9ac7497919a3f33c61861443703a9e08b39",
"container_hash": "25d789bcd3d12a4ba50bbb56eed1de33279d04352adbba8fd7e3b7b938aec806"
}
}
},
{
"_index": "ks-logstash-log-2020.02.28",
"_type": "flb_type",
"_id": "NX1GjXABMO5aQxyNsyxy",
"_score": 1.0,
"_source": {
"@timestamp": "2020-02-28T19:25:42.868Z",
"log": "I0228 19:25:42.868413 1 config.go:83] SC local has config:- name: StorageType\n",
"time": "2020-02-28T19:25:42.868578188Z",
"kubernetes": {
"pod_name": "openebs-localpv-provisioner-55c66b57b4-jgtjc",
"namespace_name": "kube-system",
"host": "ks-allinone",
"container_name": "openebs-localpv-provisioner",
"docker_id": "cac01cd01cc79d8a8903ddbe6fbde9ac7497919a3f33c61861443703a9e08b39",
"container_hash": "25d789bcd3d12a4ba50bbb56eed1de33279d04352adbba8fd7e3b7b938aec806"
}
}
},
{
"_index": "ks-logstash-log-2020.02.28",
"_type": "flb_type",
"_id": "Q31GjXABMO5aQxyNsyxy",
"_score": 1.0,
"_source": {
"@timestamp": "2020-02-28T19:26:13.881Z",
"log": "- name: BasePath\n",
"time": "2020-02-28T19:26:13.881180681Z",
"kubernetes": {
"pod_name": "openebs-localpv-provisioner-55c66b57b4-jgtjc",
"namespace_name": "kube-system",
"host": "ks-allinone",
"container_name": "openebs-localpv-provisioner",
"docker_id": "cac01cd01cc79d8a8903ddbe6fbde9ac7497919a3f33c61861443703a9e08b39",
"container_hash": "25d789bcd3d12a4ba50bbb56eed1de33279d04352adbba8fd7e3b7b938aec806"
}
}
},
{
"_index": "ks-logstash-log-2020.02.28",
"_type": "flb_type",
"_id": "S31GjXABMO5aQxyNsyxy",
"_score": 1.0,
"_source": {
"@timestamp": "2020-02-28T19:26:14.597Z",
"log": " value: \"/var/openebs/local/\"\n",
"time": "2020-02-28T19:26:14.597702238Z",
"kubernetes": {
"pod_name": "openebs-localpv-provisioner-55c66b57b4-jgtjc",
"namespace_name": "kube-system",
"host": "ks-allinone",
"container_name": "openebs-localpv-provisioner",
"docker_id": "cac01cd01cc79d8a8903ddbe6fbde9ac7497919a3f33c61861443703a9e08b39",
"container_hash": "25d789bcd3d12a4ba50bbb56eed1de33279d04352adbba8fd7e3b7b938aec806"
}
}
},
{
"_index": "ks-logstash-log-2020.02.28",
"_type": "flb_type",
"_id": "TH1GjXABMO5aQxyNsyxy",
"_score": 1.0,
"_source": {
"@timestamp": "2020-02-28T19:26:14.597Z",
"log": "I0228 19:26:14.597007 1 provisioner_hostpath.go:42] Creating volume pvc-c3b1e67f-00d2-407d-8c45-690bb273c16a at ks-allinone:/var/openebs/local/pvc-c3b1e67f-00d2-407d-8c45-690bb273c16a\n",
"time": "2020-02-28T19:26:14.597708432Z",
"kubernetes": {
"pod_name": "openebs-localpv-provisioner-55c66b57b4-jgtjc",
"namespace_name": "kube-system",
"host": "ks-allinone",
"container_name": "openebs-localpv-provisioner",
"docker_id": "cac01cd01cc79d8a8903ddbe6fbde9ac7497919a3f33c61861443703a9e08b39",
"container_hash": "25d789bcd3d12a4ba50bbb56eed1de33279d04352adbba8fd7e3b7b938aec806"
}
}
},
{
"_index": "ks-logstash-log-2020.02.28",
"_type": "flb_type",
"_id": "UX1GjXABMO5aQxyNsyxy",
"_score": 1.0,
"_source": {
"@timestamp": "2020-02-28T19:26:15.920Z",
"log": "I0228 19:26:15.915071 1 event.go:221] Event(v1.ObjectReference{Kind:\"PersistentVolumeClaim\", Namespace:\"kubesphere-system\", Name:\"mysql-pvc\", UID:\"1e87deb5-eaec-475f-8eb6-8613b3be80a4\", APIVersion:\"v1\", ResourceVersion:\"2397\", FieldPath:\"\"}): type: 'Normal' reason: 'ProvisioningSucceeded' Successfully provisioned volume pvc-1e87deb5-eaec-475f-8eb6-8613b3be80a4\n",
"time": "2020-02-28T19:26:15.920650572Z",
"kubernetes": {
"pod_name": "openebs-localpv-provisioner-55c66b57b4-jgtjc",
"namespace_name": "kube-system",
"host": "ks-allinone",
"container_name": "openebs-localpv-provisioner",
"docker_id": "cac01cd01cc79d8a8903ddbe6fbde9ac7497919a3f33c61861443703a9e08b39",
"container_hash": "25d789bcd3d12a4ba50bbb56eed1de33279d04352adbba8fd7e3b7b938aec806"
}
}
}
]
},
"aggregations": {
"container_count": {
"value": 93
}
}
}`,
			expected: logging.Statistics{
				Containers: 93,
				Logs:       241222,
			},
			expectedError: false,
		},
		{
			// Error path: the ES error payload must surface as a Go error
			// and the zero Statistics value must be returned.
			description: "[es 6.x] index not found",
			searchFilter: logging.SearchFilter{
				NamespaceFilter: map[string]time.Time{
					"workspace-1-project-a": time.Unix(1582000000, 0),
					"workspace-1-project-b": time.Unix(1582333333, 0),
				},
			},
			fakeVersion: ElasticV6,
			fakeResp: `{
"error": {
"root_cause": [
{
"type": "index_not_found_exception",
"reason": "no such index",
"resource.type": "index_or_alias",
"resource.id": "ks-lsdfsdfsdfs",
"index_uuid": "_na_",
"index": "ks-lsdfsdfsdfs"
}
],
"type": "index_not_found_exception",
"reason": "no such index",
"resource.type": "index_or_alias",
"resource.id": "ks-lsdfsdfsdfs",
"index_uuid": "_na_",
"index": "ks-lsdfsdfsdfs"
},
"status": 404
}`,
			expected: logging.Statistics{
				Containers: 0,
				Logs:       0,
			},
			expectedError: true,
		},
	}
	for _, test := range tests {
		t.Run(test.description, func(t *testing.T) {
			es := MockElasticsearchService("/", test.fakeResp)
			defer es.Close()

			// One client per supported version, all pointed at the mock.
			clientv5 := Elasticsearch{c: v5.New(es.URL, "ks-logstash-log")}
			clientv6 := Elasticsearch{c: v6.New(es.URL, "ks-logstash-log")}
			clientv7 := Elasticsearch{c: v7.New(es.URL, "ks-logstash-log")}

			var stats logging.Statistics
			var err error
			// Dispatch to the client matching the fixture's version.
			switch test.fakeVersion {
			case ElasticV5:
				stats, err = clientv5.GetCurrentStats(test.searchFilter)
			case ElasticV6:
				stats, err = clientv6.GetCurrentStats(test.searchFilter)
			case ElasticV7:
				stats, err = clientv7.GetCurrentStats(test.searchFilter)
			}
			if err != nil && !test.expectedError {
				t.Fatal(err)
			} else if diff := cmp.Diff(stats, test.expected); diff != "" {
				t.Fatalf("%T differ (-got, +want): %s", test.expected, diff)
			}
		})
	}
}

View File

@@ -0,0 +1,46 @@
package elasticsearch
import (
"github.com/spf13/pflag"
"kubesphere.io/kubesphere/pkg/utils/reflectutils"
)
// Options holds the connection settings for the Elasticsearch log store.
type Options struct {
	Host        string `json:"host" yaml:"host"`
	IndexPrefix string `json:"indexPrefix,omitempty" yaml:"indexPrefix"`
	Version     string `json:"version" yaml:"version"`
}

// NewElasticSearchOptions returns the default options: no host, the
// "fluentbit" index prefix, and automatic version detection (empty Version).
func NewElasticSearchOptions() *Options {
	return &Options{
		IndexPrefix: "fluentbit",
	}
}
// ApplyTo overrides fields of options with s, but only when s has a host
// configured; otherwise options is left untouched.
func (s *Options) ApplyTo(options *Options) {
	if s.Host == "" {
		return
	}
	reflectutils.Override(options, s)
}
// Validate returns configuration errors; there is currently nothing to check,
// so the slice is always empty (and non-nil).
func (s *Options) Validate() []error {
	return []error{}
}
// AddFlags registers the elasticsearch command-line flags on fs, using c for
// the default values. (Fixes typos in the user-facing help text: "filed" ->
// "field is", and a missing space before "Currently".)
func (s *Options) AddFlags(fs *pflag.FlagSet, c *Options) {
	fs.StringVar(&s.Host, "elasticsearch-host", c.Host, ""+
		"Elasticsearch logging service host. KubeSphere is using elastic as log store, "+
		"if this field is left blank, KubeSphere will use kubernetes builtin log API instead, and"+
		" the following elastic search options will be ignored.")
	fs.StringVar(&s.IndexPrefix, "index-prefix", c.IndexPrefix, ""+
		"Index name prefix. KubeSphere will retrieve logs against indices matching the prefix.")
	fs.StringVar(&s.Version, "elasticsearch-version", c.Version, ""+
		"Elasticsearch major version, e.g. 5/6/7, if left blank, will detect automatically. "+
		"Currently, minimum supported version is 5.x")
}

View File

@@ -0,0 +1,88 @@
package v5
import (
"bytes"
"context"
"encoding/json"
"fmt"
"github.com/elastic/go-elasticsearch/v5"
"github.com/elastic/go-elasticsearch/v5/esapi"
"io/ioutil"
"k8s.io/klog"
"time"
)
// Elastic is a thin wrapper around the official go-elasticsearch v5 client,
// scoped to indices with the given prefix.
type Elastic struct {
	client *elasticsearch.Client
	index  string
}

// New builds a v5 client for the given address and index prefix.
// NOTE(review): on construction failure the error is only logged and nil is
// returned; callers that don't check for nil will panic on first use —
// confirm whether New should return an error instead.
func New(address string, index string) *Elastic {
	client, err := elasticsearch.NewClient(elasticsearch.Config{
		Addresses: []string{address},
	})
	if err != nil {
		klog.Error(err)
		return nil
	}
	return &Elastic{client: client, index: index}
}
// Search runs the Search API over all indices matching "<index>*", opening a
// one-minute scroll context, and returns the raw response payload.
func (e *Elastic) Search(body []byte) ([]byte, error) {
	res, err := e.client.Search(
		e.client.Search.WithContext(context.Background()),
		e.client.Search.WithIndex(fmt.Sprintf("%s*", e.index)),
		e.client.Search.WithBody(bytes.NewBuffer(body)),
		e.client.Search.WithScroll(time.Minute))
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()

	if res.IsError() {
		return nil, parseError(res)
	}
	return ioutil.ReadAll(res.Body)
}
// Scroll fetches the next page for the given scroll id, extending the scroll
// keep-alive by one minute, and returns the raw response payload.
func (e *Elastic) Scroll(id string) ([]byte, error) {
	res, err := e.client.Scroll(
		e.client.Scroll.WithContext(context.Background()),
		e.client.Scroll.WithScrollID(id),
		e.client.Scroll.WithScroll(time.Minute))
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()

	if res.IsError() {
		return nil, parseError(res)
	}
	return ioutil.ReadAll(res.Body)
}
// ClearScroll releases the server-side resources held by the scroll context.
// Errors are logged and otherwise ignored: failing to clear a scroll is
// non-fatal (the context expires on its own after the keep-alive window).
func (e *Elastic) ClearScroll(scrollId string) {
	response, err := e.client.ClearScroll(
		e.client.ClearScroll.WithContext(context.Background()),
		e.client.ClearScroll.WithScrollID(scrollId))
	if err != nil {
		// Previously the error was discarded and response.Body was
		// dereferenced unconditionally, panicking on any transport error.
		klog.Error(err)
		return
	}
	defer response.Body.Close()
}
// GetTotalHitCount normalizes hits.total, which is a plain number in ES 5.x
// responses (decoded as float64); any other shape yields 0.
func (e *Elastic) GetTotalHitCount(v interface{}) int64 {
	if f, ok := v.(float64); ok {
		return int64(f)
	}
	return 0
}
// parseError decodes the error payload of a non-2xx response into a Go error
// carrying the Elasticsearch error type and reason. (Restructured to return
// early and to stop shadowing the decoded map with the extracted error obj.)
func parseError(response *esapi.Response) error {
	var body map[string]interface{}
	if err := json.NewDecoder(response.Body).Decode(&body); err != nil {
		return err
	}
	// A failed assertion leaves info nil; indexing a nil map yields nil
	// values, matching the previous behavior.
	info, _ := body["error"].(map[string]interface{})
	return fmt.Errorf("type: %v, reason: %v", info["type"], info["reason"])
}

View File

@@ -0,0 +1,88 @@
package v6
import (
"bytes"
"context"
"encoding/json"
"fmt"
"github.com/elastic/go-elasticsearch/v6"
"github.com/elastic/go-elasticsearch/v6/esapi"
"io/ioutil"
"k8s.io/klog"
"time"
)
// Elastic is a thin wrapper around the official go-elasticsearch v6 client,
// scoped to indices with the given prefix. Client is exported because the
// elasticsearch package's version probe uses it directly.
type Elastic struct {
	Client *elasticsearch.Client
	index  string
}

// New builds a v6 client for the given address and index prefix.
// NOTE(review): on construction failure the error is only logged and nil is
// returned; callers that don't check for nil will panic on first use —
// confirm whether New should return an error instead.
func New(address string, index string) *Elastic {
	client, err := elasticsearch.NewClient(elasticsearch.Config{
		Addresses: []string{address},
	})
	if err != nil {
		klog.Error(err)
		return nil
	}
	return &Elastic{Client: client, index: index}
}
// Search runs the Search API over all indices matching "<index>*", opening a
// one-minute scroll context, and returns the raw response payload.
func (e *Elastic) Search(body []byte) ([]byte, error) {
	res, err := e.Client.Search(
		e.Client.Search.WithContext(context.Background()),
		e.Client.Search.WithIndex(fmt.Sprintf("%s*", e.index)),
		e.Client.Search.WithBody(bytes.NewBuffer(body)),
		e.Client.Search.WithScroll(time.Minute))
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()

	if res.IsError() {
		return nil, parseError(res)
	}
	return ioutil.ReadAll(res.Body)
}
// Scroll fetches the next page for the given scroll id, extending the scroll
// keep-alive by one minute, and returns the raw response payload.
func (e *Elastic) Scroll(id string) ([]byte, error) {
	res, err := e.Client.Scroll(
		e.Client.Scroll.WithContext(context.Background()),
		e.Client.Scroll.WithScrollID(id),
		e.Client.Scroll.WithScroll(time.Minute))
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()

	if res.IsError() {
		return nil, parseError(res)
	}
	return ioutil.ReadAll(res.Body)
}
// ClearScroll releases the server-side resources held by the scroll context.
// Errors are logged and otherwise ignored: failing to clear a scroll is
// non-fatal (the context expires on its own after the keep-alive window).
func (e *Elastic) ClearScroll(scrollId string) {
	response, err := e.Client.ClearScroll(
		e.Client.ClearScroll.WithContext(context.Background()),
		e.Client.ClearScroll.WithScrollID(scrollId))
	if err != nil {
		// Previously the error was discarded and response.Body was
		// dereferenced unconditionally, panicking on any transport error.
		klog.Error(err)
		return
	}
	defer response.Body.Close()
}
// GetTotalHitCount normalizes hits.total, which is a plain number in ES 6.x
// responses (decoded as float64); any other shape yields 0.
func (e *Elastic) GetTotalHitCount(v interface{}) int64 {
	if f, ok := v.(float64); ok {
		return int64(f)
	}
	return 0
}
// parseError decodes the error payload of a non-2xx response into a Go error
// carrying the Elasticsearch error type and reason. (Restructured to return
// early and to stop shadowing the decoded map with the extracted error obj.)
func parseError(response *esapi.Response) error {
	var body map[string]interface{}
	if err := json.NewDecoder(response.Body).Decode(&body); err != nil {
		return err
	}
	// A failed assertion leaves info nil; indexing a nil map yields nil
	// values, matching the previous behavior.
	info, _ := body["error"].(map[string]interface{})
	return fmt.Errorf("type: %v, reason: %v", info["type"], info["reason"])
}

View File

@@ -0,0 +1,91 @@
package v7
import (
"bytes"
"context"
"encoding/json"
"fmt"
"github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
"io/ioutil"
"k8s.io/klog"
"time"
)
// Elastic is a thin wrapper around the official go-elasticsearch v7 client,
// scoped to indices with the given prefix.
type Elastic struct {
	client *elasticsearch.Client
	index  string
}

// New builds a v7 client for the given address and index prefix.
// NOTE(review): on construction failure the error is only logged and nil is
// returned; callers that don't check for nil will panic on first use —
// confirm whether New should return an error instead.
func New(address string, index string) *Elastic {
	client, err := elasticsearch.NewClient(elasticsearch.Config{
		Addresses: []string{address},
	})
	if err != nil {
		klog.Error(err)
		return nil
	}
	return &Elastic{client: client, index: index}
}
// Search runs the Search API over all indices matching "<index>*", opening a
// one-minute scroll context, and returns the raw response payload.
// track_total_hits is enabled because ES 7.x caps hits.total at 10000 by
// default.
func (e *Elastic) Search(body []byte) ([]byte, error) {
	res, err := e.client.Search(
		e.client.Search.WithContext(context.Background()),
		e.client.Search.WithIndex(fmt.Sprintf("%s*", e.index)),
		e.client.Search.WithTrackTotalHits(true),
		e.client.Search.WithBody(bytes.NewBuffer(body)),
		e.client.Search.WithScroll(time.Minute))
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()

	if res.IsError() {
		return nil, parseError(res)
	}
	return ioutil.ReadAll(res.Body)
}
// Scroll fetches the next page for the given scroll id, extending the scroll
// keep-alive by one minute, and returns the raw response payload.
// (Returns ReadAll's result directly instead of through intermediate
// variables, matching the sibling Search implementation.)
func (e *Elastic) Scroll(id string) ([]byte, error) {
	response, err := e.client.Scroll(
		e.client.Scroll.WithContext(context.Background()),
		e.client.Scroll.WithScrollID(id),
		e.client.Scroll.WithScroll(time.Minute))
	if err != nil {
		return nil, err
	}
	defer response.Body.Close()

	if response.IsError() {
		return nil, parseError(response)
	}
	return ioutil.ReadAll(response.Body)
}
// ClearScroll releases the server-side resources held by the scroll context.
// Errors are logged and otherwise ignored: failing to clear a scroll is
// non-fatal (the context expires on its own after the keep-alive window).
func (e *Elastic) ClearScroll(scrollId string) {
	response, err := e.client.ClearScroll(
		e.client.ClearScroll.WithContext(context.Background()),
		e.client.ClearScroll.WithScrollID(scrollId))
	if err != nil {
		// Previously the error was discarded and response.Body was
		// dereferenced unconditionally, panicking on any transport error.
		klog.Error(err)
		return
	}
	defer response.Body.Close()
}
// GetTotalHitCount normalizes hits.total, which as of ES 7.x is an object of
// the form {"value": N, "relation": ...}; any other shape yields 0.
func (e *Elastic) GetTotalHitCount(v interface{}) int64 {
	m, _ := v.(map[string]interface{})
	if f, ok := m["value"].(float64); ok {
		return int64(f)
	}
	return 0
}
// parseError decodes the error payload of a non-2xx response into a Go error
// carrying the Elasticsearch error type and reason. (Restructured to return
// early and to stop shadowing the decoded map with the extracted error obj.)
func parseError(response *esapi.Response) error {
	var body map[string]interface{}
	if err := json.NewDecoder(response.Body).Decode(&body); err != nil {
		return err
	}
	// A failed assertion leaves info nil; indexing a nil map yields nil
	// values, matching the previous behavior.
	info, _ := body["error"].(map[string]interface{})
	return fmt.Errorf("type: %v, reason: %v", info["type"], info["reason"])
}