significantly improve log search performance
Signed-off-by: huanggze <loganhuang@yunify.com>
This commit is contained in:
@@ -10,6 +10,7 @@ import (
|
|||||||
"kubesphere.io/kubesphere/pkg/simple/client/logging/elasticsearch/versions/v5"
|
"kubesphere.io/kubesphere/pkg/simple/client/logging/elasticsearch/versions/v5"
|
||||||
"kubesphere.io/kubesphere/pkg/simple/client/logging/elasticsearch/versions/v6"
|
"kubesphere.io/kubesphere/pkg/simple/client/logging/elasticsearch/versions/v6"
|
||||||
"kubesphere.io/kubesphere/pkg/simple/client/logging/elasticsearch/versions/v7"
|
"kubesphere.io/kubesphere/pkg/simple/client/logging/elasticsearch/versions/v7"
|
||||||
|
"kubesphere.io/kubesphere/pkg/utils/esutil"
|
||||||
"kubesphere.io/kubesphere/pkg/utils/stringutils"
|
"kubesphere.io/kubesphere/pkg/utils/stringutils"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
@@ -33,7 +34,7 @@ type Elasticsearch struct {
|
|||||||
|
|
||||||
// versioned es client interface
|
// versioned es client interface
|
||||||
type client interface {
|
type client interface {
|
||||||
Search(body []byte, scroll bool) ([]byte, error)
|
Search(indices string, body []byte, scroll bool) ([]byte, error)
|
||||||
Scroll(id string) ([]byte, error)
|
Scroll(id string) ([]byte, error)
|
||||||
ClearScroll(id string)
|
ClearScroll(id string)
|
||||||
GetTotalHitCount(v interface{}) int64
|
GetTotalHitCount(v interface{}) int64
|
||||||
@@ -147,7 +148,7 @@ func (es *Elasticsearch) GetCurrentStats(sf logging.SearchFilter) (logging.Stati
|
|||||||
return logging.Statistics{}, err
|
return logging.Statistics{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
b, err := es.c.Search(body, true)
|
b, err := es.c.Search(esutil.ResolveIndexNames(es.index, sf.Starttime, sf.Endtime), body, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return logging.Statistics{}, err
|
return logging.Statistics{}, err
|
||||||
}
|
}
|
||||||
@@ -180,7 +181,7 @@ func (es *Elasticsearch) CountLogsByInterval(sf logging.SearchFilter, interval s
|
|||||||
return logging.Histogram{}, err
|
return logging.Histogram{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
b, err := es.c.Search(body, false)
|
b, err := es.c.Search(esutil.ResolveIndexNames(es.index, sf.Starttime, sf.Endtime), body, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return logging.Histogram{}, err
|
return logging.Histogram{}, err
|
||||||
}
|
}
|
||||||
@@ -219,7 +220,7 @@ func (es *Elasticsearch) SearchLogs(sf logging.SearchFilter, f, s int64, o strin
|
|||||||
return logging.Logs{}, err
|
return logging.Logs{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
b, err := es.c.Search(body, false)
|
b, err := es.c.Search(esutil.ResolveIndexNames(es.index, sf.Starttime, sf.Endtime), body, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return logging.Logs{}, err
|
return logging.Logs{}, err
|
||||||
}
|
}
|
||||||
@@ -264,7 +265,7 @@ func (es *Elasticsearch) ExportLogs(sf logging.SearchFilter, w io.Writer) error
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
b, err := es.c.Search(body, true)
|
b, err := es.c.Search(esutil.ResolveIndexNames(es.index, sf.Starttime, sf.Endtime), body, true)
|
||||||
defer es.ClearScroll(id)
|
defer es.ClearScroll(id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|||||||
@@ -224,17 +224,14 @@ func mockElasticsearchService(pattern, fakeResp string, fakeCode int) *httptest.
|
|||||||
}
|
}
|
||||||
|
|
||||||
func newElasticsearchClient(srv *httptest.Server, version string) *Elasticsearch {
|
func newElasticsearchClient(srv *httptest.Server, version string) *Elasticsearch {
|
||||||
var es *Elasticsearch
|
es := &Elasticsearch{index: "ks-logstash-log"}
|
||||||
switch version {
|
switch version {
|
||||||
case ElasticV5:
|
case ElasticV5:
|
||||||
client, _ := v5.New(srv.URL, "ks-logstash-log")
|
es.c, _ = v5.New(srv.URL, "ks-logstash-log")
|
||||||
es = &Elasticsearch{c: client}
|
|
||||||
case ElasticV6:
|
case ElasticV6:
|
||||||
client, _ := v6.New(srv.URL, "ks-logstash-log")
|
es.c, _ = v6.New(srv.URL, "ks-logstash-log")
|
||||||
es = &Elasticsearch{c: client}
|
|
||||||
case ElasticV7:
|
case ElasticV7:
|
||||||
client, _ := v7.New(srv.URL, "ks-logstash-log")
|
es.c, _ = v7.New(srv.URL, "ks-logstash-log")
|
||||||
es = &Elasticsearch{c: client}
|
|
||||||
}
|
}
|
||||||
return es
|
return es
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -24,10 +24,11 @@ func New(address string, index string) (*Elastic, error) {
|
|||||||
return &Elastic{client: client, index: index}, err
|
return &Elastic{client: client, index: index}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *Elastic) Search(body []byte, scroll bool) ([]byte, error) {
|
func (e *Elastic) Search(indices string, body []byte, scroll bool) ([]byte, error) {
|
||||||
opts := []func(*esapi.SearchRequest){
|
opts := []func(*esapi.SearchRequest){
|
||||||
e.client.Search.WithContext(context.Background()),
|
e.client.Search.WithContext(context.Background()),
|
||||||
e.client.Search.WithIndex(fmt.Sprintf("%s*", e.index)),
|
e.client.Search.WithIndex(indices),
|
||||||
|
e.client.Search.WithIgnoreUnavailable(true),
|
||||||
e.client.Search.WithBody(bytes.NewBuffer(body)),
|
e.client.Search.WithBody(bytes.NewBuffer(body)),
|
||||||
}
|
}
|
||||||
if scroll {
|
if scroll {
|
||||||
|
|||||||
@@ -24,10 +24,11 @@ func New(address string, index string) (*Elastic, error) {
|
|||||||
return &Elastic{Client: client, index: index}, err
|
return &Elastic{Client: client, index: index}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *Elastic) Search(body []byte, scroll bool) ([]byte, error) {
|
func (e *Elastic) Search(indices string, body []byte, scroll bool) ([]byte, error) {
|
||||||
opts := []func(*esapi.SearchRequest){
|
opts := []func(*esapi.SearchRequest){
|
||||||
e.Client.Search.WithContext(context.Background()),
|
e.Client.Search.WithContext(context.Background()),
|
||||||
e.Client.Search.WithIndex(fmt.Sprintf("%s*", e.index)),
|
e.Client.Search.WithIndex(indices),
|
||||||
|
e.Client.Search.WithIgnoreUnavailable(true),
|
||||||
e.Client.Search.WithBody(bytes.NewBuffer(body)),
|
e.Client.Search.WithBody(bytes.NewBuffer(body)),
|
||||||
}
|
}
|
||||||
if scroll {
|
if scroll {
|
||||||
|
|||||||
@@ -24,10 +24,11 @@ func New(address string, index string) (*Elastic, error) {
|
|||||||
return &Elastic{client: client, index: index}, err
|
return &Elastic{client: client, index: index}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *Elastic) Search(body []byte, scroll bool) ([]byte, error) {
|
func (e *Elastic) Search(indices string, body []byte, scroll bool) ([]byte, error) {
|
||||||
opts := []func(*esapi.SearchRequest){
|
opts := []func(*esapi.SearchRequest){
|
||||||
e.client.Search.WithContext(context.Background()),
|
e.client.Search.WithContext(context.Background()),
|
||||||
e.client.Search.WithIndex(fmt.Sprintf("%s*", e.index)),
|
e.client.Search.WithIndex(indices),
|
||||||
|
e.client.Search.WithIgnoreUnavailable(true),
|
||||||
e.client.Search.WithBody(bytes.NewBuffer(body)),
|
e.client.Search.WithBody(bytes.NewBuffer(body)),
|
||||||
}
|
}
|
||||||
if scroll {
|
if scroll {
|
||||||
|
|||||||
32
pkg/utils/esutil/esutil.go
Normal file
32
pkg/utils/esutil/esutil.go
Normal file
@@ -0,0 +1,32 @@
|
|||||||
|
// TODO: refactor
|
||||||
|
package esutil
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// KubeSphere uses the layout `yyyy.MM.dd`.
|
||||||
|
const layoutISO = "2006.01.02"
|
||||||
|
|
||||||
|
func ResolveIndexNames(prefix string, start, end time.Time) string {
|
||||||
|
if end.IsZero() {
|
||||||
|
end = time.Now()
|
||||||
|
}
|
||||||
|
|
||||||
|
// In case of no start time or a broad query range over 30 days, search all indices.
|
||||||
|
if start.IsZero() || end.Sub(start).Hours() > 24*30 {
|
||||||
|
return fmt.Sprintf("%s*", prefix)
|
||||||
|
}
|
||||||
|
|
||||||
|
var indices []string
|
||||||
|
for i := 0; i <= int(end.Sub(start).Hours()/24); i++ {
|
||||||
|
suffix := end.Add(time.Duration(-i) * 24 * time.Hour).Format(layoutISO)
|
||||||
|
indices = append(indices, fmt.Sprintf("%s-%s", prefix, suffix))
|
||||||
|
}
|
||||||
|
|
||||||
|
// If start is after end, ResolveIndexNames returns "".
|
||||||
|
// However, query parameter validation will prevent it from happening at the very beginning (Bad Request).
|
||||||
|
return strings.Join(indices, ",")
|
||||||
|
}
|
||||||
56
pkg/utils/esutil/esutil_test.go
Normal file
56
pkg/utils/esutil/esutil_test.go
Normal file
@@ -0,0 +1,56 @@
|
|||||||
|
package esutil
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"github.com/google/go-cmp/cmp"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestResolveIndexNames(t *testing.T) {
|
||||||
|
var tests = []struct {
|
||||||
|
prefix string
|
||||||
|
start time.Time
|
||||||
|
end time.Time
|
||||||
|
expected string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
prefix: "ks-logstash-log",
|
||||||
|
start: time.Date(2020, time.February, 1, 0, 0, 0, 0, time.UTC),
|
||||||
|
end: time.Date(2020, time.February, 1, 0, 0, 0, 0, time.UTC),
|
||||||
|
expected: "ks-logstash-log-2020.02.01",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
prefix: "ks-logstash-log",
|
||||||
|
start: time.Date(2020, time.February, 1, 0, 0, 0, 0, time.UTC),
|
||||||
|
end: time.Date(2020, time.February, 3, 0, 0, 0, 0, time.UTC),
|
||||||
|
expected: "ks-logstash-log-2020.02.03,ks-logstash-log-2020.02.02,ks-logstash-log-2020.02.01",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
prefix: "ks-logstash-log",
|
||||||
|
end: time.Date(2020, time.February, 3, 0, 0, 0, 0, time.UTC),
|
||||||
|
expected: "ks-logstash-log*",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
prefix: "ks-logstash-log",
|
||||||
|
start: time.Date(2000, time.February, 1, 0, 0, 0, 0, time.UTC),
|
||||||
|
end: time.Date(2020, time.February, 3, 0, 0, 0, 0, time.UTC),
|
||||||
|
expected: "ks-logstash-log*",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
prefix: "ks-logstash-log",
|
||||||
|
start: time.Date(2020, time.February, 3, 0, 0, 0, 0, time.UTC),
|
||||||
|
end: time.Date(2020, time.February, 1, 0, 0, 0, 0, time.UTC),
|
||||||
|
expected: "",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, test := range tests {
|
||||||
|
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
|
||||||
|
result := ResolveIndexNames(test.prefix, test.start, test.end)
|
||||||
|
if diff := cmp.Diff(result, test.expected); diff != "" {
|
||||||
|
t.Fatalf("%T differ (-got, +want): %s", test.expected, diff)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user