@@ -507,7 +507,7 @@ func syncFluentbitCRDOutputWithConfigMap(outputs []fb.OutputPlugin) error {
|
||||
}
|
||||
|
||||
// Parse es host, port and index
|
||||
func ParseEsOutputParams(params []fb.Parameter) *es.ESConfigs {
|
||||
func ParseEsOutputParams(params []fb.Parameter) *es.Config {
|
||||
|
||||
var (
|
||||
isEsFound bool
|
||||
@@ -551,5 +551,5 @@ func ParseEsOutputParams(params []fb.Parameter) *es.ESConfigs {
|
||||
}
|
||||
}
|
||||
|
||||
return &es.ESConfigs{Host: host, Port: port, Index: index}
|
||||
return &es.Config{Host: host, Port: port, Index: index}
|
||||
}
|
||||
|
||||
@@ -13,11 +13,9 @@ limitations under the License.
|
||||
package esclient
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/golang/glog"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
@@ -30,28 +28,31 @@ import (
|
||||
var jsonIter = jsoniter.ConfigCompatibleWithStandardLibrary
|
||||
|
||||
var (
|
||||
mu sync.RWMutex
|
||||
esConfigs *ESConfigs
|
||||
mu sync.Mutex
|
||||
config *Config
|
||||
|
||||
client Client
|
||||
)
|
||||
|
||||
type ESConfigs struct {
|
||||
Host string
|
||||
Port string
|
||||
Index string
|
||||
type Config struct {
|
||||
Host string
|
||||
Port string
|
||||
Index string
|
||||
VersionMajor string
|
||||
}
|
||||
|
||||
func readESConfigs() *ESConfigs {
|
||||
mu.RLock()
|
||||
defer mu.RUnlock()
|
||||
|
||||
return esConfigs
|
||||
}
|
||||
|
||||
func (configs *ESConfigs) WriteESConfigs() {
|
||||
func (cfg *Config) WriteESConfigs() {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
|
||||
esConfigs = configs
|
||||
config = cfg
|
||||
if err := detectVersionMajor(config); err != nil {
|
||||
glog.Errorln(err)
|
||||
client = nil
|
||||
return
|
||||
}
|
||||
|
||||
client = NewForConfig(config)
|
||||
}
|
||||
|
||||
type Request struct {
|
||||
@@ -303,8 +304,9 @@ type Shards struct {
|
||||
}
|
||||
|
||||
type Hits struct {
|
||||
Total int64 `json:"total"`
|
||||
Hits []Hit `json:"hits"`
|
||||
// As of ElasticSearch v7.x, hits.total is changed
|
||||
Total interface{} `json:"total"`
|
||||
Hits []Hit `json:"hits"`
|
||||
}
|
||||
|
||||
type Hit struct {
|
||||
@@ -457,7 +459,7 @@ func parseQueryResult(operation int, param QueryParameters, body []byte, query [
|
||||
switch operation {
|
||||
case OperationQuery:
|
||||
var readResult ReadResult
|
||||
readResult.Total = response.Hits.Total
|
||||
readResult.Total = client.GetTotalHitCount(response.Hits.Total)
|
||||
readResult.From = param.From
|
||||
readResult.Size = param.Size
|
||||
for _, hit := range response.Hits.Hits {
|
||||
@@ -476,24 +478,24 @@ func parseQueryResult(operation int, param QueryParameters, body []byte, query [
|
||||
case OperationStatistics:
|
||||
var statisticsResponse StatisticsResponseAggregations
|
||||
err := jsonIter.Unmarshal(response.Aggregations, &statisticsResponse)
|
||||
if err != nil {
|
||||
if err != nil && response.Aggregations != nil {
|
||||
glog.Errorln(err)
|
||||
queryResult.Status = http.StatusInternalServerError
|
||||
queryResult.Error = err.Error()
|
||||
return &queryResult
|
||||
}
|
||||
queryResult.Statistics = &StatisticsResult{Containers: statisticsResponse.ContainerCount.Value, Logs: response.Hits.Total}
|
||||
queryResult.Statistics = &StatisticsResult{Containers: statisticsResponse.ContainerCount.Value, Logs: client.GetTotalHitCount(response.Hits.Total)}
|
||||
|
||||
case OperationHistogram:
|
||||
var histogramResult HistogramResult
|
||||
histogramResult.Total = response.Hits.Total
|
||||
histogramResult.Total = client.GetTotalHitCount(response.Hits.Total)
|
||||
histogramResult.StartTime = calcTimestamp(param.StartTime)
|
||||
histogramResult.EndTime = calcTimestamp(param.EndTime)
|
||||
histogramResult.Interval = param.Interval
|
||||
|
||||
var histogramAggregations HistogramAggregations
|
||||
err := jsonIter.Unmarshal(response.Aggregations, &histogramAggregations)
|
||||
if err != nil {
|
||||
if err != nil && response.Aggregations != nil {
|
||||
glog.Errorln(err)
|
||||
queryResult.Status = http.StatusInternalServerError
|
||||
queryResult.Error = err.Error()
|
||||
@@ -541,58 +543,25 @@ type QueryParameters struct {
|
||||
Size int64
|
||||
}
|
||||
|
||||
func stubResult() *QueryResult {
|
||||
var queryResult QueryResult
|
||||
|
||||
queryResult.Status = http.StatusOK
|
||||
|
||||
return &queryResult
|
||||
}
|
||||
|
||||
func Query(param QueryParameters) *QueryResult {
|
||||
var queryResult *QueryResult
|
||||
|
||||
client := &http.Client{}
|
||||
var queryResult = new(QueryResult)
|
||||
|
||||
if client == nil {
|
||||
queryResult.Status = http.StatusBadRequest
|
||||
queryResult.Error = fmt.Sprintf("Invalid elasticsearch address: host=%s, port=%s", config.Host, config.Port)
|
||||
return queryResult
|
||||
}
|
||||
|
||||
operation, query, err := createQueryRequest(param)
|
||||
if err != nil {
|
||||
queryResult = new(QueryResult)
|
||||
queryResult.Status = http.StatusInternalServerError
|
||||
queryResult.Error = err.Error()
|
||||
return queryResult
|
||||
}
|
||||
|
||||
es := readESConfigs()
|
||||
if es == nil {
|
||||
queryResult = new(QueryResult)
|
||||
queryResult.Status = http.StatusInternalServerError
|
||||
queryResult.Error = "Elasticsearch configurations not found. Please check if they are properly configured."
|
||||
return queryResult
|
||||
}
|
||||
|
||||
url := fmt.Sprintf("http://%s:%s/%s*/_search", es.Host, es.Port, es.Index)
|
||||
|
||||
request, err := http.NewRequest("GET", url, bytes.NewBuffer(query))
|
||||
if err != nil {
|
||||
glog.Errorln(err)
|
||||
queryResult = new(QueryResult)
|
||||
queryResult.Status = http.StatusInternalServerError
|
||||
queryResult.Error = err.Error()
|
||||
return queryResult
|
||||
}
|
||||
request.Header.Set("Content-Type", "application/json; charset=utf-8")
|
||||
|
||||
response, err := client.Do(request)
|
||||
if err != nil {
|
||||
glog.Errorln(err)
|
||||
queryResult = new(QueryResult)
|
||||
queryResult.Status = http.StatusInternalServerError
|
||||
queryResult.Error = err.Error()
|
||||
return queryResult
|
||||
}
|
||||
defer response.Body.Close()
|
||||
|
||||
body, err := ioutil.ReadAll(response.Body)
|
||||
body, err := client.Search(query, config.Index)
|
||||
if err != nil {
|
||||
glog.Errorln(err)
|
||||
queryResult = new(QueryResult)
|
||||
|
||||
69
pkg/simple/client/elasticsearch/interface.go
Normal file
69
pkg/simple/client/elasticsearch/interface.go
Normal file
@@ -0,0 +1,69 @@
|
||||
package esclient
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
v5 "kubesphere.io/kubesphere/pkg/simple/client/elasticsearch/versions/v5"
|
||||
v6 "kubesphere.io/kubesphere/pkg/simple/client/elasticsearch/versions/v6"
|
||||
v7 "kubesphere.io/kubesphere/pkg/simple/client/elasticsearch/versions/v7"
|
||||
"strings"
|
||||
)
|
||||
|
||||
const (
|
||||
ElasticV5 = "5"
|
||||
ElasticV6 = "6"
|
||||
ElasticV7 = "7"
|
||||
)
|
||||
|
||||
type Client interface {
|
||||
// Perform Search API
|
||||
Search(body []byte, indexPrefix string) ([]byte, error)
|
||||
GetTotalHitCount(v interface{}) int64
|
||||
}
|
||||
|
||||
func NewForConfig(cfg *Config) Client {
|
||||
address := fmt.Sprintf("http://%s:%s", cfg.Host, cfg.Port)
|
||||
switch cfg.VersionMajor {
|
||||
case ElasticV5:
|
||||
return v5.New(address)
|
||||
case ElasticV6:
|
||||
return v6.New(address)
|
||||
case ElasticV7:
|
||||
return v7.New(address)
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func detectVersionMajor(cfg *Config) error {
|
||||
|
||||
// Info APIs are backward compatible with versions of v5.x, v6.x and v7.x
|
||||
address := fmt.Sprintf("http://%s:%s", cfg.Host, cfg.Port)
|
||||
es := v6.New(address)
|
||||
res, err := es.Client.Info(
|
||||
es.Client.Info.WithContext(context.Background()),
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer res.Body.Close()
|
||||
|
||||
var b map[string]interface{}
|
||||
if err := json.NewDecoder(res.Body).Decode(&b); err != nil {
|
||||
return err
|
||||
}
|
||||
if res.IsError() {
|
||||
// Print the response status and error information.
|
||||
return fmt.Errorf("[%s] %s: %s",
|
||||
res.Status(),
|
||||
b["error"].(map[string]interface{})["type"],
|
||||
b["error"].(map[string]interface{})["reason"],
|
||||
)
|
||||
}
|
||||
|
||||
// get the major version
|
||||
version := fmt.Sprintf("%v", b["version"].(map[string]interface{})["number"])
|
||||
cfg.VersionMajor = strings.Split(version, ".")[0]
|
||||
return nil
|
||||
}
|
||||
56
pkg/simple/client/elasticsearch/versions/v5/elasticsearch.go
Normal file
56
pkg/simple/client/elasticsearch/versions/v5/elasticsearch.go
Normal file
@@ -0,0 +1,56 @@
|
||||
package v5
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/elastic/go-elasticsearch/v5"
|
||||
"io/ioutil"
|
||||
)
|
||||
|
||||
type Elastic struct {
|
||||
Client *elasticsearch.Client
|
||||
}
|
||||
|
||||
func New(address string) Elastic {
|
||||
|
||||
client, _ := elasticsearch.NewClient(elasticsearch.Config{
|
||||
Addresses: []string{address},
|
||||
})
|
||||
|
||||
return Elastic{Client: client}
|
||||
}
|
||||
|
||||
func (e Elastic) Search(body []byte, indexPrefix string) ([]byte, error) {
|
||||
|
||||
response, err := e.Client.Search(
|
||||
e.Client.Search.WithContext(context.Background()),
|
||||
e.Client.Search.WithIndex(fmt.Sprintf("%s*", indexPrefix)),
|
||||
e.Client.Search.WithBody(bytes.NewBuffer(body)),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer response.Body.Close()
|
||||
|
||||
if response.IsError() {
|
||||
var e map[string]interface{}
|
||||
if err := json.NewDecoder(response.Body).Decode(&e); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
// Print the response status and error information.
|
||||
return nil, fmt.Errorf("[%s] %s: %s",
|
||||
response.Status(),
|
||||
e["error"].(map[string]interface{})["type"],
|
||||
e["error"].(map[string]interface{})["reason"],
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
return ioutil.ReadAll(response.Body)
|
||||
}
|
||||
|
||||
func (e Elastic) GetTotalHitCount(v interface{}) int64 {
|
||||
return int64(v.(float64))
|
||||
}
|
||||
56
pkg/simple/client/elasticsearch/versions/v6/elasticsearch.go
Normal file
56
pkg/simple/client/elasticsearch/versions/v6/elasticsearch.go
Normal file
@@ -0,0 +1,56 @@
|
||||
package v6
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/elastic/go-elasticsearch/v6"
|
||||
"io/ioutil"
|
||||
)
|
||||
|
||||
type Elastic struct {
|
||||
Client *elasticsearch.Client
|
||||
}
|
||||
|
||||
func New(address string) Elastic {
|
||||
|
||||
client, _ := elasticsearch.NewClient(elasticsearch.Config{
|
||||
Addresses: []string{address},
|
||||
})
|
||||
|
||||
return Elastic{Client: client}
|
||||
}
|
||||
|
||||
func (e Elastic) Search(body []byte, indexPrefix string) ([]byte, error) {
|
||||
|
||||
response, err := e.Client.Search(
|
||||
e.Client.Search.WithContext(context.Background()),
|
||||
e.Client.Search.WithIndex(fmt.Sprintf("%s*", indexPrefix)),
|
||||
e.Client.Search.WithBody(bytes.NewBuffer(body)),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer response.Body.Close()
|
||||
|
||||
if response.IsError() {
|
||||
var e map[string]interface{}
|
||||
if err := json.NewDecoder(response.Body).Decode(&e); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
// Print the response status and error information.
|
||||
return nil, fmt.Errorf("[%s] %s: %s",
|
||||
response.Status(),
|
||||
e["error"].(map[string]interface{})["type"],
|
||||
e["error"].(map[string]interface{})["reason"],
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
return ioutil.ReadAll(response.Body)
|
||||
}
|
||||
|
||||
func (e Elastic) GetTotalHitCount(v interface{}) int64 {
|
||||
return int64(v.(float64))
|
||||
}
|
||||
57
pkg/simple/client/elasticsearch/versions/v7/elasticsearch.go
Normal file
57
pkg/simple/client/elasticsearch/versions/v7/elasticsearch.go
Normal file
@@ -0,0 +1,57 @@
|
||||
package v7
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/elastic/go-elasticsearch/v7"
|
||||
"io/ioutil"
|
||||
)
|
||||
|
||||
type Elastic struct {
|
||||
Client *elasticsearch.Client
|
||||
}
|
||||
|
||||
func New(address string) Elastic {
|
||||
|
||||
client, _ := elasticsearch.NewClient(elasticsearch.Config{
|
||||
Addresses: []string{address},
|
||||
})
|
||||
|
||||
return Elastic{Client: client}
|
||||
}
|
||||
|
||||
func (e Elastic) Search(body []byte, indexPrefix string) ([]byte, error) {
|
||||
|
||||
response, err := e.Client.Search(
|
||||
e.Client.Search.WithContext(context.Background()),
|
||||
e.Client.Search.WithIndex(fmt.Sprintf("%s*", indexPrefix)),
|
||||
e.Client.Search.WithTrackTotalHits(true),
|
||||
e.Client.Search.WithBody(bytes.NewBuffer(body)),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer response.Body.Close()
|
||||
|
||||
if response.IsError() {
|
||||
var e map[string]interface{}
|
||||
if err := json.NewDecoder(response.Body).Decode(&e); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
// Print the response status and error information.
|
||||
return nil, fmt.Errorf("[%s] %s: %s",
|
||||
response.Status(),
|
||||
e["error"].(map[string]interface{})["type"],
|
||||
e["error"].(map[string]interface{})["reason"],
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
return ioutil.ReadAll(response.Body)
|
||||
}
|
||||
|
||||
func (e Elastic) GetTotalHitCount(v interface{}) int64 {
|
||||
return int64(v.(map[string]interface{})["value"].(float64))
|
||||
}
|
||||
Reference in New Issue
Block a user