intergrate opensearch v2 (#5044)

intergrate opensearch

Signed-off-by: chengdehao <dehaocheng@kubesphere.io>

Co-authored-by: chengdehao <dehaocheng@kubesphere.io>
This commit is contained in:
Elon Cheng
2022-07-10 02:14:38 -05:00
committed by GitHub
parent 9818aafa65
commit 5e2f41f0b9
191 changed files with 46773 additions and 7 deletions

View File

@@ -27,6 +27,7 @@ import (
"kubesphere.io/kubesphere/pkg/simple/client/es/query"
"kubesphere.io/kubesphere/pkg/simple/client/es/versions"
v2 "kubesphere.io/kubesphere/pkg/simple/client/es/versions/opensearchv2"
v5 "kubesphere.io/kubesphere/pkg/simple/client/es/versions/v5"
v6 "kubesphere.io/kubesphere/pkg/simple/client/es/versions/v6"
v7 "kubesphere.io/kubesphere/pkg/simple/client/es/versions/v7"
@@ -34,9 +35,10 @@ import (
)
const (
ElasticV5 = "5"
ElasticV6 = "6"
ElasticV7 = "7"
ElasticV5 = "5"
ElasticV6 = "6"
ElasticV7 = "7"
OpenSearchV2 = "opensearchv2"
)
// Elasticsearch client
@@ -64,6 +66,8 @@ func NewClient(host string, basicAuth bool, username, password, indexPrefix, ver
}
switch es.version {
case OpenSearchV2:
es.c, err = v2.New(es.host, es.basicAuth, es.username, es.password, es.index)
case ElasticV5:
es.c, err = v5.New(es.host, es.basicAuth, es.username, es.password, es.index)
case ElasticV6:
@@ -130,7 +134,13 @@ func (c *Client) loadClient() error {
var vc versions.Client
v := strings.Split(number, ".")[0]
distribution, _ := version["distribution"].(string)
if distribution == "opensearch" {
v = "opensearchv" + v
}
switch v {
case OpenSearchV2:
vc, err = v2.New(c.host, c.basicAuth, c.username, c.password, c.index)
case ElasticV5:
vc, err = v5.New(c.host, c.basicAuth, c.username, c.password, c.index)
case ElasticV6:

View File

@@ -0,0 +1,128 @@
/*
Copyright 2020 KubeSphere Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v2
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"time"
"github.com/opensearch-project/opensearch-go/v2"
"github.com/opensearch-project/opensearch-go/v2/opensearchapi"
"kubesphere.io/kubesphere/pkg/simple/client/es/versions"
)
type OpenSearch struct {
client *opensearch.Client
index string
}
func New(address string, basicAuth bool, username, password, index string) (*OpenSearch, error) {
var client *opensearch.Client
var err error
if !basicAuth {
username = ""
password = ""
}
client, err = opensearch.NewClient(opensearch.Config{
Addresses: []string{address},
Username: username,
Password: password,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
},
})
return &OpenSearch{client: client, index: index}, err
}
func (o *OpenSearch) Search(indices string, body []byte, scroll bool) ([]byte, error) {
opts := []func(*opensearchapi.SearchRequest){
o.client.Search.WithContext(context.Background()),
o.client.Search.WithIndex(indices),
o.client.Search.WithIgnoreUnavailable(true),
o.client.Search.WithBody(bytes.NewBuffer(body)),
}
if scroll {
opts = append(opts, o.client.Search.WithScroll(time.Minute))
}
response, err := o.client.Search(opts...)
if err != nil {
return nil, err
}
defer response.Body.Close()
if response.IsError() {
return nil, parseError(response)
}
return ioutil.ReadAll(response.Body)
}
func (o *OpenSearch) Scroll(id string) ([]byte, error) {
response, err := o.client.Scroll(
o.client.Scroll.WithContext(context.Background()),
o.client.Scroll.WithScrollID(id),
o.client.Scroll.WithScroll(time.Minute))
if err != nil {
return nil, err
}
defer response.Body.Close()
if response.IsError() {
return nil, parseError(response)
}
return ioutil.ReadAll(response.Body)
}
func (o *OpenSearch) ClearScroll(scrollId string) {
response, _ := o.client.ClearScroll(
o.client.ClearScroll.WithContext(context.Background()),
o.client.ClearScroll.WithScrollID(scrollId))
defer response.Body.Close()
}
func (o *OpenSearch) GetTotalHitCount(v interface{}) int64 {
f, _ := v.(float64)
return int64(f)
}
func parseError(response *opensearchapi.Response) error {
var e versions.Error
if err := json.NewDecoder(response.Body).Decode(&e); err != nil {
return err
} else {
// Print the response status and error information.
if len(e.Details.RootCause) != 0 {
return fmt.Errorf("type: %v, reason: %v", e.Details.Type, e.Details.RootCause[0].Reason)
} else {
return fmt.Errorf("type: %v, reason: %v", e.Details.Type, e.Details.Reason)
}
}
}

View File

@@ -1,3 +1,4 @@
//go:build tools
// +build tools
/*

View File

@@ -1,3 +1,4 @@
//go:build !windows
// +build !windows
/*