add events search apis

This commit is contained in:
junotx
2020-05-11 15:29:25 +08:00
parent 044dd8eba3
commit 8f5ca7673d
19 changed files with 1756 additions and 327 deletions

View File

@@ -0,0 +1,147 @@
package elasticsearch
import (
"fmt"
es5 "github.com/elastic/go-elasticsearch/v5"
es5api "github.com/elastic/go-elasticsearch/v5/esapi"
es6 "github.com/elastic/go-elasticsearch/v6"
es6api "github.com/elastic/go-elasticsearch/v6/esapi"
es7 "github.com/elastic/go-elasticsearch/v7"
es7api "github.com/elastic/go-elasticsearch/v7/esapi"
jsoniter "github.com/json-iterator/go"
"io"
"net/http"
)
type Request struct {
Index string
Body io.Reader
}
type Response struct {
Hits Hits `json:"hits"`
Aggregations map[string]jsoniter.RawMessage `json:"aggregations"`
}
type Hits struct {
Total int64 `json:"total"`
Hits jsoniter.RawMessage `json:"hits"`
}
type Error struct {
Type string `json:"type"`
Reason string `json:"reason"`
Status int `json:"status"`
}
func (e Error) Error() string {
return fmt.Sprintf("%s %s: %s", http.StatusText(e.Status), e.Type, e.Reason)
}
type ClientV5 es5.Client
func (c *ClientV5) ExSearch(r *Request) (*Response, error) {
return c.parse(c.Search(c.Search.WithIndex(r.Index), c.Search.WithBody(r.Body)))
}
func (c *ClientV5) parse(resp *es5api.Response, err error) (*Response, error) {
if err != nil {
return nil, fmt.Errorf("error getting response: %s", err)
}
defer resp.Body.Close()
if resp.IsError() {
return nil, fmt.Errorf(resp.String())
}
var r struct {
Hits struct {
Total int64 `json:"total"`
Hits jsoniter.RawMessage `json:"hits"`
} `json:"hits"`
Aggregations map[string]jsoniter.RawMessage `json:"aggregations"`
}
if err := json.NewDecoder(resp.Body).Decode(&r); err != nil {
return nil, fmt.Errorf("error parsing the response body: %s", err)
}
return &Response{
Hits: Hits{Total: r.Hits.Total, Hits: r.Hits.Hits},
Aggregations: r.Aggregations,
}, nil
}
func (c *ClientV5) Version() (string, error) {
res, err := c.Info()
if err != nil {
return "", err
}
defer res.Body.Close()
if res.IsError() {
return "", fmt.Errorf(res.String())
}
var r map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
return "", fmt.Errorf("error parsing the response body: %s", err)
}
return fmt.Sprintf("%s", r["version"].(map[string]interface{})["number"]), nil
}
type ClientV6 es6.Client
func (c *ClientV6) ExSearch(r *Request) (*Response, error) {
return c.parse(c.Search(c.Search.WithIndex(r.Index), c.Search.WithBody(r.Body)))
}
func (c *ClientV6) parse(resp *es6api.Response, err error) (*Response, error) {
if err != nil {
return nil, fmt.Errorf("error getting response: %s", err)
}
defer resp.Body.Close()
if resp.IsError() {
return nil, fmt.Errorf(resp.String())
}
var r struct {
Hits *struct {
Total int64 `json:"total"`
Hits jsoniter.RawMessage `json:"hits"`
} `json:"hits"`
Aggregations map[string]jsoniter.RawMessage `json:"aggregations"`
}
if err := json.NewDecoder(resp.Body).Decode(&r); err != nil {
return nil, fmt.Errorf("error parsing the response body: %s", err)
}
return &Response{
Hits: Hits{Total: r.Hits.Total, Hits: r.Hits.Hits},
Aggregations: r.Aggregations,
}, nil
}
type ClientV7 es7.Client
func (c *ClientV7) ExSearch(r *Request) (*Response, error) {
return c.parse(c.Search(c.Search.WithIndex(r.Index), c.Search.WithBody(r.Body)))
}
func (c *ClientV7) parse(resp *es7api.Response, err error) (*Response, error) {
if err != nil {
return nil, fmt.Errorf("error getting response: %s", err)
}
defer resp.Body.Close()
if resp.IsError() {
return nil, fmt.Errorf(resp.String())
}
var r struct {
Hits *struct {
Total struct {
Value int64 `json:"value"`
} `json:"total"`
Hits jsoniter.RawMessage `json:"hits"`
} `json:"hits"`
Aggregations map[string]jsoniter.RawMessage `json:"aggregations"`
}
if err := json.NewDecoder(resp.Body).Decode(&r); err != nil {
return nil, fmt.Errorf("error parsing the response body: %s", err)
}
return &Response{
Hits: Hits{Total: r.Hits.Total.Value, Hits: r.Hits.Hits},
Aggregations: r.Aggregations,
}, nil
}
type client interface {
ExSearch(r *Request) (*Response, error)
}

