Add an ES client for auditing, events, and logging

Signed-off-by: wanjunlei <wanjunlei@yunify.com>
This commit is contained in:
wanjunlei
2020-12-16 17:33:10 +08:00
parent 8a6ce2d7ac
commit 039507c9ae
42 changed files with 1889 additions and 2291 deletions

View File

@@ -18,288 +18,153 @@ package elasticsearch
import (
"bytes"
"context"
"encoding/json"
"fmt"
"github.com/json-iterator/go"
"io"
"kubesphere.io/kubesphere/pkg/simple/client/es"
"kubesphere.io/kubesphere/pkg/simple/client/es/query"
"kubesphere.io/kubesphere/pkg/simple/client/logging"
"kubesphere.io/kubesphere/pkg/simple/client/logging/elasticsearch/versions/v5"
"kubesphere.io/kubesphere/pkg/simple/client/logging/elasticsearch/versions/v6"
"kubesphere.io/kubesphere/pkg/simple/client/logging/elasticsearch/versions/v7"
"kubesphere.io/kubesphere/pkg/utils/esutil"
"time"
"kubesphere.io/kubesphere/pkg/utils/stringutils"
"strings"
"sync"
)
// Elasticsearch major versions recognised by the version switches in this
// package.
const (
	ElasticV5 = "5"
	ElasticV6 = "6"
	ElasticV7 = "7"
)

