lazy initializing es client

Signed-off-by: huanggze <loganhuang@yunify.com>
This commit is contained in:
huanggze
2020-07-02 19:54:16 +08:00
parent 78159e9636
commit 4df7ae2636
5 changed files with 114 additions and 70 deletions

View File

@@ -12,6 +12,7 @@ import (
"kubesphere.io/kubesphere/pkg/simple/client/logging/elasticsearch/versions/v7" "kubesphere.io/kubesphere/pkg/simple/client/logging/elasticsearch/versions/v7"
"kubesphere.io/kubesphere/pkg/utils/stringutils" "kubesphere.io/kubesphere/pkg/utils/stringutils"
"strings" "strings"
"sync"
) )
const ( const (
@@ -22,7 +23,12 @@ const (
// Elasticsearch implement logging interface // Elasticsearch implement logging interface
type Elasticsearch struct { type Elasticsearch struct {
c client host string
version string
index string
c client
mux sync.Mutex
} }
// versioned es client interface // versioned es client interface
@@ -34,76 +40,105 @@ type client interface {
} }
func NewElasticsearch(options *Options) (*Elasticsearch, error) { func NewElasticsearch(options *Options) (*Elasticsearch, error) {
var version, index string var err error
es := &Elasticsearch{} es := &Elasticsearch{
host: options.Host,
if options.Version == "" { version: options.Version,
var err error index: options.IndexPrefix,
version, err = detectVersionMajor(options.Host)
if err != nil {
return nil, err
}
} else {
version = options.Version
} }
if options.IndexPrefix != "" { switch es.version {
index = options.IndexPrefix
} else {
index = "logstash"
}
switch version {
case ElasticV5: case ElasticV5:
es.c = v5.New(options.Host, index) es.c, err = v5.New(es.host, es.index)
case ElasticV6: case ElasticV6:
es.c = v6.New(options.Host, index) es.c, err = v6.New(es.host, es.index)
case ElasticV7: case ElasticV7:
es.c = v7.New(options.Host, index) es.c, err = v7.New(es.host, es.index)
case "":
es.c = nil
default: default:
return nil, fmt.Errorf("unsupported elasticsearch version %s", version) return nil, fmt.Errorf("unsupported elasticsearch version %s", es.version)
} }
return es, nil return es, err
} }
func (es *Elasticsearch) ES() *client { func (es *Elasticsearch) loadClient() error {
return &es.c // Check if Elasticsearch client has been initialized.
} if es.c != nil {
return nil
}
func detectVersionMajor(host string) (string, error) { // Create Elasticsearch client.
// Info APIs are backward compatible with versions of v5.x, v6.x and v7.x es.mux.Lock()
es := v6.New(host, "") defer es.mux.Unlock()
res, err := es.Client.Info(
es.Client.Info.WithContext(context.Background()), if es.c != nil {
return nil
}
// Detect Elasticsearch server version using Info API.
// Info API is backward compatible across v5, v6 and v7.
esv6, err := v6.New(es.host, "")
if err != nil {
return err
}
res, err := esv6.Client.Info(
esv6.Client.Info.WithContext(context.Background()),
) )
if err != nil { if err != nil {
return "", err return err
} }
defer res.Body.Close() defer res.Body.Close()
var b map[string]interface{} var b map[string]interface{}
if err = jsoniter.NewDecoder(res.Body).Decode(&b); err != nil { if err = jsoniter.NewDecoder(res.Body).Decode(&b); err != nil {
return "", err return err
} }
if res.IsError() { if res.IsError() {
// Print the response status and error information. // Print the response status and error information.
e, _ := b["error"].(map[string]interface{}) e, _ := b["error"].(map[string]interface{})
return "", fmt.Errorf("[%s] type: %v, reason: %v", res.Status(), e["type"], e["reason"]) return fmt.Errorf("[%s] type: %v, reason: %v", res.Status(), e["type"], e["reason"])
} }
// get the major version // get the major version
version, _ := b["version"].(map[string]interface{}) version, _ := b["version"].(map[string]interface{})
number, _ := version["number"].(string) number, _ := version["number"].(string)
if number == "" { if number == "" {
return "", fmt.Errorf("failed to detect elastic version number") return fmt.Errorf("failed to detect elastic version number")
} }
var c client
v := strings.Split(number, ".")[0] v := strings.Split(number, ".")[0]
return v, nil switch v {
case ElasticV5:
c, err = v5.New(es.host, es.index)
case ElasticV6:
c, err = v6.New(es.host, es.index)
case ElasticV7:
c, err = v7.New(es.host, es.index)
default:
err = fmt.Errorf("unsupported elasticsearch version %s", version)
}
if err != nil {
return err
}
es.c = c
es.version = v
return nil
} }
func (es Elasticsearch) GetCurrentStats(sf logging.SearchFilter) (logging.Statistics, error) { func (es *Elasticsearch) GetCurrentStats(sf logging.SearchFilter) (logging.Statistics, error) {
var err error
err = es.loadClient()
if err != nil {
return logging.Statistics{}, err
}
body, err := newBodyBuilder(). body, err := newBodyBuilder().
mainBool(sf). mainBool(sf).
cardinalityAggregation(). cardinalityAggregation().
@@ -129,7 +164,14 @@ func (es Elasticsearch) GetCurrentStats(sf logging.SearchFilter) (logging.Statis
nil nil
} }
func (es Elasticsearch) CountLogsByInterval(sf logging.SearchFilter, interval string) (logging.Histogram, error) { func (es *Elasticsearch) CountLogsByInterval(sf logging.SearchFilter, interval string) (logging.Histogram, error) {
var err error
err = es.loadClient()
if err != nil {
return logging.Histogram{}, err
}
body, err := newBodyBuilder(). body, err := newBodyBuilder().
mainBool(sf). mainBool(sf).
dateHistogramAggregation(interval). dateHistogramAggregation(interval).
@@ -159,7 +201,14 @@ func (es Elasticsearch) CountLogsByInterval(sf logging.SearchFilter, interval st
return h, nil return h, nil
} }
func (es Elasticsearch) SearchLogs(sf logging.SearchFilter, f, s int64, o string) (logging.Logs, error) { func (es *Elasticsearch) SearchLogs(sf logging.SearchFilter, f, s int64, o string) (logging.Logs, error) {
var err error
err = es.loadClient()
if err != nil {
return logging.Logs{}, err
}
body, err := newBodyBuilder(). body, err := newBodyBuilder().
mainBool(sf). mainBool(sf).
from(f). from(f).
@@ -194,10 +243,16 @@ func (es Elasticsearch) SearchLogs(sf logging.SearchFilter, f, s int64, o string
return l, nil return l, nil
} }
func (es Elasticsearch) ExportLogs(sf logging.SearchFilter, w io.Writer) error { func (es *Elasticsearch) ExportLogs(sf logging.SearchFilter, w io.Writer) error {
var err error
var id string var id string
var data []string var data []string
err = es.loadClient()
if err != nil {
return err
}
// Initial Search // Initial Search
body, err := newBodyBuilder(). body, err := newBodyBuilder().
mainBool(sf). mainBool(sf).

View File

@@ -14,7 +14,7 @@ import (
"testing" "testing"
) )
func TestDetectVersionMajor(t *testing.T) { func TestInitClient(t *testing.T) {
var tests = []struct { var tests = []struct {
fakeResp string fakeResp string
expected string expected string
@@ -34,12 +34,13 @@ func TestDetectVersionMajor(t *testing.T) {
es := mockElasticsearchService("/", test.fakeResp, http.StatusOK) es := mockElasticsearchService("/", test.fakeResp, http.StatusOK)
defer es.Close() defer es.Close()
result, err := detectVersionMajor(es.URL) client := &Elasticsearch{host: es.URL}
err := client.loadClient()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if diff := cmp.Diff(result, test.expected); diff != "" { if diff := cmp.Diff(client.version, test.expected); diff != "" {
t.Fatalf("%T differ (-got, +want): %s", test.expected, diff) t.Fatalf("%T differ (-got, +want): %s", test.expected, diff)
} }
}) })
@@ -222,15 +223,18 @@ func mockElasticsearchService(pattern, fakeResp string, fakeCode int) *httptest.
return httptest.NewServer(mux) return httptest.NewServer(mux)
} }
func newElasticsearchClient(srv *httptest.Server, version string) Elasticsearch { func newElasticsearchClient(srv *httptest.Server, version string) *Elasticsearch {
var es Elasticsearch var es *Elasticsearch
switch version { switch version {
case ElasticV5: case ElasticV5:
es = Elasticsearch{c: v5.New(srv.URL, "ks-logstash-log")} client, _ := v5.New(srv.URL, "ks-logstash-log")
es = &Elasticsearch{c: client}
case ElasticV6: case ElasticV6:
es = Elasticsearch{c: v6.New(srv.URL, "ks-logstash-log")} client, _ := v6.New(srv.URL, "ks-logstash-log")
es = &Elasticsearch{c: client}
case ElasticV7: case ElasticV7:
es = Elasticsearch{c: v7.New(srv.URL, "ks-logstash-log")} client, _ := v7.New(srv.URL, "ks-logstash-log")
es = &Elasticsearch{c: client}
} }
return es return es
} }

View File

@@ -8,7 +8,6 @@ import (
"github.com/elastic/go-elasticsearch/v5" "github.com/elastic/go-elasticsearch/v5"
"github.com/elastic/go-elasticsearch/v5/esapi" "github.com/elastic/go-elasticsearch/v5/esapi"
"io/ioutil" "io/ioutil"
"k8s.io/klog"
"time" "time"
) )
@@ -17,16 +16,12 @@ type Elastic struct {
index string index string
} }
func New(address string, index string) *Elastic { func New(address string, index string) (*Elastic, error) {
client, err := elasticsearch.NewClient(elasticsearch.Config{ client, err := elasticsearch.NewClient(elasticsearch.Config{
Addresses: []string{address}, Addresses: []string{address},
}) })
if err != nil {
klog.Error(err)
return nil
}
return &Elastic{client: client, index: index} return &Elastic{client: client, index: index}, err
} }
func (e *Elastic) Search(body []byte, scroll bool) ([]byte, error) { func (e *Elastic) Search(body []byte, scroll bool) ([]byte, error) {

View File

@@ -8,7 +8,6 @@ import (
"github.com/elastic/go-elasticsearch/v6" "github.com/elastic/go-elasticsearch/v6"
"github.com/elastic/go-elasticsearch/v6/esapi" "github.com/elastic/go-elasticsearch/v6/esapi"
"io/ioutil" "io/ioutil"
"k8s.io/klog"
"time" "time"
) )
@@ -17,16 +16,12 @@ type Elastic struct {
index string index string
} }
func New(address string, index string) *Elastic { func New(address string, index string) (*Elastic, error) {
client, err := elasticsearch.NewClient(elasticsearch.Config{ client, err := elasticsearch.NewClient(elasticsearch.Config{
Addresses: []string{address}, Addresses: []string{address},
}) })
if err != nil {
klog.Error(err)
return nil
}
return &Elastic{Client: client, index: index} return &Elastic{Client: client, index: index}, err
} }
func (e *Elastic) Search(body []byte, scroll bool) ([]byte, error) { func (e *Elastic) Search(body []byte, scroll bool) ([]byte, error) {

View File

@@ -8,7 +8,6 @@ import (
"github.com/elastic/go-elasticsearch/v7" "github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi" "github.com/elastic/go-elasticsearch/v7/esapi"
"io/ioutil" "io/ioutil"
"k8s.io/klog"
"time" "time"
) )
@@ -17,16 +16,12 @@ type Elastic struct {
index string index string
} }
func New(address string, index string) *Elastic { func New(address string, index string) (*Elastic, error) {
client, err := elasticsearch.NewClient(elasticsearch.Config{ client, err := elasticsearch.NewClient(elasticsearch.Config{
Addresses: []string{address}, Addresses: []string{address},
}) })
if err != nil {
klog.Error(err)
return nil
}
return &Elastic{client: client, index: index} return &Elastic{client: client, index: index}, err
} }
func (e *Elastic) Search(body []byte, scroll bool) ([]byte, error) { func (e *Elastic) Search(body []byte, scroll bool) ([]byte, error) {