From 6a5738d66aca571162a7df1ef322e00b4cea62f4 Mon Sep 17 00:00:00 2001 From: huanggze Date: Tue, 14 Jul 2020 19:18:05 +0800 Subject: [PATCH] significantly improve log search performance Signed-off-by: huanggze --- .../logging/elasticsearch/elasticsearch.go | 11 ++-- .../elasticsearch/elasticsearch_test.go | 11 ++-- .../logging/elasticsearch/versions/v5/v5.go | 5 +- .../logging/elasticsearch/versions/v6/v6.go | 5 +- .../logging/elasticsearch/versions/v7/v7.go | 5 +- pkg/utils/esutil/esutil.go | 32 +++++++++++ pkg/utils/esutil/esutil_test.go | 56 +++++++++++++++++++ 7 files changed, 107 insertions(+), 18 deletions(-) create mode 100644 pkg/utils/esutil/esutil.go create mode 100644 pkg/utils/esutil/esutil_test.go diff --git a/pkg/simple/client/logging/elasticsearch/elasticsearch.go b/pkg/simple/client/logging/elasticsearch/elasticsearch.go index c3acfe5e2..e96b39179 100644 --- a/pkg/simple/client/logging/elasticsearch/elasticsearch.go +++ b/pkg/simple/client/logging/elasticsearch/elasticsearch.go @@ -10,6 +10,7 @@ import ( "kubesphere.io/kubesphere/pkg/simple/client/logging/elasticsearch/versions/v5" "kubesphere.io/kubesphere/pkg/simple/client/logging/elasticsearch/versions/v6" "kubesphere.io/kubesphere/pkg/simple/client/logging/elasticsearch/versions/v7" + "kubesphere.io/kubesphere/pkg/utils/esutil" "kubesphere.io/kubesphere/pkg/utils/stringutils" "strings" "sync" @@ -33,7 +34,7 @@ type Elasticsearch struct { // versioned es client interface type client interface { - Search(body []byte, scroll bool) ([]byte, error) + Search(indices string, body []byte, scroll bool) ([]byte, error) Scroll(id string) ([]byte, error) ClearScroll(id string) GetTotalHitCount(v interface{}) int64 @@ -147,7 +148,7 @@ func (es *Elasticsearch) GetCurrentStats(sf logging.SearchFilter) (logging.Stati return logging.Statistics{}, err } - b, err := es.c.Search(body, true) + b, err := es.c.Search(esutil.ResolveIndexNames(es.index, sf.Starttime, sf.Endtime), body, true) if err != nil { return logging.Statistics{}, err } @@ -180,7 +181,7 @@ func (es *Elasticsearch) CountLogsByInterval(sf logging.SearchFilter, interval s return logging.Histogram{}, err } - b, err := es.c.Search(body, false) + b, err := es.c.Search(esutil.ResolveIndexNames(es.index, sf.Starttime, sf.Endtime), body, false) if err != nil { return logging.Histogram{}, err } @@ -219,7 +220,7 @@ func (es *Elasticsearch) SearchLogs(sf logging.SearchFilter, f, s int64, o strin return logging.Logs{}, err } - b, err := es.c.Search(body, false) + b, err := es.c.Search(esutil.ResolveIndexNames(es.index, sf.Starttime, sf.Endtime), body, false) if err != nil { return logging.Logs{}, err } @@ -264,7 +265,7 @@ func (es *Elasticsearch) ExportLogs(sf logging.SearchFilter, w io.Writer) error return err } - b, err := es.c.Search(body, true) + b, err := es.c.Search(esutil.ResolveIndexNames(es.index, sf.Starttime, sf.Endtime), body, true) defer es.ClearScroll(id) if err != nil { return err diff --git a/pkg/simple/client/logging/elasticsearch/elasticsearch_test.go b/pkg/simple/client/logging/elasticsearch/elasticsearch_test.go index 362064cae..d06cda97a 100644 --- a/pkg/simple/client/logging/elasticsearch/elasticsearch_test.go +++ b/pkg/simple/client/logging/elasticsearch/elasticsearch_test.go @@ -224,17 +224,14 @@ func mockElasticsearchService(pattern, fakeResp string, fakeCode int) *httptest. } func newElasticsearchClient(srv *httptest.Server, version string) *Elasticsearch { - var es *Elasticsearch + es := &Elasticsearch{index: "ks-logstash-log"} switch version { case ElasticV5: - client, _ := v5.New(srv.URL, "ks-logstash-log") - es = &Elasticsearch{c: client} + es.c, _ = v5.New(srv.URL, "ks-logstash-log") case ElasticV6: - client, _ := v6.New(srv.URL, "ks-logstash-log") - es = &Elasticsearch{c: client} + es.c, _ = v6.New(srv.URL, "ks-logstash-log") case ElasticV7: - client, _ := v7.New(srv.URL, "ks-logstash-log") - es = &Elasticsearch{c: client} + es.c, _ = v7.New(srv.URL, "ks-logstash-log") } return es } diff --git a/pkg/simple/client/logging/elasticsearch/versions/v5/v5.go b/pkg/simple/client/logging/elasticsearch/versions/v5/v5.go index 6cbe7a219..f3a9a29d5 100644 --- a/pkg/simple/client/logging/elasticsearch/versions/v5/v5.go +++ b/pkg/simple/client/logging/elasticsearch/versions/v5/v5.go @@ -24,10 +24,11 @@ func New(address string, index string) (*Elastic, error) { return &Elastic{client: client, index: index}, err } -func (e *Elastic) Search(body []byte, scroll bool) ([]byte, error) { +func (e *Elastic) Search(indices string, body []byte, scroll bool) ([]byte, error) { opts := []func(*esapi.SearchRequest){ e.client.Search.WithContext(context.Background()), - e.client.Search.WithIndex(fmt.Sprintf("%s*", e.index)), + e.client.Search.WithIndex(indices), + e.client.Search.WithIgnoreUnavailable(true), e.client.Search.WithBody(bytes.NewBuffer(body)), } if scroll { diff --git a/pkg/simple/client/logging/elasticsearch/versions/v6/v6.go b/pkg/simple/client/logging/elasticsearch/versions/v6/v6.go index 9616d5d65..3901299be 100644 --- a/pkg/simple/client/logging/elasticsearch/versions/v6/v6.go +++ b/pkg/simple/client/logging/elasticsearch/versions/v6/v6.go @@ -24,10 +24,11 @@ func New(address string, index string) (*Elastic, error) { return &Elastic{Client: client, index: index}, err } -func (e *Elastic) Search(body []byte, scroll bool) ([]byte, error) { +func (e *Elastic) Search(indices string, body []byte, scroll bool) ([]byte, error) { opts := []func(*esapi.SearchRequest){ e.Client.Search.WithContext(context.Background()), - e.Client.Search.WithIndex(fmt.Sprintf("%s*", e.index)), + e.Client.Search.WithIndex(indices), + e.Client.Search.WithIgnoreUnavailable(true), e.Client.Search.WithBody(bytes.NewBuffer(body)), } if scroll { diff --git a/pkg/simple/client/logging/elasticsearch/versions/v7/v7.go b/pkg/simple/client/logging/elasticsearch/versions/v7/v7.go index f3dd3b0d3..251c417e5 100644 --- a/pkg/simple/client/logging/elasticsearch/versions/v7/v7.go +++ b/pkg/simple/client/logging/elasticsearch/versions/v7/v7.go @@ -24,10 +24,11 @@ func New(address string, index string) (*Elastic, error) { return &Elastic{client: client, index: index}, err } -func (e *Elastic) Search(body []byte, scroll bool) ([]byte, error) { +func (e *Elastic) Search(indices string, body []byte, scroll bool) ([]byte, error) { opts := []func(*esapi.SearchRequest){ e.client.Search.WithContext(context.Background()), - e.client.Search.WithIndex(fmt.Sprintf("%s*", e.index)), + e.client.Search.WithIndex(indices), + e.client.Search.WithIgnoreUnavailable(true), e.client.Search.WithBody(bytes.NewBuffer(body)), } if scroll { diff --git a/pkg/utils/esutil/esutil.go b/pkg/utils/esutil/esutil.go new file mode 100644 index 000000000..0dc5b42c0 --- /dev/null +++ b/pkg/utils/esutil/esutil.go @@ -0,0 +1,32 @@ +// TODO: refactor +package esutil + +import ( + "fmt" + "strings" + "time" +) + +// KubeSphere uses the layout `yyyy.MM.dd`. +const layoutISO = "2006.01.02" + +func ResolveIndexNames(prefix string, start, end time.Time) string { + if end.IsZero() { + end = time.Now() + } + + // In case of no start time or a broad query range over 30 days, search all indices. + if start.IsZero() || end.Sub(start).Hours() > 24*30 { + return fmt.Sprintf("%s*", prefix) + } + + var indices []string + for i := 0; i <= int(end.Sub(start).Hours()/24); i++ { + suffix := end.Add(time.Duration(-i) * 24 * time.Hour).Format(layoutISO) + indices = append(indices, fmt.Sprintf("%s-%s", prefix, suffix)) + } + + // If start is after end, ResolveIndexNames returns "". + // However, query parameter validation will prevent it from happening at the very beginning (Bad Request). + return strings.Join(indices, ",") +} diff --git a/pkg/utils/esutil/esutil_test.go b/pkg/utils/esutil/esutil_test.go new file mode 100644 index 000000000..24b1ee505 --- /dev/null +++ b/pkg/utils/esutil/esutil_test.go @@ -0,0 +1,56 @@ +package esutil + +import ( + "fmt" + "github.com/google/go-cmp/cmp" + "testing" + "time" +) + +func TestResolveIndexNames(t *testing.T) { + var tests = []struct { + prefix string + start time.Time + end time.Time + expected string + }{ + { + prefix: "ks-logstash-log", + start: time.Date(2020, time.February, 1, 0, 0, 0, 0, time.UTC), + end: time.Date(2020, time.February, 1, 0, 0, 0, 0, time.UTC), + expected: "ks-logstash-log-2020.02.01", + }, + { + prefix: "ks-logstash-log", + start: time.Date(2020, time.February, 1, 0, 0, 0, 0, time.UTC), + end: time.Date(2020, time.February, 3, 0, 0, 0, 0, time.UTC), + expected: "ks-logstash-log-2020.02.03,ks-logstash-log-2020.02.02,ks-logstash-log-2020.02.01", + }, + { + prefix: "ks-logstash-log", + end: time.Date(2020, time.February, 3, 0, 0, 0, 0, time.UTC), + expected: "ks-logstash-log*", + }, + { + prefix: "ks-logstash-log", + start: time.Date(2000, time.February, 1, 0, 0, 0, 0, time.UTC), + end: time.Date(2020, time.February, 3, 0, 0, 0, 0, time.UTC), + expected: "ks-logstash-log*", + }, + { + prefix: "ks-logstash-log", + start: time.Date(2020, time.February, 3, 0, 0, 0, 0, time.UTC), + end: time.Date(2020, time.February, 1, 0, 0, 0, 0, time.UTC), + expected: "", + }, + } + + for i, test := range tests { + t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { + result := ResolveIndexNames(test.prefix, test.start, test.end) + if diff := cmp.Diff(result, test.expected); diff != "" { + t.Fatalf("%T differ (-got, +want): %s", test.expected, diff) + } + }) + } +}