lazy init events client and optimize events search
This commit is contained in:
@@ -41,7 +41,8 @@ func (e Error) Error() string {
|
||||
type ClientV5 es5.Client
|
||||
|
||||
func (c *ClientV5) ExSearch(r *Request) (*Response, error) {
|
||||
return c.parse(c.Search(c.Search.WithIndex(r.Index), c.Search.WithBody(r.Body)))
|
||||
return c.parse(c.Search(c.Search.WithIndex(r.Index), c.Search.WithBody(r.Body),
|
||||
c.Search.WithIgnoreUnavailable(true)))
|
||||
}
|
||||
func (c *ClientV5) parse(resp *es5api.Response, err error) (*Response, error) {
|
||||
if err != nil {
|
||||
@@ -85,7 +86,8 @@ func (c *ClientV5) Version() (string, error) {
|
||||
type ClientV6 es6.Client
|
||||
|
||||
func (c *ClientV6) ExSearch(r *Request) (*Response, error) {
|
||||
return c.parse(c.Search(c.Search.WithIndex(r.Index), c.Search.WithBody(r.Body)))
|
||||
return c.parse(c.Search(c.Search.WithIndex(r.Index), c.Search.WithBody(r.Body),
|
||||
c.Search.WithIgnoreUnavailable(true)))
|
||||
}
|
||||
func (c *ClientV6) parse(resp *es6api.Response, err error) (*Response, error) {
|
||||
if err != nil {
|
||||
@@ -114,7 +116,8 @@ func (c *ClientV6) parse(resp *es6api.Response, err error) (*Response, error) {
|
||||
type ClientV7 es7.Client
|
||||
|
||||
func (c *ClientV7) ExSearch(r *Request) (*Response, error) {
|
||||
return c.parse(c.Search(c.Search.WithIndex(r.Index), c.Search.WithBody(r.Body)))
|
||||
return c.parse(c.Search(c.Search.WithIndex(r.Index), c.Search.WithBody(r.Body),
|
||||
c.Search.WithIgnoreUnavailable(true)))
|
||||
}
|
||||
func (c *ClientV7) parse(resp *es7api.Response, err error) (*Response, error) {
|
||||
if err != nil {
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
es5 "github.com/elastic/go-elasticsearch/v5"
|
||||
@@ -12,18 +13,19 @@ import (
|
||||
jsoniter "github.com/json-iterator/go"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"kubesphere.io/kubesphere/pkg/simple/client/events"
|
||||
"kubesphere.io/kubesphere/pkg/utils/esutil"
|
||||
)
|
||||
|
||||
var json = jsoniter.ConfigCompatibleWithStandardLibrary
|
||||
|
||||
type Elasticsearch struct {
|
||||
type elasticsearch struct {
|
||||
c client
|
||||
opts struct {
|
||||
index string
|
||||
indexPrefix string
|
||||
}
|
||||
}
|
||||
|
||||
func (es *Elasticsearch) SearchEvents(filter *events.Filter, from, size int64,
|
||||
func (es *elasticsearch) SearchEvents(filter *events.Filter, from, size int64,
|
||||
sort string) (*events.Events, error) {
|
||||
queryPart := parseToQueryPart(filter)
|
||||
if sort == "" {
|
||||
@@ -44,7 +46,7 @@ func (es *Elasticsearch) SearchEvents(filter *events.Filter, from, size int64,
|
||||
return nil, err
|
||||
}
|
||||
resp, err := es.c.ExSearch(&Request{
|
||||
Index: es.opts.index,
|
||||
Index: resolveIndexNames(es.opts.indexPrefix, filter.StartTime, filter.EndTime),
|
||||
Body: bytes.NewBuffer(body),
|
||||
})
|
||||
if err != nil || resp == nil {
|
||||
@@ -64,7 +66,7 @@ func (es *Elasticsearch) SearchEvents(filter *events.Filter, from, size int64,
|
||||
return &evts, nil
|
||||
}
|
||||
|
||||
func (es *Elasticsearch) CountOverTime(filter *events.Filter, interval string) (*events.Histogram, error) {
|
||||
func (es *elasticsearch) CountOverTime(filter *events.Filter, interval string) (*events.Histogram, error) {
|
||||
if interval == "" {
|
||||
interval = "15m"
|
||||
}
|
||||
@@ -90,7 +92,7 @@ func (es *Elasticsearch) CountOverTime(filter *events.Filter, interval string) (
|
||||
return nil, err
|
||||
}
|
||||
resp, err := es.c.ExSearch(&Request{
|
||||
Index: es.opts.index,
|
||||
Index: resolveIndexNames(es.opts.indexPrefix, filter.StartTime, filter.EndTime),
|
||||
Body: bytes.NewBuffer(body),
|
||||
})
|
||||
if err != nil || resp == nil {
|
||||
@@ -116,7 +118,7 @@ func (es *Elasticsearch) CountOverTime(filter *events.Filter, interval string) (
|
||||
return &histo, nil
|
||||
}
|
||||
|
||||
func (es *Elasticsearch) StatisticsOnResources(filter *events.Filter) (*events.Statistics, error) {
|
||||
func (es *elasticsearch) StatisticsOnResources(filter *events.Filter) (*events.Statistics, error) {
|
||||
queryPart := parseToQueryPart(filter)
|
||||
aggName := "resources_count"
|
||||
aggsPart := map[string]interface{}{
|
||||
@@ -137,7 +139,7 @@ func (es *Elasticsearch) StatisticsOnResources(filter *events.Filter) (*events.S
|
||||
return nil, err
|
||||
}
|
||||
resp, err := es.c.ExSearch(&Request{
|
||||
Index: es.opts.index,
|
||||
Index: resolveIndexNames(es.opts.indexPrefix, filter.StartTime, filter.EndTime),
|
||||
Body: bytes.NewBuffer(body),
|
||||
})
|
||||
if err != nil || resp == nil {
|
||||
@@ -158,7 +160,7 @@ func (es *Elasticsearch) StatisticsOnResources(filter *events.Filter) (*events.S
|
||||
}, nil
|
||||
}
|
||||
|
||||
func NewClient(options *Options) (*Elasticsearch, error) {
|
||||
func newClient(options *Options) (*elasticsearch, error) {
|
||||
clientV5 := func() (*ClientV5, error) {
|
||||
c, err := es5.NewClient(es5.Config{Addresses: []string{options.Host}})
|
||||
if err != nil {
|
||||
@@ -183,10 +185,10 @@ func NewClient(options *Options) (*Elasticsearch, error) {
|
||||
|
||||
var (
|
||||
version = options.Version
|
||||
es = Elasticsearch{}
|
||||
es = elasticsearch{}
|
||||
err error
|
||||
)
|
||||
es.opts.index = fmt.Sprintf("%s*", options.IndexPrefix)
|
||||
es.opts.indexPrefix = options.IndexPrefix
|
||||
|
||||
if options.Version == "" {
|
||||
var c5 *ClientV5
|
||||
@@ -218,6 +220,58 @@ func NewClient(options *Options) (*Elasticsearch, error) {
|
||||
return &es, nil
|
||||
}
|
||||
|
||||
type Elasticsearch struct {
|
||||
innerEs *elasticsearch
|
||||
options Options
|
||||
mutex sync.Mutex
|
||||
}
|
||||
|
||||
func (es *Elasticsearch) SearchEvents(filter *events.Filter, from, size int64,
|
||||
sort string) (*events.Events, error) {
|
||||
ies, e := es.getInnerEs()
|
||||
if e != nil {
|
||||
return nil, e
|
||||
}
|
||||
return ies.SearchEvents(filter, from, size, sort)
|
||||
}
|
||||
|
||||
func (es *Elasticsearch) CountOverTime(filter *events.Filter, interval string) (*events.Histogram, error) {
|
||||
ies, e := es.getInnerEs()
|
||||
if e != nil {
|
||||
return nil, e
|
||||
}
|
||||
return ies.CountOverTime(filter, interval)
|
||||
}
|
||||
|
||||
func (es *Elasticsearch) StatisticsOnResources(filter *events.Filter) (*events.Statistics, error) {
|
||||
ies, e := es.getInnerEs()
|
||||
if e != nil {
|
||||
return nil, e
|
||||
}
|
||||
return ies.StatisticsOnResources(filter)
|
||||
}
|
||||
|
||||
func (es *Elasticsearch) getInnerEs() (*elasticsearch, error) {
|
||||
if es.innerEs != nil {
|
||||
return es.innerEs, nil
|
||||
}
|
||||
es.mutex.Lock()
|
||||
defer es.mutex.Unlock()
|
||||
if es.innerEs != nil {
|
||||
return es.innerEs, nil
|
||||
}
|
||||
ies, err := newClient(&es.options)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
es.innerEs = ies
|
||||
return es.innerEs, nil
|
||||
}
|
||||
|
||||
func NewClient(options *Options) (*Elasticsearch, error) {
|
||||
return &Elasticsearch{options: *options}, nil
|
||||
}
|
||||
|
||||
func parseToQueryPart(f *events.Filter) interface{} {
|
||||
if f == nil {
|
||||
return nil
|
||||
@@ -347,3 +401,14 @@ func parseToQueryPart(f *events.Filter) interface{} {
|
||||
|
||||
return queryBody
|
||||
}
|
||||
|
||||
func resolveIndexNames(prefix string, start, end *time.Time) string {
|
||||
var s, e time.Time
|
||||
if start != nil {
|
||||
s = *start
|
||||
}
|
||||
if end != nil {
|
||||
e = *end
|
||||
}
|
||||
return esutil.ResolveIndexNames(prefix, s, e)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user