diff --git a/pkg/simple/client/logging/elasticsearch/elasticsearch.go b/pkg/simple/client/logging/elasticsearch/elasticsearch.go index 0b3d22fe3..c3acfe5e2 100644 --- a/pkg/simple/client/logging/elasticsearch/elasticsearch.go +++ b/pkg/simple/client/logging/elasticsearch/elasticsearch.go @@ -12,6 +12,7 @@ import ( "kubesphere.io/kubesphere/pkg/simple/client/logging/elasticsearch/versions/v7" "kubesphere.io/kubesphere/pkg/utils/stringutils" "strings" + "sync" ) const ( @@ -22,7 +23,12 @@ const ( // Elasticsearch implement logging interface type Elasticsearch struct { - c client + host string + version string + index string + + c client + mux sync.Mutex } // versioned es client interface @@ -34,76 +40,105 @@ type client interface { } func NewElasticsearch(options *Options) (*Elasticsearch, error) { - var version, index string - es := &Elasticsearch{} - - if options.Version == "" { - var err error - version, err = detectVersionMajor(options.Host) - if err != nil { - return nil, err - } - } else { - version = options.Version + var err error + es := &Elasticsearch{ + host: options.Host, + version: options.Version, + index: options.IndexPrefix, } - if options.IndexPrefix != "" { - index = options.IndexPrefix - } else { - index = "logstash" - } - - switch version { + switch es.version { case ElasticV5: - es.c = v5.New(options.Host, index) + es.c, err = v5.New(es.host, es.index) case ElasticV6: - es.c = v6.New(options.Host, index) + es.c, err = v6.New(es.host, es.index) case ElasticV7: - es.c = v7.New(options.Host, index) + es.c, err = v7.New(es.host, es.index) + case "": + es.c = nil default: - return nil, fmt.Errorf("unsupported elasticsearch version %s", version) + return nil, fmt.Errorf("unsupported elasticsearch version %s", es.version) } - return es, nil + return es, err } -func (es *Elasticsearch) ES() *client { - return &es.c -} +func (es *Elasticsearch) loadClient() error { + // Check if Elasticsearch client has been initialized. + if es.c != nil { + return nil + } -func detectVersionMajor(host string) (string, error) { - // Info APIs are backward compatible with versions of v5.x, v6.x and v7.x - es := v6.New(host, "") - res, err := es.Client.Info( - es.Client.Info.WithContext(context.Background()), + // Create Elasticsearch client. + es.mux.Lock() + defer es.mux.Unlock() + + if es.c != nil { + return nil + } + + // Detect Elasticsearch server version using Info API. + // Info API is backward compatible across v5, v6 and v7. + esv6, err := v6.New(es.host, "") + if err != nil { + return err + } + + res, err := esv6.Client.Info( + esv6.Client.Info.WithContext(context.Background()), ) if err != nil { - return "", err + return err } defer res.Body.Close() var b map[string]interface{} if err = jsoniter.NewDecoder(res.Body).Decode(&b); err != nil { - return "", err + return err } if res.IsError() { // Print the response status and error information. e, _ := b["error"].(map[string]interface{}) - return "", fmt.Errorf("[%s] type: %v, reason: %v", res.Status(), e["type"], e["reason"]) + return fmt.Errorf("[%s] type: %v, reason: %v", res.Status(), e["type"], e["reason"]) } // get the major version version, _ := b["version"].(map[string]interface{}) number, _ := version["number"].(string) if number == "" { - return "", fmt.Errorf("failed to detect elastic version number") + return fmt.Errorf("failed to detect elastic version number") } + var c client v := strings.Split(number, ".")[0] - return v, nil + switch v { + case ElasticV5: + c, err = v5.New(es.host, es.index) + case ElasticV6: + c, err = v6.New(es.host, es.index) + case ElasticV7: + c, err = v7.New(es.host, es.index) + default: + err = fmt.Errorf("unsupported elasticsearch version %s", version) + } + + if err != nil { + return err + } + + es.c = c + es.version = v + return nil } -func (es Elasticsearch) GetCurrentStats(sf logging.SearchFilter) (logging.Statistics, error) { +func (es *Elasticsearch) GetCurrentStats(sf logging.SearchFilter) (logging.Statistics, error) { + var err error + + err = es.loadClient() + if err != nil { + return logging.Statistics{}, err + } + body, err := newBodyBuilder(). mainBool(sf). cardinalityAggregation(). @@ -129,7 +164,14 @@ func (es Elasticsearch) GetCurrentStats(sf logging.SearchFilter) (logging.Statis nil } -func (es Elasticsearch) CountLogsByInterval(sf logging.SearchFilter, interval string) (logging.Histogram, error) { +func (es *Elasticsearch) CountLogsByInterval(sf logging.SearchFilter, interval string) (logging.Histogram, error) { + var err error + + err = es.loadClient() + if err != nil { + return logging.Histogram{}, err + } + body, err := newBodyBuilder(). mainBool(sf). dateHistogramAggregation(interval). @@ -159,7 +201,14 @@ func (es Elasticsearch) CountLogsByInterval(sf logging.SearchFilter, interval st return h, nil } -func (es Elasticsearch) SearchLogs(sf logging.SearchFilter, f, s int64, o string) (logging.Logs, error) { +func (es *Elasticsearch) SearchLogs(sf logging.SearchFilter, f, s int64, o string) (logging.Logs, error) { + var err error + + err = es.loadClient() + if err != nil { + return logging.Logs{}, err + } + body, err := newBodyBuilder(). mainBool(sf). from(f). @@ -194,10 +243,16 @@ func (es Elasticsearch) SearchLogs(sf logging.SearchFilter, f, s int64, o string return l, nil } -func (es Elasticsearch) ExportLogs(sf logging.SearchFilter, w io.Writer) error { +func (es *Elasticsearch) ExportLogs(sf logging.SearchFilter, w io.Writer) error { + var err error var id string var data []string + err = es.loadClient() + if err != nil { + return err + } + // Initial Search body, err := newBodyBuilder(). mainBool(sf). diff --git a/pkg/simple/client/logging/elasticsearch/elasticsearch_test.go b/pkg/simple/client/logging/elasticsearch/elasticsearch_test.go index 5c43370c3..362064cae 100644 --- a/pkg/simple/client/logging/elasticsearch/elasticsearch_test.go +++ b/pkg/simple/client/logging/elasticsearch/elasticsearch_test.go @@ -14,7 +14,7 @@ import ( "testing" ) -func TestDetectVersionMajor(t *testing.T) { +func TestInitClient(t *testing.T) { var tests = []struct { fakeResp string expected string @@ -34,12 +34,13 @@ func TestDetectVersionMajor(t *testing.T) { es := mockElasticsearchService("/", test.fakeResp, http.StatusOK) defer es.Close() - result, err := detectVersionMajor(es.URL) + client := &Elasticsearch{host: es.URL} + err := client.loadClient() if err != nil { t.Fatal(err) } - if diff := cmp.Diff(result, test.expected); diff != "" { + if diff := cmp.Diff(client.version, test.expected); diff != "" { t.Fatalf("%T differ (-got, +want): %s", test.expected, diff) } }) @@ -222,15 +223,18 @@ func mockElasticsearchService(pattern, fakeResp string, fakeCode int) *httptest. return httptest.NewServer(mux) } -func newElasticsearchClient(srv *httptest.Server, version string) Elasticsearch { - var es Elasticsearch +func newElasticsearchClient(srv *httptest.Server, version string) *Elasticsearch { + var es *Elasticsearch switch version { case ElasticV5: - es = Elasticsearch{c: v5.New(srv.URL, "ks-logstash-log")} + client, _ := v5.New(srv.URL, "ks-logstash-log") + es = &Elasticsearch{c: client} case ElasticV6: - es = Elasticsearch{c: v6.New(srv.URL, "ks-logstash-log")} + client, _ := v6.New(srv.URL, "ks-logstash-log") + es = &Elasticsearch{c: client} case ElasticV7: - es = Elasticsearch{c: v7.New(srv.URL, "ks-logstash-log")} + client, _ := v7.New(srv.URL, "ks-logstash-log") + es = &Elasticsearch{c: client} } return es } diff --git a/pkg/simple/client/logging/elasticsearch/versions/v5/v5.go b/pkg/simple/client/logging/elasticsearch/versions/v5/v5.go index b3344df9a..6cbe7a219 100644 --- a/pkg/simple/client/logging/elasticsearch/versions/v5/v5.go +++ b/pkg/simple/client/logging/elasticsearch/versions/v5/v5.go @@ -8,7 +8,6 @@ import ( "github.com/elastic/go-elasticsearch/v5" "github.com/elastic/go-elasticsearch/v5/esapi" "io/ioutil" - "k8s.io/klog" "time" ) @@ -17,16 +16,12 @@ type Elastic struct { index string } -func New(address string, index string) *Elastic { +func New(address string, index string) (*Elastic, error) { client, err := elasticsearch.NewClient(elasticsearch.Config{ Addresses: []string{address}, }) - if err != nil { - klog.Error(err) - return nil - } - return &Elastic{client: client, index: index} + return &Elastic{client: client, index: index}, err } func (e *Elastic) Search(body []byte, scroll bool) ([]byte, error) { diff --git a/pkg/simple/client/logging/elasticsearch/versions/v6/v6.go b/pkg/simple/client/logging/elasticsearch/versions/v6/v6.go index 198acdb7b..9616d5d65 100644 --- a/pkg/simple/client/logging/elasticsearch/versions/v6/v6.go +++ b/pkg/simple/client/logging/elasticsearch/versions/v6/v6.go @@ -8,7 +8,6 @@ import ( "github.com/elastic/go-elasticsearch/v6" "github.com/elastic/go-elasticsearch/v6/esapi" "io/ioutil" - "k8s.io/klog" "time" ) @@ -17,16 +16,12 @@ type Elastic struct { index string } -func New(address string, index string) *Elastic { +func New(address string, index string) (*Elastic, error) { client, err := elasticsearch.NewClient(elasticsearch.Config{ Addresses: []string{address}, }) - if err != nil { - klog.Error(err) - return nil - } - return &Elastic{Client: client, index: index} + return &Elastic{Client: client, index: index}, err } func (e *Elastic) Search(body []byte, scroll bool) ([]byte, error) { diff --git a/pkg/simple/client/logging/elasticsearch/versions/v7/v7.go b/pkg/simple/client/logging/elasticsearch/versions/v7/v7.go index 4ff3d1e70..f3dd3b0d3 100644 --- a/pkg/simple/client/logging/elasticsearch/versions/v7/v7.go +++ b/pkg/simple/client/logging/elasticsearch/versions/v7/v7.go @@ -8,7 +8,6 @@ import ( "github.com/elastic/go-elasticsearch/v7" "github.com/elastic/go-elasticsearch/v7/esapi" "io/ioutil" - "k8s.io/klog" "time" ) @@ -17,16 +16,12 @@ type Elastic struct { index string } -func New(address string, index string) *Elastic { +func New(address string, index string) (*Elastic, error) { client, err := elasticsearch.NewClient(elasticsearch.Config{ Addresses: []string{address}, }) - if err != nil { - klog.Error(err) - return nil - } - return &Elastic{client: client, index: index} + return &Elastic{client: client, index: index}, err } func (e *Elastic) Search(body []byte, scroll bool) ([]byte, error) {