// Length limits used when deriving pod-name patterns from a workload name.
const (
	// podNameMaxLength is the pod-name length the patterns are sized against.
	podNameMaxLength = 63
	// podNameSuffixLength covers 5 characters plus 1 hyphen.
	podNameSuffixLength = 6
	// replicaSetSuffixMaxLength covers at most 10 characters plus 1 hyphen.
	replicaSetSuffixMaxLength = 11
)
// Source holds the fields of a log document that this package decodes.
type Source struct {
	// Log is read from the "log" key.
	Log string `json:"log"`
	// Time is read from the "time" key and kept as a string.
	Time string `json:"time"`
	// Kubernetes is embedded, so its fields are promoted onto Source; in JSON
	// it is nested under the "kubernetes" key.
	Kubernetes `json:"kubernetes"`
}
// Kubernetes holds the Kubernetes metadata fields decoded from a log document.
type Kubernetes struct {
	// Namespace is read from the "namespace_name" key.
	Namespace string `json:"namespace_name"`
	// Pod is read from the "pod_name" key.
	Pod string `json:"pod_name"`
	// Container is read from the "container_name" key.
	Container string `json:"container_name"`
	// Host is read from the "host" key.
	Host string `json:"host"`
}
// Elasticsearch implements the logging interface.
type Elasticsearch struct {
host string
version string
index string
c client
mux sync.Mutex
// client is the receiver for this package's logging queries; it delegates
// searches and scrolls to an es client.
type client struct {
	// c is the underlying es client.
	c *es.Client
}
// client is the contract each version-specific Elasticsearch client fulfils.
type client interface {
	// Search runs the request body against the given indices; scroll selects
	// whether the search is opened as a scroll. It returns the raw response.
	Search(indices string, body []byte, scroll bool) ([]byte, error)

	// Scroll returns the raw response for the scroll identified by id.
	Scroll(id string) ([]byte, error)

	// ClearScroll releases the scroll identified by id.
	ClearScroll(id string)

	// GetTotalHitCount converts a response's total-hits value into an int64.
	GetTotalHitCount(v interface{}) int64
}
// NewClient allocates a client, fills its c field from
// es.NewClient(options.Host, options.IndexPrefix, options.Version) and returns
// the client together with the error from that call.
//
// NOTE(review): the lines of two constructors are interleaved in this span.
// The NewElasticsearch lines copy Host, Version and IndexPrefix from options,
// build a v5/v6/v7 client according to the version, and return an error for
// any other non-empty version.
func NewClient(options *logging.Options) (logging.Client, error) {
c := &client{}
func NewElasticsearch(options *Options) (*Elasticsearch, error) {
var err error
es := &Elasticsearch{
host: options.Host,
version: options.Version,
index: options.IndexPrefix,
}
switch es.version {
case ElasticV5:
es.c, err = v5.New(es.host, es.index)
case ElasticV6:
es.c, err = v6.New(es.host, es.index)
case ElasticV7:
es.c, err = v7.New(es.host, es.index)
case "":
// No version configured: the client is left unset here.
es.c = nil
default:
return nil, fmt.Errorf("unsupported elasticsearch version %s", es.version)
}
return es, err
c.c, err = es.NewClient(options.Host, options.IndexPrefix, options.Version)
return c, err
}
// loadClient makes sure es.c holds a versioned client, creating it on first
// use.
//
// When no client is set, it queries the Info API through a v6 client, takes
// the major component of the reported version number and builds the matching
// v5, v6 or v7 client. On success the client is stored in es.c and the major
// version in es.version.
//
// It returns an error if the probe client cannot be created, the Info request
// or the decoding of its body fails, the server answers with an error status,
// no version number is reported, or the major version is not 5, 6 or 7.
func (es *Elasticsearch) loadClient() error {
	// Check if Elasticsearch client has been initialized.
	// NOTE(review): this read of es.c happens before es.mux is taken.
	if es.c != nil {
		return nil
	}

	// Create Elasticsearch client.
	es.mux.Lock()
	defer es.mux.Unlock()

	// Check again now that the lock is held.
	if es.c != nil {
		return nil
	}

	// Detect Elasticsearch server version using Info API.
	// Info API is backward compatible across v5, v6 and v7.
	esv6, err := v6.New(es.host, "")
	if err != nil {
		return err
	}

	res, err := esv6.Client.Info(
		esv6.Client.Info.WithContext(context.Background()),
	)
	if err != nil {
		return err
	}
	defer res.Body.Close()

	var b map[string]interface{}
	if err = jsoniter.NewDecoder(res.Body).Decode(&b); err != nil {
		return err
	}
	if res.IsError() {
		// Print the response status and error information.
		e, _ := b["error"].(map[string]interface{})
		return fmt.Errorf("[%s] type: %v, reason: %v", res.Status(), e["type"], e["reason"])
	}

	// get the major version
	version, _ := b["version"].(map[string]interface{})
	number, _ := version["number"].(string)
	if number == "" {
		return fmt.Errorf("failed to detect elastic version number")
	}

	var c client
	v := strings.Split(number, ".")[0]
	switch v {
	case ElasticV5:
		c, err = v5.New(es.host, es.index)
	case ElasticV6:
		c, err = v6.New(es.host, es.index)
	case ElasticV7:
		c, err = v7.New(es.host, es.index)
	default:
		// Report the version string itself; "version" is the whole decoded
		// map and would be dumped in full by %s.
		err = fmt.Errorf("unsupported elasticsearch version %s", number)
	}
	if err != nil {
		return err
	}

	es.c = c
	es.version = v
	return nil
}
// GetCurrentStats returns statistics for the logs matching sf: Containers is
// taken from the response's Value field and Logs from its total hit count.
// Every failure path returns a zero logging.Statistics with the error.
//
// NOTE(review): the lines of two variants are interleaved in this span, one on
// *Elasticsearch (loadClient, newBodyBuilder, parseResponse) and one on
// *client (query.NewBuilder, c.c.Search).
func (es *Elasticsearch) GetCurrentStats(sf logging.SearchFilter) (logging.Statistics, error) {
func (c *client) GetCurrentStats(sf logging.SearchFilter) (logging.Statistics, error) {
var err error
err = es.loadClient()
if err != nil {
return logging.Statistics{}, err
}
// Size 0 with a cardinality aggregation on kubernetes.docker_id.keyword.
b := query.NewBuilder().
WithQuery(parseToQueryPart(sf)).
WithAggregations(query.NewAggregations().
WithCardinalityAggregation("kubernetes.docker_id.keyword")).
WithSize(0)
body, err := newBodyBuilder().
mainBool(sf).
cardinalityAggregation().
bytes()
if err != nil {
return logging.Statistics{}, err
}
b, err := es.c.Search(esutil.ResolveIndexNames(es.index, sf.Starttime, sf.Endtime), body, true)
if err != nil {
return logging.Statistics{}, err
}
res, err := parseResponse(b)
resp, err := c.c.Search(b, sf.Starttime, sf.Endtime, false)
if err != nil {
return logging.Statistics{}, err
}
return logging.Statistics{
Containers: res.Value,
Logs: es.c.GetTotalHitCount(res.Total),
Containers: resp.Value,
Logs: c.c.GetTotalHitCount(resp.Total),
},
nil
}
// CountLogsByInterval returns a histogram of the logs matching sf: Total is the
// response's total hit count and each response bucket becomes one
// logging.Bucket. interval is passed to the date-histogram aggregation on the
// "time" field. Every failure path returns a zero logging.Histogram with the
// error.
//
// NOTE(review): the lines of two variants are interleaved in this span, one on
// *Elasticsearch and one on *client.
func (es *Elasticsearch) CountLogsByInterval(sf logging.SearchFilter, interval string) (logging.Histogram, error) {
var err error
func (c *client) CountLogsByInterval(sf logging.SearchFilter, interval string) (logging.Histogram, error) {
err = es.loadClient()
// Size 0 with a date-histogram aggregation on "time".
b := query.NewBuilder().
WithQuery(parseToQueryPart(sf)).
WithAggregations(query.NewAggregations().
WithDateHistogramAggregation("time", interval)).
WithSize(0)
resp, err := c.c.Search(b, sf.Starttime, sf.Endtime, false)
if err != nil {
return logging.Histogram{}, err
}
body, err := newBodyBuilder().
mainBool(sf).
dateHistogramAggregation(interval).
bytes()
if err != nil {
return logging.Histogram{}, err
h := logging.Histogram{
Total: c.c.GetTotalHitCount(resp.Total),
}
b, err := es.c.Search(esutil.ResolveIndexNames(es.index, sf.Starttime, sf.Endtime), body, false)
if err != nil {
return logging.Histogram{}, err
}
res, err := parseResponse(b)
if err != nil {
return logging.Histogram{}, err
}
var h logging.Histogram
h.Total = es.c.GetTotalHitCount(res.Total)
// Copy each response bucket into the result.
for _, b := range res.Buckets {
for _, bucket := range resp.Buckets {
h.Buckets = append(h.Buckets, logging.Bucket{
Time: b.Time,
Count: b.Count,
Time: bucket.Key,
Count: bucket.Count,
})
}
return h, nil
}
// SearchLogs returns the log records matching sf. f is the offset of the first
// hit (from), s the number of hits requested (size) and o the sort order
// applied to the "time" field. Total is the response's total hit count and
// each hit becomes one logging.Record. Every failure path returns a zero
// logging.Logs with the error.
//
// NOTE(review): the lines of two variants are interleaved in this span, one on
// *Elasticsearch and one on *client.
func (es *Elasticsearch) SearchLogs(sf logging.SearchFilter, f, s int64, o string) (logging.Logs, error) {
var err error
func (c *client) SearchLogs(sf logging.SearchFilter, f, s int64, o string) (logging.Logs, error) {
err = es.loadClient()
b := query.NewBuilder().
WithQuery(parseToQueryPart(sf)).
WithSort("time", o).
WithFrom(f).
WithSize(s)
resp, err := c.c.Search(b, sf.Starttime, sf.Endtime, false)
if err != nil {
return logging.Logs{}, err
}
body, err := newBodyBuilder().
mainBool(sf).
from(f).
size(s).
sort(o).
bytes()
if err != nil {
return logging.Logs{}, err
l := logging.Logs{
Total: c.c.GetTotalHitCount(resp.Total),
}
b, err := es.c.Search(esutil.ResolveIndexNames(es.index, sf.Starttime, sf.Endtime), body, false)
if err != nil {
return logging.Logs{}, err
}
res, err := parseResponse(b)
if err != nil {
return logging.Logs{}, err
}
var l logging.Logs
l.Total = es.c.GetTotalHitCount(res.Total)
for _, hit := range res.AllHits {
for _, hit := range resp.AllHits {
// Inside the loop s is the decoded source and shadows the size parameter s.
s := c.getSource(hit.Source)
l.Records = append(l.Records, logging.Record{
Log: hit.Log,
Time: hit.Time,
Namespace: hit.Namespace,
Pod: hit.Pod,
Container: hit.Container,
Log: s.Log,
Time: s.Time,
Namespace: s.Namespace,
Pod: s.Pod,
Container: s.Container,
})
}
return l, nil
}
func (es *Elasticsearch) ExportLogs(sf logging.SearchFilter, w io.Writer) error {
var err error
func (c *client) ExportLogs(sf logging.SearchFilter, w io.Writer) error {
var id string
var data []string
err = es.loadClient()
b := query.NewBuilder().
WithQuery(parseToQueryPart(sf)).
WithSort("time", "desc").
WithFrom(0).
WithSize(1000)
resp, err := c.c.Search(b, sf.Starttime, sf.Endtime, true)
if err != nil {
return err
}
// Initial Search
body, err := newBodyBuilder().
mainBool(sf).
from(0).
size(1000).
sort("desc").
bytes()
if err != nil {
return err
}
defer c.c.ClearScroll(id)
b, err := es.c.Search(esutil.ResolveIndexNames(es.index, sf.Starttime, sf.Endtime), body, true)
defer es.ClearScroll(id)
if err != nil {
return err
}
res, err := parseResponse(b)
if err != nil {
return err
}
id = res.ScrollId
for _, hit := range res.AllHits {
data = append(data, hit.Log)
id = resp.ScrollId
for _, hit := range resp.AllHits {
data = append(data, c.getSource(hit.Source).Log)
}
// limit to retrieve max 100k records
for i := 0; i < 100; i++ {
if i != 0 {
data, id, err = es.scroll(id)
data, id, err = c.scroll(id)
if err != nil {
return err
}
@@ -320,26 +185,122 @@ func (es *Elasticsearch) ExportLogs(sf logging.SearchFilter, w io.Writer) error
return nil
}
// scroll fetches the next page of the scroll identified by id. It returns the
// log line of every hit on that page and the scroll id reported by the
// response. If the Scroll call fails it returns nil data, the id that was
// passed in, and the error.
//
// NOTE(review): the lines of two variants are interleaved in this span, one on
// *Elasticsearch (parseResponse) and one on *client (getSource).
func (es *Elasticsearch) scroll(id string) ([]string, string, error) {
b, err := es.c.Scroll(id)
if err != nil {
return nil, id, err
}
res, err := parseResponse(b)
func (c *client) scroll(id string) ([]string, string, error) {
resp, err := c.c.Scroll(id)
if err != nil {
return nil, id, err
}
var data []string
for _, hit := range res.AllHits {
data = append(data, hit.Log)
for _, hit := range resp.AllHits {
data = append(data, c.getSource(hit.Source).Log)
}
return data, res.ScrollId, nil
return data, resp.ScrollId, nil
}
func (es *Elasticsearch) ClearScroll(id string) {
if id != "" {
es.c.ClearScroll(id)
// getSource converts an arbitrary decoded document into a Source by encoding
// it to JSON and decoding that JSON into the struct.
//
// The conversion is best effort: if val cannot be encoded, an empty Source is
// returned; if decoding reports an error, whatever was decoded so far is
// returned.
func (c *client) getSource(val interface{}) Source {
	var src Source

	raw, err := json.Marshal(val)
	if err != nil {
		return src
	}

	// A decode error is deliberately not reported: the result is the same
	// struct either way, possibly only partially filled.
	_ = json.Unmarshal(raw, &src)
	return src
}
// parseToQueryPart turns a logging search filter into a query whose top-level
// bool holds one filter clause per criterion: namespaces, workloads, exact
// pod and container names, phrase-prefix searches on workload, pod, container
// and log text, and finally the overall time range.
func parseToQueryPart(sf logging.SearchFilter) *query.Query {
	// Each should-group below is given a minimum-should-match of 1.
	var minShould int32 = 1

	root := query.NewBool()

	// Namespaces: one should-clause per entry, pairing the namespace name
	// with a lower time bound (the zero time when the entry has none).
	nsGroup := query.NewBool().WithMinimumShouldMatch(minShould)
	for namespace, since := range sf.NamespaceFilter {
		var from time.Time
		if since != nil {
			from = *since
		}
		nsClause := query.NewBool().
			AppendFilter(query.NewMatchPhrase("kubernetes.namespace_name.keyword", namespace)).
			AppendFilter(query.NewRange("time").WithGTE(from))
		nsGroup.AppendShould(nsClause)
	}
	root.AppendFilter(nsGroup)

	// Workloads: pod names are matched against a regexp derived from each
	// workload name.
	if sf.WorkloadFilter != nil {
		workloadGroup := query.NewBool().WithMinimumShouldMatch(minShould)
		for _, workload := range sf.WorkloadFilter {
			pattern := podNameRegexp(workload)
			workloadGroup.AppendShould(query.NewRegexp("kubernetes.pod_name.keyword", pattern))
		}
		root.AppendFilter(workloadGroup)
	}

	// Exact matches on pod and container names.
	podExact := query.NewBool().
		AppendMultiShould(query.NewMultiMatchPhrase("kubernetes.pod_name.keyword", sf.PodFilter)).
		WithMinimumShouldMatch(minShould)
	root.AppendFilter(podExact)

	containerExact := query.NewBool().
		AppendMultiShould(query.NewMultiMatchPhrase("kubernetes.container_name.keyword", sf.ContainerFilter)).
		WithMinimumShouldMatch(minShould)
	root.AppendFilter(containerExact)

	// Fuzzy matching via phrase-prefix searches.
	workloadSearch := query.NewBool().
		AppendMultiShould(query.NewMultiMatchPhrasePrefix("kubernetes.pod_name", sf.WorkloadSearch)).
		WithMinimumShouldMatch(minShould)
	root.AppendFilter(workloadSearch)

	podSearch := query.NewBool().
		AppendMultiShould(query.NewMultiMatchPhrasePrefix("kubernetes.pod_name", sf.PodSearch)).
		WithMinimumShouldMatch(minShould)
	root.AppendFilter(podSearch)

	containerSearch := query.NewBool().
		AppendMultiShould(query.NewMultiMatchPhrasePrefix("kubernetes.container_name", sf.ContainerSearch)).
		WithMinimumShouldMatch(minShould)
	root.AppendFilter(containerSearch)

	logSearch := query.NewBool().
		AppendMultiShould(query.NewMultiMatchPhrasePrefix("log", sf.LogSearch)).
		WithMinimumShouldMatch(minShould)
	root.AppendFilter(logSearch)

	// Overall time window; a zero bound is left open.
	timeRange := query.NewRange("time")
	if !sf.Starttime.IsZero() {
		timeRange.WithGTE(sf.Starttime)
	}
	if !sf.Endtime.IsZero() {
		timeRange.WithLTE(sf.Endtime)
	}
	root.AppendFilter(timeRange)

	return query.NewQuery().WithBool(root)
}
// podNameRegexp builds a regular expression matching the names of pods that
// belong to the workload called workloadName. Alternatives are joined with
// "|"; which ones are emitted depends on how much room the workload name
// leaves within podNameMaxLength.
func podNameRegexp(workloadName string) string {
	nameLen := len(workloadName)

	switch {
	case nameLen <= podNameMaxLength-replicaSetSuffixMaxLength-podNameSuffixLength:
		// Room for the full suffixes.
		// Deployment pods, eg. <deploy>-579dfbcddd-24znw. The replicaset rand
		// string uses an alphabet without vowels, see
		// https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/apimachinery/pkg/util/rand/rand.go#L83
		// Statefulset pods, eg. <sts>-0.
		// Daemonset or job pods, eg. <ds>-29tdk, <job>-5xqvl.
		return workloadName + "-[bcdfghjklmnpqrstvwxz2456789]{1,10}-[a-z0-9]{5}|" +
			workloadName + "-[0-9]+|" +
			workloadName + "-[a-z0-9]{5}"

	case nameLen <= podNameMaxLength-podNameSuffixLength:
		// The replicaset part is cut to the remaining room and the hyphen
		// before the pod suffix is gone.
		hashLen := podNameMaxLength - podNameSuffixLength - nameLen
		return fmt.Sprintf("%s%d%s", workloadName+"-[bcdfghjklmnpqrstvwxz2456789]{", hashLen, "}[a-z0-9]{5}|") +
			workloadName + "-[0-9]+|" +
			workloadName + "-[a-z0-9]{5}"

	default:
		// Rand suffix may overwrite the workload name if the name is too long.
		// This won't happen for StatefulSet because long name will cause ReplicaSet fails during StatefulSet creation.
		return workloadName[:podNameMaxLength-podNameSuffixLength+1] + "[a-z0-9]{5}|" +
			workloadName + "-[0-9]+"
	}
}