View File

@@ -0,0 +1,338 @@
package elasticsearch
import (
"bytes"
"fmt"
"strings"
"time"
es5 "github.com/elastic/go-elasticsearch/v5"
es6 "github.com/elastic/go-elasticsearch/v6"
es7 "github.com/elastic/go-elasticsearch/v7"
jsoniter "github.com/json-iterator/go"
corev1 "k8s.io/api/core/v1"
"kubesphere.io/kubesphere/pkg/simple/client/events"
)
var json = jsoniter.ConfigCompatibleWithStandardLibrary
type Elasticsearch struct {
c client
opts struct {
index string
}
}
func (es *Elasticsearch) SearchEvents(filter *events.Filter, from, size int64,
sort string) (*events.Events, error) {
queryPart := parseToQueryPart(filter)
if sort == "" {
sort = "desc"
}
sortPart := []map[string]interface{}{{
"lastTimestamp": map[string]string{"order": sort},
}}
b := map[string]interface{}{
"from": from,
"size": size,
"query": queryPart,
"sort": sortPart,
}
body, err := json.Marshal(b)
if err != nil {
return nil, err
}
resp, err := es.c.ExSearch(&Request{
Index: es.opts.index,
Body: bytes.NewBuffer(body),
})
if err != nil || resp == nil {
return nil, err
}
var innerHits []struct {
*corev1.Event `json:"_source"`
}
if err := json.Unmarshal(resp.Hits.Hits, &innerHits); err != nil {
return nil, err
}
evts := events.Events{Total: resp.Hits.Total}
for _, hit := range innerHits {
evts.Records = append(evts.Records, hit.Event)
}
return &evts, nil
}
func (es *Elasticsearch) CountOverTime(filter *events.Filter, interval string) (*events.Histogram, error) {
if interval == "" {
interval = "15m"
}
queryPart := parseToQueryPart(filter)
aggName := "events_count_over_lasttimestamp"
aggsPart := map[string]interface{}{
aggName: map[string]interface{}{
"date_histogram": map[string]string{
"field": "lastTimestamp",
"interval": interval,
},
},
}
b := map[string]interface{}{
"query": queryPart,
"aggs": aggsPart,
"size": 0, // do not get docs
}
body, err := json.Marshal(b)
if err != nil {
return nil, err
}
resp, err := es.c.ExSearch(&Request{
Index: es.opts.index,
Body: bytes.NewBuffer(body),
})
if err != nil || resp == nil {
return nil, err
}
raw := resp.Aggregations[aggName]
var agg struct {
Buckets []struct {
KeyAsString string `json:"key_as_string"`
Key int64 `json:"key"`
DocCount int64 `json:"doc_count"`
} `json:"buckets"`
}
if err := json.Unmarshal(raw, &agg); err != nil {
return nil, err
}
histo := events.Histogram{Total: int64(len(agg.Buckets))}
for _, b := range agg.Buckets {
histo.Buckets = append(histo.Buckets,
events.Bucket{Time: b.Key, Count: b.DocCount})
}
return &histo, nil
}
func (es *Elasticsearch) StatisticsOnResources(filter *events.Filter) (*events.Statistics, error) {
queryPart := parseToQueryPart(filter)
aggName := "resources_count"
aggsPart := map[string]interface{}{
aggName: map[string]interface{}{
"cardinality": map[string]string{
"field": "involvedObject.uid.keyword",
},
},
}
b := map[string]interface{}{
"query": queryPart,
"aggs": aggsPart,
"size": 0, // do not get docs
}
body, err := json.Marshal(b)
if err != nil {
return nil, err
}
resp, err := es.c.ExSearch(&Request{
Index: es.opts.index,
Body: bytes.NewBuffer(body),
})
if err != nil || resp == nil {
return nil, err
}
raw := resp.Aggregations[aggName]
var agg struct {
Value int64 `json:"value"`
}
if err := json.Unmarshal(raw, &agg); err != nil {
return nil, err
}
return &events.Statistics{
Resources: agg.Value,
Events: resp.Hits.Total,
}, nil
}
func NewClient(options *Options) (*Elasticsearch, error) {
clientV5 := func() (*ClientV5, error) {
c, err := es5.NewClient(es5.Config{Addresses: []string{options.Host}})
if err != nil {
return nil, err
}
return (*ClientV5)(c), nil
}
clientV6 := func() (*ClientV6, error) {
c, err := es6.NewClient(es6.Config{Addresses: []string{options.Host}})
if err != nil {
return nil, err
}
return (*ClientV6)(c), nil
}
clientV7 := func() (*ClientV7, error) {
c, err := es7.NewClient(es7.Config{Addresses: []string{options.Host}})
if err != nil {
return nil, err
}
return (*ClientV7)(c), nil
}
var (
version = options.Version
es = Elasticsearch{}
err error
)
es.opts.index = fmt.Sprintf("%s*", options.IndexPrefix)
if options.Version == "" {
var c5 *ClientV5
if c5, err = clientV5(); err == nil {
if version, err = c5.Version(); err == nil {
es.c = c5
}
}
}
if err != nil {
return nil, err
}
switch strings.Split(version, ".")[0] {
case "5":
if es.c == nil {
es.c, err = clientV5()
}
case "6":
es.c, err = clientV6()
case "7":
es.c, err = clientV7()
default:
err = fmt.Errorf("unsupported elasticsearch version %s", version)
}
if err != nil {
return nil, err
}
return &es, nil
}
func parseToQueryPart(f *events.Filter) interface{} {
if f == nil {
return nil
}
type BoolBody struct {
Filter []map[string]interface{} `json:"filter,omitempty"`
Should []map[string]interface{} `json:"should,omitempty"`
MinimumShouldMatch *int `json:"minimum_should_match,omitempty"`
}
var mini = 1
b := BoolBody{}
queryBody := map[string]interface{}{
"bool": &b,
}
if len(f.InvolvedObjectNamespaceMap) > 0 {
bi := BoolBody{MinimumShouldMatch: &mini}
for k, v := range f.InvolvedObjectNamespaceMap {
bi.Should = append(bi.Should, map[string]interface{}{
"bool": &BoolBody{
Filter: []map[string]interface{}{{
"match_phrase": map[string]string{"involvedObject.namespace.keyword": k},
}, {
"range": map[string]interface{}{
"lastTimestamp": map[string]interface{}{
"gte": v,
},
},
}},
},
})
}
if len(bi.Should) > 0 {
b.Filter = append(b.Filter, map[string]interface{}{"bool": &bi})
}
}
shouldBoolbody := func(mtype, fieldName string, fieldValues []string, fieldValueMutate func(string) string) *BoolBody {
bi := BoolBody{MinimumShouldMatch: &mini}
for _, v := range fieldValues {
if fieldValueMutate != nil {
v = fieldValueMutate(v)
}
bi.Should = append(bi.Should, map[string]interface{}{
mtype: map[string]string{fieldName: v},
})
}
if len(bi.Should) == 0 {
return nil
}
return &bi
}
if len(f.InvolvedObjectNames) > 0 {
if bi := shouldBoolbody("match_phrase", "involvedObject.name.keyword",
f.InvolvedObjectNames, nil); bi != nil {
b.Filter = append(b.Filter, map[string]interface{}{"bool": bi})
}
}
if len(f.InvolvedObjectNameFuzzy) > 0 {
if bi := shouldBoolbody("match_phrase_prefix", "involvedObject.name",
f.InvolvedObjectNameFuzzy, nil); bi != nil {
b.Filter = append(b.Filter, map[string]interface{}{"bool": bi})
}
}
if len(f.InvolvedObjectkinds) > 0 {
// involvedObject.kind is single word and here is not field keyword for case ignoring
if bi := shouldBoolbody("match_phrase", "involvedObject.kind",
f.InvolvedObjectkinds, nil); bi != nil {
b.Filter = append(b.Filter, map[string]interface{}{"bool": bi})
}
}
if len(f.Reasons) > 0 {
// reason is single word and here is not field keyword for case ignoring
if bi := shouldBoolbody("match_phrase", "reason",
f.Reasons, nil); bi != nil {
b.Filter = append(b.Filter, map[string]interface{}{"bool": bi})
}
}
if len(f.ReasonFuzzy) > 0 {
if bi := shouldBoolbody("wildcard", "reason",
f.ReasonFuzzy, func(s string) string {
return fmt.Sprintf("*" + s + "*")
}); bi != nil {
b.Filter = append(b.Filter, map[string]interface{}{"bool": bi})
}
}
if len(f.MessageFuzzy) > 0 {
if bi := shouldBoolbody("match_phrase_prefix", "message",
f.MessageFuzzy, nil); bi != nil {
b.Filter = append(b.Filter, map[string]interface{}{"bool": bi})
}
}
if len(f.Type) > 0 {
// type is single word and here is not field keyword for case ignoring
if bi := shouldBoolbody("match_phrase", "type",
[]string{f.Type}, nil); bi != nil {
b.Filter = append(b.Filter, map[string]interface{}{"bool": bi})
}
}
if f.StartTime != nil || f.EndTime != nil {
m := make(map[string]*time.Time)
if f.StartTime != nil {
m["gte"] = f.StartTime
}
if f.EndTime != nil {
m["lte"] = f.EndTime
}
b.Filter = append(b.Filter, map[string]interface{}{
"range": map[string]interface{}{"lastTimestamp": m},
})
}
return queryBody
}

