Merge pull request #656 from huanggze/log-2.1

logging: use elastic client for go
This commit is contained in:
KubeSphere CI Bot
2019-09-09 13:34:09 +08:00
committed by GitHub
799 changed files with 174665 additions and 69 deletions

View File

@@ -333,7 +333,7 @@ func syncFluentbitCRDOutputWithConfigMap(outputs []fb.OutputPlugin) error {
}
// Parse es host, port and index
func ParseEsOutputParams(params []fb.Parameter) *es.ESConfigs {
func ParseEsOutputParams(params []fb.Parameter) *es.Config {
var (
isEsFound bool
@@ -377,5 +377,5 @@ func ParseEsOutputParams(params []fb.Parameter) *es.ESConfigs {
}
}
return &es.ESConfigs{Host: host, Port: port, Index: index}
return &es.Config{Host: host, Port: port, Index: index}
}

View File

@@ -13,11 +13,9 @@ limitations under the License.
package esclient
import (
"bytes"
"encoding/json"
"fmt"
"github.com/golang/glog"
"io/ioutil"
"net/http"
"strconv"
"strings"
@@ -30,28 +28,31 @@ import (
var jsonIter = jsoniter.ConfigCompatibleWithStandardLibrary
var (
mu sync.RWMutex
esConfigs *ESConfigs
mu sync.Mutex
config *Config
client Client
)
type ESConfigs struct {
Host string
Port string
Index string
type Config struct {
Host string
Port string
Index string
VersionMajor string
}
func readESConfigs() *ESConfigs {
mu.RLock()
defer mu.RUnlock()
return esConfigs
}
func (configs *ESConfigs) WriteESConfigs() {
func (cfg *Config) WriteESConfigs() {
mu.Lock()
defer mu.Unlock()
esConfigs = configs
config = cfg
if err := detectVersionMajor(config); err != nil {
glog.Errorln(err)
client = nil
return
}
client = NewForConfig(config)
}
type Request struct {
@@ -303,8 +304,9 @@ type Shards struct {
}
type Hits struct {
Total int64 `json:"total"`
Hits []Hit `json:"hits"`
// As of ElasticSearch v7.x, hits.total is changed
Total interface{} `json:"total"`
Hits []Hit `json:"hits"`
}
type Hit struct {
@@ -427,7 +429,7 @@ func calcTimestamp(input string) int64 {
return ret
}
func parseQueryResult(operation int, param QueryParameters, body []byte, query []byte) *QueryResult {
func parseQueryResult(operation int, param QueryParameters, body []byte) *QueryResult {
var queryResult QueryResult
var response Response
@@ -457,7 +459,7 @@ func parseQueryResult(operation int, param QueryParameters, body []byte, query [
switch operation {
case OperationQuery:
var readResult ReadResult
readResult.Total = response.Hits.Total
readResult.Total = client.GetTotalHitCount(response.Hits.Total)
readResult.From = param.From
readResult.Size = param.Size
for _, hit := range response.Hits.Hits {
@@ -476,24 +478,24 @@ func parseQueryResult(operation int, param QueryParameters, body []byte, query [
case OperationStatistics:
var statisticsResponse StatisticsResponseAggregations
err := jsonIter.Unmarshal(response.Aggregations, &statisticsResponse)
if err != nil {
if err != nil && response.Aggregations != nil {
glog.Errorln(err)
queryResult.Status = http.StatusInternalServerError
queryResult.Error = err.Error()
return &queryResult
}
queryResult.Statistics = &StatisticsResult{Containers: statisticsResponse.ContainerCount.Value, Logs: response.Hits.Total}
queryResult.Statistics = &StatisticsResult{Containers: statisticsResponse.ContainerCount.Value, Logs: client.GetTotalHitCount(response.Hits.Total)}
case OperationHistogram:
var histogramResult HistogramResult
histogramResult.Total = response.Hits.Total
histogramResult.Total = client.GetTotalHitCount(response.Hits.Total)
histogramResult.StartTime = calcTimestamp(param.StartTime)
histogramResult.EndTime = calcTimestamp(param.EndTime)
histogramResult.Interval = param.Interval
var histogramAggregations HistogramAggregations
err := jsonIter.Unmarshal(response.Aggregations, &histogramAggregations)
if err != nil {
if err != nil && response.Aggregations != nil {
glog.Errorln(err)
queryResult.Status = http.StatusInternalServerError
queryResult.Error = err.Error()
@@ -541,58 +543,25 @@ type QueryParameters struct {
Size int64
}
func stubResult() *QueryResult {
var queryResult QueryResult
queryResult.Status = http.StatusOK
return &queryResult
}
func Query(param QueryParameters) *QueryResult {
var queryResult *QueryResult
client := &http.Client{}
var queryResult = new(QueryResult)
if client == nil {
queryResult.Status = http.StatusBadRequest
queryResult.Error = fmt.Sprintf("Invalid elasticsearch address: host=%s, port=%s", config.Host, config.Port)
return queryResult
}
operation, query, err := createQueryRequest(param)
if err != nil {
queryResult = new(QueryResult)
queryResult.Status = http.StatusInternalServerError
queryResult.Error = err.Error()
return queryResult
}
es := readESConfigs()
if es == nil {
queryResult = new(QueryResult)
queryResult.Status = http.StatusInternalServerError
queryResult.Error = "Elasticsearch configurations not found. Please check if they are properly configured."
return queryResult
}
url := fmt.Sprintf("http://%s:%s/%s*/_search", es.Host, es.Port, es.Index)
request, err := http.NewRequest("GET", url, bytes.NewBuffer(query))
if err != nil {
glog.Errorln(err)
queryResult = new(QueryResult)
queryResult.Status = http.StatusInternalServerError
queryResult.Error = err.Error()
return queryResult
}
request.Header.Set("Content-Type", "application/json; charset=utf-8")
response, err := client.Do(request)
if err != nil {
glog.Errorln(err)
queryResult = new(QueryResult)
queryResult.Status = http.StatusInternalServerError
queryResult.Error = err.Error()
return queryResult
}
defer response.Body.Close()
body, err := ioutil.ReadAll(response.Body)
body, err := client.Search(query)
if err != nil {
glog.Errorln(err)
queryResult = new(QueryResult)
@@ -601,7 +570,7 @@ func Query(param QueryParameters) *QueryResult {
return queryResult
}
queryResult = parseQueryResult(operation, param, body, query)
queryResult = parseQueryResult(operation, param, body)
return queryResult
}

View File

@@ -0,0 +1,72 @@
package esclient
import (
"context"
"encoding/json"
"fmt"
v5 "kubesphere.io/kubesphere/pkg/simple/client/elasticsearch/versions/v5"
v6 "kubesphere.io/kubesphere/pkg/simple/client/elasticsearch/versions/v6"
v7 "kubesphere.io/kubesphere/pkg/simple/client/elasticsearch/versions/v7"
"strings"
)
const (
ElasticV5 = "5"
ElasticV6 = "6"
ElasticV7 = "7"
)
type Client interface {
// Perform Search API
Search(body []byte) ([]byte, error)
GetTotalHitCount(v interface{}) int64
}
func NewForConfig(cfg *Config) Client {
address := fmt.Sprintf("http://%s:%s", cfg.Host, cfg.Port)
index := cfg.Index
switch cfg.VersionMajor {
case ElasticV5:
return v5.New(address, index)
case ElasticV6:
return v6.New(address, index)
case ElasticV7:
return v7.New(address, index)
default:
return nil
}
}
func detectVersionMajor(cfg *Config) error {
// Info APIs are backward compatible with versions of v5.x, v6.x and v7.x
address := fmt.Sprintf("http://%s:%s", cfg.Host, cfg.Port)
es := v6.New(address, "")
res, err := es.Client.Info(
es.Client.Info.WithContext(context.Background()),
)
if err != nil {
return err
}
defer res.Body.Close()
var b map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&b); err != nil {
return err
}
if res.IsError() {
// Print the response status and error information.
e, _ := b["error"].(map[string]interface{})
return fmt.Errorf("[%s] %s: %s", res.Status(), e["type"], e["reason"])
}
// get the major version
version, _ := b["version"].(map[string]interface{})
number, _ := version["number"].(string)
if number == "" {
return fmt.Errorf("failed to detect elastic version number")
}
cfg.VersionMajor = strings.Split(number, ".")[0]
return nil
}

View File

@@ -0,0 +1,55 @@
package v5
import (
"bytes"
"context"
"encoding/json"
"fmt"
"github.com/elastic/go-elasticsearch/v5"
"io/ioutil"
)
type Elastic struct {
client *elasticsearch.Client
index string
}
func New(address string, index string) Elastic {
client, _ := elasticsearch.NewClient(elasticsearch.Config{
Addresses: []string{address},
})
return Elastic{client: client, index: index}
}
func (e Elastic) Search(body []byte) ([]byte, error) {
response, err := e.client.Search(
e.client.Search.WithContext(context.Background()),
e.client.Search.WithIndex(fmt.Sprintf("%s*", e.index)),
e.client.Search.WithBody(bytes.NewBuffer(body)),
)
if err != nil {
return nil, err
}
defer response.Body.Close()
if response.IsError() {
var e map[string]interface{}
if err := json.NewDecoder(response.Body).Decode(&e); err != nil {
return nil, err
} else {
// Print the response status and error information.
e, _ := e["error"].(map[string]interface{})
return nil, fmt.Errorf("[%s] %s: %s", response.Status(), e["type"], e["reason"])
}
}
return ioutil.ReadAll(response.Body)
}
func (e Elastic) GetTotalHitCount(v interface{}) int64 {
f, _ := v.(float64)
return int64(f)
}

View File

@@ -0,0 +1,55 @@
package v6
import (
"bytes"
"context"
"encoding/json"
"fmt"
"github.com/elastic/go-elasticsearch/v6"
"io/ioutil"
)
type Elastic struct {
Client *elasticsearch.Client
index string
}
func New(address string, index string) Elastic {
client, _ := elasticsearch.NewClient(elasticsearch.Config{
Addresses: []string{address},
})
return Elastic{Client: client, index: index}
}
func (e Elastic) Search(body []byte) ([]byte, error) {
response, err := e.Client.Search(
e.Client.Search.WithContext(context.Background()),
e.Client.Search.WithIndex(fmt.Sprintf("%s*", e.index)),
e.Client.Search.WithBody(bytes.NewBuffer(body)),
)
if err != nil {
return nil, err
}
defer response.Body.Close()
if response.IsError() {
var e map[string]interface{}
if err := json.NewDecoder(response.Body).Decode(&e); err != nil {
return nil, err
} else {
// Print the response status and error information.
e, _ := e["error"].(map[string]interface{})
return nil, fmt.Errorf("[%s] %s: %s", response.Status(), e["type"], e["reason"])
}
}
return ioutil.ReadAll(response.Body)
}
func (e Elastic) GetTotalHitCount(v interface{}) int64 {
f, _ := v.(float64)
return int64(f)
}

View File

@@ -0,0 +1,57 @@
package v7
import (
"bytes"
"context"
"encoding/json"
"fmt"
"github.com/elastic/go-elasticsearch/v7"
"io/ioutil"
)
type Elastic struct {
client *elasticsearch.Client
index string
}
func New(address string, index string) Elastic {
client, _ := elasticsearch.NewClient(elasticsearch.Config{
Addresses: []string{address},
})
return Elastic{client: client, index: index}
}
func (e Elastic) Search(body []byte) ([]byte, error) {
response, err := e.client.Search(
e.client.Search.WithContext(context.Background()),
e.client.Search.WithIndex(fmt.Sprintf("%s*", e.index)),
e.client.Search.WithTrackTotalHits(true),
e.client.Search.WithBody(bytes.NewBuffer(body)),
)
if err != nil {
return nil, err
}
defer response.Body.Close()
if response.IsError() {
var e map[string]interface{}
if err := json.NewDecoder(response.Body).Decode(&e); err != nil {
return nil, err
} else {
// Print the response status and error information.
e, _ := e["error"].(map[string]interface{})
return nil, fmt.Errorf("[%s] %s: %s", response.Status(), e["type"], e["reason"])
}
}
return ioutil.ReadAll(response.Body)
}
func (e Elastic) GetTotalHitCount(v interface{}) int64 {
m, _ := v.(map[string]interface{})
f, _ := m["value"].(float64)
return int64(f)
}