View File

@@ -0,0 +1,221 @@
package elasticsearch
import (
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
"kubesphere.io/kubesphere/pkg/simple/client/events"
"net/http"
"net/http/httptest"
"strconv"
"strings"
"testing"
"time"
)
func MockElasticsearchService(pattern string, fakeCode int, fakeResp string) *httptest.Server {
mux := http.NewServeMux()
mux.HandleFunc(pattern, func(res http.ResponseWriter, req *http.Request) {
res.WriteHeader(fakeCode)
res.Write([]byte(fakeResp))
})
return httptest.NewServer(mux)
}
func TestStatisticsOnResources(t *testing.T) {
var tests = []struct {
description string
filter events.Filter
fakeVersion string
fakeCode int
fakeResp string
expected events.Statistics
expectedError bool
}{{
description: "ES index exists",
filter: events.Filter{},
fakeVersion: "6",
fakeCode: 200,
fakeResp: `
{
"took": 16,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 10000,
"max_score": null,
"hits": [
]
},
"aggregations": {
"resources_count": {
"value": 100
}
}
}
`,
expected: events.Statistics{
Events: 10000,
Resources: 100,
},
expectedError: false,
}, {
description: "ES index not exists",
filter: events.Filter{},
fakeVersion: "6",
fakeCode: 404,
fakeResp: `
{
"error": {
"root_cause": [
{
"type": "index_not_found_exception",
"reason": "no such index [events]",
"resource.type": "index_or_alias",
"resource.id": "events",
"index_uuid": "_na_",
"index": "events"
}
],
"type": "index_not_found_exception",
"reason": "no such index [events]",
"resource.type": "index_or_alias",
"resource.id": "events",
"index_uuid": "_na_",
"index": "events"
},
"status": 404
}
`,
expectedError: true,
}}
for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
mes := MockElasticsearchService("/", test.fakeCode, test.fakeResp)
defer mes.Close()
es, err := NewClient(&Options{Host: mes.URL, IndexPrefix: "ks-logstash-events", Version: "6"})
if err != nil {
t.Fatal(err)
}
stats, err := es.StatisticsOnResources(&test.filter)
if test.expectedError {
if err == nil {
t.Fatalf("expected err like %s", test.fakeResp)
} else if !strings.Contains(err.Error(), strconv.Itoa(test.fakeCode)) {
t.Fatalf("err does not contain expected code: %d", test.fakeCode)
}
} else {
if err != nil {
t.Fatal(err)
} else if diff := cmp.Diff(stats, &test.expected); diff != "" {
t.Fatalf("%T differ (-got, +want): %s", test.expected, diff)
}
}
})
}
}
func TestParseToQueryPart(t *testing.T) {
q := `
{
"bool": {
"filter": [
{
"bool": {
"should": [
{
"bool": {
"filter": [
{
"match_phrase": {
"involvedObject.namespace.keyword": "kubesphere-system"
}
},
{
"range": {
"lastTimestamp": {
"gte": "2020-01-01T01:01:01.000000001Z"
}
}
}
]
}
}
],
"minimum_should_match": 1
}
},
{
"bool": {
"should": [
{
"match_phrase_prefix": {
"involvedObject.name": "istio"
}
}
],
"minimum_should_match": 1
}
},
{
"bool": {
"should": [
{
"match_phrase": {
"reason": "unhealthy"
}
}
],
"minimum_should_match": 1
}
},
{
"range": {
"lastTimestamp": {
"gte": "2019-12-01T01:01:01.000000001Z"
}
}
}
]
}
}
`
nsCreateTime := time.Date(2020, time.Month(1), 1, 1, 1, 1, 1, time.UTC)
startTime := nsCreateTime.AddDate(0, -1, 0)
filter := &events.Filter{
InvolvedObjectNamespaceMap: map[string]time.Time{
"kubesphere-system": nsCreateTime,
},
InvolvedObjectNameFuzzy: []string{"istio"},
Reasons: []string{"unhealthy"},
StartTime: &startTime,
}
qp := parseToQueryPart(filter)
bs, err := json.Marshal(qp)
if err != nil {
panic(err)
}
queryPart := &map[string]interface{}{}
if err := json.Unmarshal(bs, queryPart); err != nil {
panic(err)
}
expectedQueryPart := &map[string]interface{}{}
if err := json.Unmarshal([]byte(q), expectedQueryPart); err != nil {
panic(err)
}
assert.Equal(t, expectedQueryPart, queryPart)
}

View File

@@ -0,0 +1,46 @@
package elasticsearch
import (
"github.com/spf13/pflag"
"kubesphere.io/kubesphere/pkg/utils/reflectutils"
)
type Options struct {
Host string `json:"host" yaml:"host"`
IndexPrefix string `json:"indexPrefix,omitempty" yaml:"indexPrefix"`
Version string `json:"version" yaml:"version"`
}
func NewElasticSearchOptions() *Options {
return &Options{
Host: "",
IndexPrefix: "ks-logstash-events",
Version: "",
}
}
func (s *Options) ApplyTo(options *Options) {
if s.Host != "" {
reflectutils.Override(options, s)
}
}
func (s *Options) Validate() []error {
errs := []error{}
return errs
}
func (s *Options) AddFlags(fs *pflag.FlagSet, c *Options) {
fs.StringVar(&s.Host, "elasticsearch-host", c.Host, ""+
"Elasticsearch service host. KubeSphere is using elastic as event store, "+
"if this filed left blank, KubeSphere will use kubernetes builtin event API instead, and"+
" the following elastic search options will be ignored.")
fs.StringVar(&s.IndexPrefix, "index-prefix", c.IndexPrefix, ""+
"Index name prefix. KubeSphere will retrieve events against indices matching the prefix.")
fs.StringVar(&s.Version, "elasticsearch-version", c.Version, ""+
"Elasticsearch major version, e.g. 5/6/7, if left blank, will detect automatically."+
"Currently, minimum supported version is 5.x")
}

View File

@@ -0,0 +1,44 @@
package events
import (
v1 "k8s.io/api/core/v1"
"time"
)
type Client interface {
SearchEvents(filter *Filter, from, size int64, sort string) (*Events, error)
CountOverTime(filter *Filter, interval string) (*Histogram, error)
StatisticsOnResources(filter *Filter) (*Statistics, error)
}
type Filter struct {
InvolvedObjectNamespaceMap map[string]time.Time
InvolvedObjectNames []string
InvolvedObjectNameFuzzy []string
InvolvedObjectkinds []string
Reasons []string
ReasonFuzzy []string
MessageFuzzy []string
Type string
StartTime *time.Time
EndTime *time.Time
}
type Events struct {
Total int64 `json:"total" description:"total number of matched results"`
Records []*v1.Event `json:"records" description:"actual array of results"`
}
type Histogram struct {
Total int64 `json:"total" description:"total number of events"`
Buckets []Bucket `json:"buckets" description:"actual array of histogram results"`
}
type Bucket struct {
Time int64 `json:"time" description:"timestamp"`
Count int64 `json:"count" description:"total number of events at intervals"`
}
type Statistics struct {
Resources int64 `json:"resources" description:"total number of resources"`
Events int64 `json:"events" description:"total number of events"`
}