diff --git a/pkg/apiserver/logging/logging.go b/pkg/apiserver/logging/logging.go index fd99e5ad6..7238ce9a5 100644 --- a/pkg/apiserver/logging/logging.go +++ b/pkg/apiserver/logging/logging.go @@ -25,13 +25,14 @@ import ( "kubesphere.io/kubesphere/pkg/models/log" es "kubesphere.io/kubesphere/pkg/simple/client/elasticsearch" fb "kubesphere.io/kubesphere/pkg/simple/client/fluentbit" + "kubesphere.io/kubesphere/pkg/utils/stringutils" "net/http" "strconv" + "strings" ) func LoggingQueryCluster(request *restful.Request, response *restful.Response) { res := logQuery(log.QueryLevelCluster, request) - if res.Status != http.StatusOK { response.WriteHeaderAndEntity(res.Status, errors.New(res.Error)) return @@ -42,7 +43,6 @@ func LoggingQueryCluster(request *restful.Request, response *restful.Response) { func LoggingQueryWorkspace(request *restful.Request, response *restful.Response) { res := logQuery(log.QueryLevelWorkspace, request) - if res.Status != http.StatusOK { response.WriteHeaderAndEntity(res.Status, errors.New(res.Error)) return @@ -53,7 +53,6 @@ func LoggingQueryWorkspace(request *restful.Request, response *restful.Response) func LoggingQueryNamespace(request *restful.Request, response *restful.Response) { res := logQuery(log.QueryLevelNamespace, request) - if res.Status != http.StatusOK { response.WriteHeaderAndEntity(res.Status, errors.New(res.Error)) return @@ -160,79 +159,69 @@ func LoggingDeleteFluentbitOutput(request *restful.Request, response *restful.Re } func logQuery(level log.LogQueryLevel, request *restful.Request) *es.QueryResult { - var param es.QueryParameters - param.Operation = request.QueryParameter("operation") + var param es.QueryParameters switch level { case log.QueryLevelCluster: - { - param.NamespaceFilled, param.Namespaces = log.QueryWorkspace(request.QueryParameter("workspaces"), request.QueryParameter("workspace_query")) - param.NamespaceFilled, param.Namespaces = log.MatchNamespace(request.QueryParameter("namespaces"), 
param.NamespaceFilled, param.Namespaces) - param.NamespaceFilled, param.NamespaceWithCreationTime = log.GetNamespaceCreationTimeMap(param.Namespaces) - param.NamespaceQuery = request.QueryParameter("namespace_query") - param.PodFilled, param.Pods = log.QueryWorkload(request.QueryParameter("workloads"), request.QueryParameter("workload_query"), param.Namespaces) - param.PodFilled, param.Pods = log.MatchPod(request.QueryParameter("pods"), param.PodFilled, param.Pods) - param.PodQuery = request.QueryParameter("pod_query") - param.ContainerFilled, param.Containers = log.MatchContainer(request.QueryParameter("containers")) - param.ContainerQuery = request.QueryParameter("container_query") - } + var namespaces []string + param.NamespaceNotFound, namespaces = log.MatchNamespace(stringutils.Split(request.QueryParameter("namespaces"), ","), + stringutils.Split(strings.ToLower(request.QueryParameter("namespace_query")), ","), // case-insensitive + stringutils.Split(request.QueryParameter("workspaces"), ","), + stringutils.Split(strings.ToLower(request.QueryParameter("workspace_query")), ",")) // case-insensitive + param.NamespaceWithCreationTime = log.MakeNamespaceCreationTimeMap(namespaces) + param.WorkloadFilter = stringutils.Split(request.QueryParameter("workloads"), ",") + param.WorkloadQuery = stringutils.Split(request.QueryParameter("workload_query"), ",") + param.PodFilter = stringutils.Split(request.QueryParameter("pods"), ",") + param.PodQuery = stringutils.Split(request.QueryParameter("pod_query"), ",") + param.ContainerFilter = stringutils.Split(request.QueryParameter("containers"), ",") + param.ContainerQuery = stringutils.Split(request.QueryParameter("container_query"), ",") case log.QueryLevelWorkspace: - { - param.NamespaceFilled, param.Namespaces = log.QueryWorkspace(request.PathParameter("workspace"), "") - param.NamespaceFilled, param.Namespaces = log.MatchNamespace(request.QueryParameter("namespaces"), param.NamespaceFilled, param.Namespaces) - 
param.NamespaceFilled, param.NamespaceWithCreationTime = log.GetNamespaceCreationTimeMap(param.Namespaces) - param.NamespaceQuery = request.QueryParameter("namespace_query") - param.PodFilled, param.Pods = log.QueryWorkload(request.QueryParameter("workloads"), request.QueryParameter("workload_query"), param.Namespaces) - param.PodFilled, param.Pods = log.MatchPod(request.QueryParameter("pods"), param.PodFilled, param.Pods) - param.PodQuery = request.QueryParameter("pod_query") - param.ContainerFilled, param.Containers = log.MatchContainer(request.QueryParameter("containers")) - param.ContainerQuery = request.QueryParameter("container_query") - } + var namespaces []string + param.NamespaceNotFound, namespaces = log.MatchNamespace(stringutils.Split(request.QueryParameter("namespaces"), ","), + stringutils.Split(strings.ToLower(request.QueryParameter("namespace_query")), ","), // case-insensitive + stringutils.Split(request.PathParameter("workspace"), ","), nil) // case-insensitive + param.NamespaceWithCreationTime = log.MakeNamespaceCreationTimeMap(namespaces) + param.WorkloadFilter = stringutils.Split(request.QueryParameter("workloads"), ",") + param.WorkloadQuery = stringutils.Split(request.QueryParameter("workload_query"), ",") + param.PodFilter = stringutils.Split(request.QueryParameter("pods"), ",") + param.PodQuery = stringutils.Split(request.QueryParameter("pod_query"), ",") + param.ContainerFilter = stringutils.Split(request.QueryParameter("containers"), ",") + param.ContainerQuery = stringutils.Split(request.QueryParameter("container_query"), ",") case log.QueryLevelNamespace: - { - param.NamespaceFilled, param.Namespaces = log.MatchNamespace(request.PathParameter("namespace"), false, nil) - param.NamespaceFilled, param.NamespaceWithCreationTime = log.GetNamespaceCreationTimeMap(param.Namespaces) - param.PodFilled, param.Pods = log.QueryWorkload(request.QueryParameter("workloads"), request.QueryParameter("workload_query"), param.Namespaces) - 
param.PodFilled, param.Pods = log.MatchPod(request.QueryParameter("pods"), param.PodFilled, param.Pods) - param.PodQuery = request.QueryParameter("pod_query") - param.ContainerFilled, param.Containers = log.MatchContainer(request.QueryParameter("containers")) - param.ContainerQuery = request.QueryParameter("container_query") - } + namespaces := []string{request.PathParameter("namespace")} + param.NamespaceWithCreationTime = log.MakeNamespaceCreationTimeMap(namespaces) + param.WorkloadFilter = stringutils.Split(request.QueryParameter("workloads"), ",") + param.WorkloadQuery = stringutils.Split(request.QueryParameter("workload_query"), ",") + param.PodFilter = stringutils.Split(request.QueryParameter("pods"), ",") + param.PodQuery = stringutils.Split(request.QueryParameter("pod_query"), ",") + param.ContainerFilter = stringutils.Split(request.QueryParameter("containers"), ",") + param.ContainerQuery = stringutils.Split(request.QueryParameter("container_query"), ",") case log.QueryLevelWorkload: - { - param.NamespaceFilled, param.Namespaces = log.MatchNamespace(request.PathParameter("namespace"), false, nil) - param.NamespaceFilled, param.NamespaceWithCreationTime = log.GetNamespaceCreationTimeMap(param.Namespaces) - param.PodFilled, param.Pods = log.QueryWorkload(request.PathParameter("workload"), "", param.Namespaces) - param.PodFilled, param.Pods = log.MatchPod(request.QueryParameter("pods"), param.PodFilled, param.Pods) - param.PodQuery = request.QueryParameter("pod_query") - param.ContainerFilled, param.Containers = log.MatchContainer(request.QueryParameter("containers")) - param.ContainerQuery = request.QueryParameter("container_query") - } + namespaces := []string{request.PathParameter("namespace")} + param.NamespaceWithCreationTime = log.MakeNamespaceCreationTimeMap(namespaces) + param.WorkloadFilter = []string{request.PathParameter("workload")} + param.PodFilter = stringutils.Split(request.QueryParameter("pods"), ",") + param.PodQuery = 
stringutils.Split(request.QueryParameter("pod_query"), ",") + param.ContainerFilter = stringutils.Split(request.QueryParameter("containers"), ",") + param.ContainerQuery = stringutils.Split(request.QueryParameter("container_query"), ",") case log.QueryLevelPod: - { - param.NamespaceFilled, param.Namespaces = log.MatchNamespace(request.PathParameter("namespace"), false, nil) - param.NamespaceFilled, param.NamespaceWithCreationTime = log.GetNamespaceCreationTimeMap(param.Namespaces) - param.PodFilled, param.Pods = log.MatchPod(request.PathParameter("pod"), false, nil) - param.ContainerFilled, param.Containers = log.MatchContainer(request.QueryParameter("containers")) - param.ContainerQuery = request.QueryParameter("container_query") - } + namespaces := []string{request.PathParameter("namespace")} + param.NamespaceWithCreationTime = log.MakeNamespaceCreationTimeMap(namespaces) + param.PodFilter = []string{request.PathParameter("pod")} + param.ContainerFilter = stringutils.Split(request.QueryParameter("containers"), ",") + param.ContainerQuery = stringutils.Split(request.QueryParameter("container_query"), ",") case log.QueryLevelContainer: - { - param.NamespaceFilled, param.Namespaces = log.MatchNamespace(request.PathParameter("namespace"), false, nil) - param.NamespaceFilled, param.NamespaceWithCreationTime = log.GetNamespaceCreationTimeMap(param.Namespaces) - param.PodFilled, param.Pods = log.MatchPod(request.PathParameter("pod"), false, nil) - param.ContainerFilled, param.Containers = log.MatchContainer(request.PathParameter("container")) - } + namespaces := []string{request.PathParameter("namespace")} + param.NamespaceWithCreationTime = log.MakeNamespaceCreationTimeMap(namespaces) + param.PodFilter = []string{request.PathParameter("pod")} + param.ContainerFilter = []string{request.PathParameter("container")} } - if len(param.Namespaces) == 1 { - param.Workspace = log.GetWorkspaceOfNamesapce(param.Namespaces[0]) - } + param.LogQuery = 
stringutils.Split(request.QueryParameter("log_query"), ",") + param.Operation = request.QueryParameter("operation") param.Interval = request.QueryParameter("interval") - - param.LogQuery = request.QueryParameter("log_query") param.StartTime = request.QueryParameter("start_time") param.EndTime = request.QueryParameter("end_time") param.Sort = request.QueryParameter("sort") diff --git a/pkg/models/log/logcollector.go b/pkg/models/log/logcollector.go index 42253683c..dcfcb1952 100644 --- a/pkg/models/log/logcollector.go +++ b/pkg/models/log/logcollector.go @@ -23,320 +23,69 @@ import ( "k8s.io/apimachinery/pkg/labels" "kubesphere.io/kubesphere/pkg/constants" "kubesphere.io/kubesphere/pkg/informers" - "reflect" + "kubesphere.io/kubesphere/pkg/utils/stringutils" "strconv" "strings" "time" ) -func intersection(s1, s2 []string) (inter []string) { - hash := make(map[string]bool) - for _, e := range s1 { - hash[e] = true - } - for _, e := range s2 { - // If elements present in the hashmap then append intersection list. - if hash[e] { - inter = append(inter, e) - } - } - //Remove dups from slice. - inter = removeDups(inter) - return -} - -//Remove dups from slice. 
-func removeDups(elements []string) (nodups []string) { - encountered := make(map[string]bool) - for _, element := range elements { - if !encountered[element] { - nodups = append(nodups, element) - encountered[element] = true - } - } - return -} - -func in(value interface{}, container interface{}) int { - if container == nil { - return -1 - } - containerValue := reflect.ValueOf(container) - switch reflect.TypeOf(container).Kind() { - case reflect.Slice, reflect.Array: - for i := 0; i < containerValue.Len(); i++ { - if containerValue.Index(i).Interface() == value { - return i - } - } - case reflect.Map: - if containerValue.MapIndex(reflect.ValueOf(value)).IsValid() { - return -1 - } - default: - return -1 - } - return -1 -} - -func getWorkloadName(name string, kind string) string { - if kind == "ReplicaSet" { - lastIndex := strings.LastIndex(name, "-") - if lastIndex >= 0 { - return name[:lastIndex] - } - } - - return name -} - -func matchLabel(label string, labelsMatch []string) bool { - var result = false - - for _, labelMatch := range labelsMatch { - if strings.Compare(label, labelMatch) == 0 { - result = true - break - } - } - - return result -} - -func queryLabel(label string, labelsQuery []string) bool { - var result = false - - for _, labelQuery := range labelsQuery { - if strings.Contains(label, labelQuery) { - result = true - break - } - } - - return result -} - -func QueryWorkspace(workspaceMatch string, workspaceQuery string) (bool, []string) { - if workspaceMatch == "" && workspaceQuery == "" { - return false, nil - } +// list namespaces that match search conditions +func MatchNamespace(nsFilter []string, nsQuery []string, wsFilter []string, wsQuery []string) (bool, []string) { nsLister := informers.SharedInformerFactory().Core().V1().Namespaces().Lister() nsList, err := nsLister.List(labels.Everything()) if err != nil { - glog.Error("failed to list namespace, error: ", err) + glog.Errorf("failed to list namespace, error: %s", err) return true, nil } var 
namespaces []string - var hasMatch = false - var workspacesMatch []string - if workspaceMatch != "" { - workspacesMatch = strings.Split(strings.Replace(workspaceMatch, ",", " ", -1), " ") - hasMatch = true - } - - var hasQuery = false - var workspacesQuery []string - if workspaceQuery != "" { - workspacesQuery = strings.Split(strings.ToLower(strings.Replace(workspaceQuery, ",", " ", -1)), " ") - hasQuery = true + // if no search condition is set on both namespace and workspace, + // then return all namespaces + if nsQuery == nil && nsFilter == nil && wsQuery == nil && wsFilter == nil { + for _, ns := range nsList { + namespaces = append(namespaces, ns.Name) + } + return false, namespaces } for _, ns := range nsList { - labels := ns.GetLabels() - _, ok := labels[constants.WorkspaceLabelKey] - if ok { - var namespaceCanAppend = true - if hasMatch { - if !matchLabel(labels[constants.WorkspaceLabelKey], workspacesMatch) { - namespaceCanAppend = false - } - } - if hasQuery { - if !queryLabel(strings.ToLower(labels[constants.WorkspaceLabelKey]), workspacesQuery) { - namespaceCanAppend = false - } - } - - if namespaceCanAppend { - namespaces = append(namespaces, ns.GetName()) - } - } - } - - return true, namespaces -} - -func MatchNamespace(namespaceMatch string, namespaceFilled bool, namespaces []string) (bool, []string) { - if namespaceMatch == "" { - return namespaceFilled, namespaces - } - - namespacesMatch := strings.Split(strings.Replace(namespaceMatch, ",", " ", -1), " ") - - if namespaceFilled { - return true, intersection(namespacesMatch, namespaces) - } - - return true, namespacesMatch -} - -func GetNamespaceCreationTimeMap(namespaces []string) (bool, map[string]string) { - - namespaceWithCreationTime := make(map[string]string) - - nsLister := informers.SharedInformerFactory().Core().V1().Namespaces().Lister() - - if len(namespaces) == 0 { - nsList, err := nsLister.List(labels.Everything()) - if err != nil { - glog.Error("failed to list namespace, error: ", err) 
- return true, namespaceWithCreationTime - } - - for _, ns := range nsList { + if stringutils.StringIn(ns.Name, nsFilter) || + stringutils.StringIn(ns.Annotations[constants.WorkspaceLabelKey], wsFilter) || + containsIn(ns.Name, nsQuery) || + containsIn(ns.Annotations[constants.WorkspaceLabelKey], wsQuery) { namespaces = append(namespaces, ns.Name) } } + // if namespaces is equal to nil, indicates no namespace matched + // it causes the query to return no result + return namespaces == nil, namespaces +} + +func containsIn(str string, subStrs []string) bool { + for _, sub := range subStrs { + if strings.Contains(str, sub) { + return true + } + } + return false +} + +func MakeNamespaceCreationTimeMap(namespaces []string) map[string]string { + + namespaceWithCreationTime := make(map[string]string) + + nsLister := informers.SharedInformerFactory().Core().V1().Namespaces().Lister() for _, item := range namespaces { ns, err := nsLister.Get(item) if err != nil { - glog.Error("failed to get namespace, error: ", err) + // the ns doesn't exist continue } - namespaceWithCreationTime[ns.Name] = strconv.FormatInt(ns.CreationTimestamp.UnixNano()/int64(time.Millisecond), 10) } - return true, namespaceWithCreationTime -} - -func QueryWorkload(workloadMatch string, workloadQuery string, namespaces []string) (bool, []string) { - if workloadMatch == "" && workloadQuery == "" { - return false, nil - } - - podLister := informers.SharedInformerFactory().Core().V1().Pods().Lister() - podList, err := podLister.List(labels.Everything()) - if err != nil { - glog.Error("failed to list pods, error: ", err) - return true, nil - } - - var pods []string - - var hasMatch = false - var workloadsMatch []string - if workloadMatch != "" { - workloadsMatch = strings.Split(strings.Replace(workloadMatch, ",", " ", -1), " ") - hasMatch = true - } - - var hasQuery = false - var workloadsQuery []string - if workloadQuery != "" { - workloadsQuery = strings.Split(strings.ToLower(strings.Replace(workloadQuery, 
",", " ", -1)), " ") - hasQuery = true - } - - if namespaces == nil { - for _, pod := range podList { - /*if len(pod.ObjectMeta.OwnerReferences) > 0 { - glog.Infof("List Pod %v:%v:%v", pod.Name, pod.ObjectMeta.OwnerReferences[0].Name, pod.ObjectMeta.OwnerReferences[0].Kind) - }*/ - if len(pod.ObjectMeta.OwnerReferences) > 0 { - var podCanAppend = true - workloadName := getWorkloadName(pod.ObjectMeta.OwnerReferences[0].Name, pod.ObjectMeta.OwnerReferences[0].Kind) - if hasMatch { - if !matchLabel(workloadName, workloadsMatch) { - podCanAppend = false - } - } - if hasQuery { - if !queryLabel(strings.ToLower(workloadName), workloadsQuery) { - podCanAppend = false - } - } - - if podCanAppend { - pods = append(pods, pod.Name) - } - } - } - } else { - for _, pod := range podList { - /*if len(pod.ObjectMeta.OwnerReferences) > 0 { - glog.Infof("List Pod %v:%v:%v", pod.Name, pod.ObjectMeta.OwnerReferences[0].Name, pod.ObjectMeta.OwnerReferences[0].Kind) - }*/ - if len(pod.ObjectMeta.OwnerReferences) > 0 && in(pod.Namespace, namespaces) >= 0 { - var podCanAppend = true - workloadName := getWorkloadName(pod.ObjectMeta.OwnerReferences[0].Name, pod.ObjectMeta.OwnerReferences[0].Kind) - if hasMatch { - if !matchLabel(workloadName, workloadsMatch) { - podCanAppend = false - } - } - if hasQuery { - if !queryLabel(strings.ToLower(workloadName), workloadsQuery) { - podCanAppend = false - } - } - - if podCanAppend { - pods = append(pods, pod.Name) - } - } - } - } - - return true, pods -} - -func MatchPod(podMatch string, podFilled bool, pods []string) (bool, []string) { - if podMatch == "" { - return podFilled, pods - } - - podsMatch := strings.Split(strings.Replace(podMatch, ",", " ", -1), " ") - - if podFilled { - return true, intersection(podsMatch, pods) - } - - return true, podsMatch -} - -func MatchContainer(containerMatch string) (bool, []string) { - if containerMatch == "" { - return false, nil - } - - return true, strings.Split(strings.Replace(containerMatch, ",", " ", -1), 
" ") -} - -func GetWorkspaceOfNamesapce(namespace string) string { - var workspace string - workspace = "" - - nsLister := informers.SharedInformerFactory().Core().V1().Namespaces().Lister() - nsList, err := nsLister.List(labels.Everything()) - if err != nil { - glog.Error("failed to list namespace, error: ", err) - return workspace - } - - for _, ns := range nsList { - if ns.GetName() == namespace { - labels := ns.GetLabels() - _, ok := labels[constants.WorkspaceLabelKey] - if ok { - workspace = labels[constants.WorkspaceLabelKey] - } - } - } - - return workspace + return namespaceWithCreationTime } diff --git a/pkg/simple/client/elasticsearch/esclient.go b/pkg/simple/client/elasticsearch/esclient.go index 72a5f1d6d..a6d527e18 100644 --- a/pkg/simple/client/elasticsearch/esclient.go +++ b/pkg/simple/client/elasticsearch/esclient.go @@ -25,6 +25,30 @@ import ( "github.com/json-iterator/go" ) +const ( + OperationQuery int = iota + OperationStatistics + OperationHistogram + + matchPhrase = iota + matchPhrasePrefix + regexpQuery + + podNameMaxLength = 63 + // max 10 characters + 1 hyphen + replicaSetSuffixMaxLength = 11 + // a unique random string as suffix, 5 characters + 1 hyphen + randSuffixLength = 6 + + fieldPodName = "kubernetes.pod_name" + fieldContainerName = "kubernetes.container_name" + fieldLog = "log" + + fieldNamespaceNameKeyword = "kubernetes.namespace_name.keyword" + fieldPodNameKeyword = "kubernetes.pod_name.keyword" + fieldContainerNameKeyword = "kubernetes.container_name.keyword" +) + var jsonIter = jsoniter.ConfigCompatibleWithStandardLibrary var ( @@ -34,13 +58,6 @@ var ( client Client ) -type Config struct { - Host string - Port string - Index string - VersionMajor string -} - func (cfg *Config) WriteESConfigs() { mu.Lock() defer mu.Unlock() @@ -55,189 +72,58 @@ func (cfg *Config) WriteESConfigs() { client = NewForConfig(config) } -type Request struct { - From int64 `json:"from"` - Size int64 `json:"size"` - Sorts []Sort `json:"sort,omitempty"` - 
MainQuery BoolQuery `json:"query"` - Aggs interface{} `json:"aggs,omitempty"` - MainHighLight MainHighLight `json:"highlight,omitempty"` -} - -type Sort struct { - Order Order `json:"time"` -} - -type Order struct { - Order string `json:"order"` -} - -type BoolQuery struct { - BoolMusts BoolMusts `json:"bool"` -} - -type BoolMusts struct { - Musts []interface{} `json:"must"` -} - -type RangeQuery struct { - RangeSpec RangeSpec `json:"range"` -} - -type RangeSpec struct { - TimeRange TimeRange `json:"time"` -} - -type TimeRange struct { - Gte string `json:"gte,omitempty"` - Lte string `json:"lte,omitempty"` -} - -type BoolShouldMatchPhrase struct { - ShouldMatchPhrase ShouldMatchPhrase `json:"bool"` -} - -type ShouldMatchPhrase struct { - Shoulds []interface{} `json:"should"` - MinimumShouldMatch int64 `json:"minimum_should_match"` -} - -type MatchPhrase struct { - MatchPhrase interface{} `json:"match_phrase"` -} - -type Match struct { - Match interface{} `json:"match"` -} - -type QueryWord struct { - Word string `json:"query"` -} - -type MainHighLight struct { - Fields []interface{} `json:"fields,omitempty"` - FragmentSize int `json:"fragment_size"` -} - -type LogHighLightField struct { - FieldContent EmptyField `json:"log"` -} - -type NamespaceHighLightField struct { - FieldContent EmptyField `json:"kubernetes.namespace_name.keyword"` -} - -type PodHighLightField struct { - FieldContent EmptyField `json:"kubernetes.pod_name.keyword"` -} - -type ContainerHighLightField struct { - FieldContent EmptyField `json:"kubernetes.container_name.keyword"` -} - -type EmptyField struct { -} - -// StatisticsAggs, the struct for `aggs` of type Request, holds a cardinality aggregation for distinct container counting -type StatisticsAggs struct { - ContainerAgg ContainerAgg `json:"containers"` -} - -type ContainerAgg struct { - Cardinality AggField `json:"cardinality"` -} - -type AggField struct { - Field string `json:"field"` -} - -type HistogramAggs struct { - HistogramAgg 
HistogramAgg `json:"histogram"` -} - -type HistogramAgg struct { - DateHistogram DateHistogram `json:"date_histogram"` -} - -type DateHistogram struct { - Field string `json:"field"` - Interval string `json:"interval"` -} - func createQueryRequest(param QueryParameters) (int, []byte, error) { var request Request - var mainBoolQuery BoolMusts + var mainBoolQuery BoolFilter - if param.NamespaceFilled { - var shouldMatchPhrase ShouldMatchPhrase - if len(param.NamespaceWithCreationTime) == 0 { - matchPhrase := MatchPhrase{map[string]interface{}{"kubernetes.namespace_name.key_word": QueryWord{""}}} - shouldMatchPhrase.Shoulds = append(shouldMatchPhrase.Shoulds, matchPhrase) - } else { - for namespace, creationTime := range param.NamespaceWithCreationTime { - var boolQuery BoolQuery + if len(param.NamespaceWithCreationTime) != 0 { + var boolShould BoolShould + for namespace, creationTime := range param.NamespaceWithCreationTime { + var boolFilter BoolFilter - matchPhrase := MatchPhrase{map[string]interface{}{"kubernetes.namespace_name.keyword": QueryWord{namespace}}} - rangeQuery := RangeQuery{RangeSpec{TimeRange{creationTime, ""}}} + matchPhrase := MatchPhrase{MatchPhrase: map[string]string{fieldNamespaceNameKeyword: namespace}} + rangeQuery := RangeQuery{RangeSpec{TimeRange{creationTime, ""}}} - boolQuery.BoolMusts.Musts = append(boolQuery.BoolMusts.Musts, matchPhrase) - boolQuery.BoolMusts.Musts = append(boolQuery.BoolMusts.Musts, rangeQuery) + boolFilter.Filter = append(boolFilter.Filter, matchPhrase) + boolFilter.Filter = append(boolFilter.Filter, rangeQuery) - shouldMatchPhrase.Shoulds = append(shouldMatchPhrase.Shoulds, boolQuery) - } + boolShould.Should = append(boolShould.Should, BoolQuery{Bool: boolFilter}) } - shouldMatchPhrase.MinimumShouldMatch = 1 - mainBoolQuery.Musts = append(mainBoolQuery.Musts, BoolShouldMatchPhrase{shouldMatchPhrase}) + boolShould.MinimumShouldMatch = 1 + mainBoolQuery.Filter = append(mainBoolQuery.Filter, BoolQuery{Bool: boolShould}) 
} - if param.PodFilled { - var shouldMatchPhrase ShouldMatchPhrase - if len(param.Pods) == 0 { - matchPhrase := MatchPhrase{map[string]interface{}{"kubernetes.pod_name.key_word": QueryWord{""}}} - shouldMatchPhrase.Shoulds = append(shouldMatchPhrase.Shoulds, matchPhrase) - } else { - for _, pod := range param.Pods { - matchPhrase := MatchPhrase{map[string]interface{}{"kubernetes.pod_name.keyword": QueryWord{pod}}} - shouldMatchPhrase.Shoulds = append(shouldMatchPhrase.Shoulds, matchPhrase) - } - } - shouldMatchPhrase.MinimumShouldMatch = 1 - mainBoolQuery.Musts = append(mainBoolQuery.Musts, BoolShouldMatchPhrase{shouldMatchPhrase}) + if param.WorkloadFilter != nil { + boolQuery := makeBoolShould(regexpQuery, fieldPodNameKeyword, param.WorkloadFilter) + mainBoolQuery.Filter = append(mainBoolQuery.Filter, boolQuery) } - if param.ContainerFilled { - var shouldMatchPhrase ShouldMatchPhrase - if len(param.Containers) == 0 { - matchPhrase := MatchPhrase{map[string]interface{}{"kubernetes.container_name.key_word": QueryWord{""}}} - shouldMatchPhrase.Shoulds = append(shouldMatchPhrase.Shoulds, matchPhrase) - } else { - for _, container := range param.Containers { - matchPhrase := MatchPhrase{map[string]interface{}{"kubernetes.container_name.keyword": QueryWord{container}}} - shouldMatchPhrase.Shoulds = append(shouldMatchPhrase.Shoulds, matchPhrase) - } - } - shouldMatchPhrase.MinimumShouldMatch = 1 - mainBoolQuery.Musts = append(mainBoolQuery.Musts, BoolShouldMatchPhrase{shouldMatchPhrase}) + if param.PodFilter != nil { + boolQuery := makeBoolShould(matchPhrase, fieldPodNameKeyword, param.PodFilter) + mainBoolQuery.Filter = append(mainBoolQuery.Filter, boolQuery) + } + if param.ContainerFilter != nil { + boolQuery := makeBoolShould(matchPhrase, fieldContainerNameKeyword, param.ContainerFilter) + mainBoolQuery.Filter = append(mainBoolQuery.Filter, boolQuery) } - if param.NamespaceQuery != "" { - match := Match{map[string]interface{}{"kubernetes.namespace_name": 
QueryWord{param.NamespaceQuery}}} - mainBoolQuery.Musts = append(mainBoolQuery.Musts, match) + if param.WorkloadQuery != nil { + boolQuery := makeBoolShould(matchPhrasePrefix, fieldPodName, param.WorkloadQuery) + mainBoolQuery.Filter = append(mainBoolQuery.Filter, boolQuery) } - if param.PodQuery != "" { - match := Match{map[string]interface{}{"kubernetes.pod_name": QueryWord{param.PodQuery}}} - mainBoolQuery.Musts = append(mainBoolQuery.Musts, match) + if param.PodQuery != nil { + boolQuery := makeBoolShould(matchPhrasePrefix, fieldPodName, param.PodQuery) + mainBoolQuery.Filter = append(mainBoolQuery.Filter, boolQuery) } - if param.ContainerQuery != "" { - match := Match{map[string]interface{}{"kubernetes.container_name": QueryWord{param.ContainerQuery}}} - mainBoolQuery.Musts = append(mainBoolQuery.Musts, match) + if param.ContainerQuery != nil { + boolQuery := makeBoolShould(matchPhrasePrefix, fieldContainerName, param.ContainerQuery) + mainBoolQuery.Filter = append(mainBoolQuery.Filter, boolQuery) } - - if param.LogQuery != "" { - match := Match{map[string]interface{}{"log": QueryWord{param.LogQuery}}} - mainBoolQuery.Musts = append(mainBoolQuery.Musts, match) + if param.LogQuery != nil { + boolQuery := makeBoolShould(matchPhrasePrefix, fieldLog, param.LogQuery) + mainBoolQuery.Filter = append(mainBoolQuery.Filter, boolQuery) } rangeQuery := RangeQuery{RangeSpec{TimeRange{param.StartTime, param.EndTime}}} - mainBoolQuery.Musts = append(mainBoolQuery.Musts, rangeQuery) + mainBoolQuery.Filter = append(mainBoolQuery.Filter, rangeQuery) var operation int @@ -286,128 +172,57 @@ func createQueryRequest(param QueryParameters) (int, []byte, error) { return operation, queryRequest, err } -// Fore more info, refer to https://www.elastic.co/guide/en/elasticsearch/reference/current/getting-started-search-API.html -// Response from the elasticsearch engine -type Response struct { - Status int `json:"status"` - Workspace string `json:"workspace,omitempty"` - Shards Shards 
`json:"_shards"` - Hits Hits `json:"hits"` - Aggregations json.RawMessage `json:"aggregations"` +func makeBoolShould(queryType int, field string, list []string) BoolQuery { + var should []interface{} + for _, phrase := range list { + + var q interface{} + + switch queryType { + case matchPhrase: + q = MatchPhrase{MatchPhrase: map[string]string{field: phrase}} + case matchPhrasePrefix: + q = MatchPhrasePrefix{MatchPhrasePrefix: map[string]string{field: phrase}} + case regexpQuery: + q = RegexpQuery{Regexp: map[string]string{field: makePodNameRegexp(phrase)}} + } + + should = append(should, q) + } + + return BoolQuery{ + Bool: BoolShould{ + Should: should, + MinimumShouldMatch: 1, + }, + } } -type Shards struct { - Total int64 `json:"total"` - Successful int64 `json:"successful"` - Skipped int64 `json:"skipped"` - Failed int64 `json:"failed"` +func makePodNameRegexp(workloadName string) string { + var regexp string + if len(workloadName) <= podNameMaxLength-replicaSetSuffixMaxLength-randSuffixLength { + // match deployment pods, eg. -579dfbcddd-24znw + // replicaset rand string is limited to vowels + // https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/apimachinery/pkg/util/rand/rand.go#L83 + regexp += workloadName + "-[bcdfghjklmnpqrstvwxz2456789]{1,10}-[a-z0-9]{5}|" + // match statefulset pods, eg. -0 + regexp += workloadName + "-[0-9]+|" + // match pods of daemonset or job, eg. 
-29tdk, -5xqvl + regexp += workloadName + "-[a-z0-9]{5}" + } else if len(workloadName) <= podNameMaxLength-randSuffixLength { + replicaSetSuffixLength := podNameMaxLength - randSuffixLength - len(workloadName) + regexp += fmt.Sprintf("%s%d%s", workloadName+"-[bcdfghjklmnpqrstvwxz2456789]{", replicaSetSuffixLength, "}[a-z0-9]{5}|") + regexp += workloadName + "-[0-9]+|" + regexp += workloadName + "-[a-z0-9]{5}" + } else { + // Rand suffix may overwrites the workload name if the name is too long + // This won't happen for StatefulSet because a statefulset pod will fail to create + regexp += workloadName[:podNameMaxLength-randSuffixLength+1] + "[a-z0-9]{5}|" + regexp += workloadName + "-[0-9]+" + } + return regexp } -type Hits struct { - // As of ElasticSearch v7.x, hits.total is changed - Total interface{} `json:"total"` - Hits []Hit `json:"hits"` -} - -type Hit struct { - Source Source `json:"_source"` - HighLight HighLight `json:"highlight"` - Sort []int64 `json:"sort"` -} - -type Source struct { - Log string `json:"log"` - Time string `json:"time"` - Kubernetes Kubernetes `json:"kubernetes"` -} - -type Kubernetes struct { - Namespace string `json:"namespace_name"` - Pod string `json:"pod_name"` - Container string `json:"container_name"` - Host string `json:"host"` -} - -type HighLight struct { - LogHighLights []string `json:"log,omitempty" description:"log messages to highlight"` - NamespaceHighLights []string `json:"kubernetes.namespace_name.keyword,omitempty" description:"namespaces to highlight"` - PodHighLights []string `json:"kubernetes.pod_name.keyword,omitempty" description:"pods to highlight"` - ContainerHighLights []string `json:"kubernetes.container_name.keyword,omitempty" description:"containers to highlight"` -} - -type LogRecord struct { - Time int64 `json:"time,omitempty" description:"log timestamp"` - Log string `json:"log,omitempty" description:"log message"` - Namespace string `json:"namespace,omitempty" description:"namespace"` - Pod string 
`json:"pod,omitempty" description:"pod name"` - Container string `json:"container,omitempty" description:"container name"` - Host string `json:"host,omitempty" description:"node id"` - HighLight HighLight `json:"highlight,omitempty" description:"highlighted log fragment"` -} - -type ReadResult struct { - Total int64 `json:"total" description:"total number of matched results"` - From int64 `json:"from" description:"the offset from the result set"` - Size int64 `json:"size" description:"the amount of hits to be returned"` - Records []LogRecord `json:"records,omitempty" description:"actual array of results"` -} - -// StatisticsResponseAggregations, the struct for `aggregations` of type Reponse, holds return results from the aggregation StatisticsAggs -type StatisticsResponseAggregations struct { - ContainerCount ContainerCount `json:"containers"` -} - -type ContainerCount struct { - Value int64 `json:"value"` -} - -type HistogramAggregations struct { - HistogramAggregation HistogramAggregation `json:"histogram"` -} - -type HistogramAggregation struct { - Histograms []HistogramStatistics `json:"buckets"` -} - -type HistogramStatistics struct { - Time int64 `json:"key"` - Count int64 `json:"doc_count"` -} - -type HistogramRecord struct { - Time int64 `json:"time" description:"timestamp"` - Count int64 `json:"count" description:"total number of logs at intervals"` -} - -type StatisticsResult struct { - Containers int64 `json:"containers" description:"total number of containers"` - Logs int64 `json:"logs" description:"total number of logs"` -} - -type HistogramResult struct { - Total int64 `json:"total" description:"total number of logs"` - StartTime int64 `json:"start_time" description:"start time"` - EndTime int64 `json:"end_time" description:"end time"` - Interval string `json:"interval" description:"interval"` - Histograms []HistogramRecord `json:"histograms" description:"actual array of histogram results"` -} - -// Wrap elasticsearch response -type QueryResult struct 
{ - Status int `json:"status,omitempty" description:"query status"` - Error string `json:"error,omitempty" description:"debugging information"` - Workspace string `json:"workspace,omitempty" description:"the name of the workspace where logs come from"` - Read *ReadResult `json:"query,omitempty" description:"query results"` - Statistics *StatisticsResult `json:"statistics,omitempty" description:"statistics results"` - Histogram *HistogramResult `json:"histogram,omitempty" description:"histogram results"` -} - -const ( - OperationQuery int = iota - OperationStatistics - OperationHistogram -) - func calcTimestamp(input string) int64 { var t time.Time var err error @@ -513,40 +328,31 @@ func parseQueryResult(operation int, param QueryParameters, body []byte) *QueryR } queryResult.Status = http.StatusOK - queryResult.Workspace = param.Workspace return &queryResult } -type QueryParameters struct { - NamespaceFilled bool - Namespaces []string - NamespaceWithCreationTime map[string]string - PodFilled bool - Pods []string - ContainerFilled bool - Containers []string - - NamespaceQuery string - PodQuery string - ContainerQuery string - - Workspace string - - Operation string - LogQuery string - Interval string - StartTime string - EndTime string - Sort string - From int64 - Size int64 -} - func Query(param QueryParameters) *QueryResult { var queryResult = new(QueryResult) + if param.NamespaceNotFound { + queryResult = new(QueryResult) + queryResult.Status = http.StatusOK + switch param.Operation { + case "statistics": + queryResult.Statistics = new(StatisticsResult) + case "histogram": + queryResult.Histogram = &HistogramResult{ + StartTime: calcTimestamp(param.StartTime), + EndTime: calcTimestamp(param.EndTime), + Interval: param.Interval} + default: + queryResult.Read = new(ReadResult) + } + return queryResult + } + if client == nil { queryResult.Status = http.StatusBadRequest queryResult.Error = fmt.Sprintf("Invalid elasticsearch address: host=%s, port=%s", config.Host, 
// Config holds the elasticsearch client config.
type Config struct {
	Host         string
	Port         string
	Index        string
	VersionMajor string
}

// QueryParameters carries the filters and options of a log query.
type QueryParameters struct {
	// when true, indicates the provided `namespaces` or `namespace_query` doesn't match any namespace
	NamespaceNotFound bool
	// a map of namespace with creation time
	NamespaceWithCreationTime map[string]string

	// *Filter fields are for literal matching,
	// *Query fields are for fuzzy matching.
	WorkloadFilter  []string
	WorkloadQuery   []string
	PodFilter       []string
	PodQuery        []string
	ContainerFilter []string
	ContainerQuery  []string
	LogQuery        []string

	Operation string
	Interval  string
	StartTime string
	EndTime   string
	Sort      string
	From      int64
	Size      int64
}

// Request is the elasticsearch request body.
//
// Note that encoding/json never considers a struct value empty, so the
// `omitempty` option on MainHighLight has no effect: the `highlight` key is
// always emitted.
type Request struct {
	From          int64         `json:"from"`
	Size          int64         `json:"size"`
	Sorts         []Sort        `json:"sort,omitempty"`
	MainQuery     BoolQuery     `json:"query"`
	Aggs          interface{}   `json:"aggs,omitempty"`
	MainHighLight MainHighLight `json:"highlight,omitempty"`
}

// Sort is one entry of the request's `sort` array; it is keyed by the `time` field.
type Sort struct {
	Order Order `json:"time"`
}

// Order wraps the `order` value of a Sort entry.
type Order struct {
	Order string `json:"order"`
}

// BoolQuery wraps a `bool` clause. Bool holds the clause body, e.g. a
// BoolFilter or a BoolShould.
type BoolQuery struct {
	Bool interface{} `json:"bool"`
}

// BoolFilter is the body of a bool query made of `filter` clauses.
// Use filter instead of must: filter ignores scoring.
type BoolFilter struct {
	Filter []interface{} `json:"filter"`
}

// BoolShould is the body of a bool query made of `should` clauses, together
// with its `minimum_should_match` setting.
type BoolShould struct {
	Should             []interface{} `json:"should"`
	MinimumShouldMatch int64         `json:"minimum_should_match"`
}

// RangeQuery is a `range` clause.
type RangeQuery struct {
	RangeSpec RangeSpec `json:"range"`
}

// RangeSpec applies a TimeRange to the `time` field.
type RangeSpec struct {
	TimeRange TimeRange `json:"time"`
}

// TimeRange holds the `gte` and `lte` bounds of a range clause; an empty bound
// is left out of the JSON.
type TimeRange struct {
	Gte string `json:"gte,omitempty"`
	Lte string `json:"lte,omitempty"`
}

// MatchPhrase is a `match_phrase` clause, keyed by field name.
type MatchPhrase struct {
	MatchPhrase map[string]string `json:"match_phrase"`
}

// MatchPhrasePrefix is a `match_phrase_prefix` clause.
type MatchPhrasePrefix struct {
	MatchPhrasePrefix interface{} `json:"match_phrase_prefix"`
}

// RegexpQuery is a `regexp` clause.
type RegexpQuery struct {
	Regexp interface{} `json:"regexp"`
}

// MainHighLight is the `highlight` section of a Request.
type MainHighLight struct {
	Fields       []interface{} `json:"fields,omitempty"`
	FragmentSize int           `json:"fragment_size"`
}

// LogHighLightField is a highlight field entry for `log`.
type LogHighLightField struct {
	FieldContent EmptyField `json:"log"`
}

// NamespaceHighLightField is a highlight field entry for `kubernetes.namespace_name.keyword`.
type NamespaceHighLightField struct {
	FieldContent EmptyField `json:"kubernetes.namespace_name.keyword"`
}

// PodHighLightField is a highlight field entry for `kubernetes.pod_name.keyword`.
type PodHighLightField struct {
	FieldContent EmptyField `json:"kubernetes.pod_name.keyword"`
}

// ContainerHighLightField is a highlight field entry for `kubernetes.container_name.keyword`.
type ContainerHighLightField struct {
	FieldContent EmptyField `json:"kubernetes.container_name.keyword"`
}

// EmptyField has no fields and serialises as the empty JSON object `{}`.
type EmptyField struct {
}

// StatisticsAggs, the struct for `aggs` of type Request, holds a cardinality aggregation for distinct container counting
type StatisticsAggs struct {
	ContainerAgg ContainerAgg `json:"containers"`
}

// ContainerAgg is the `cardinality` aggregation used by StatisticsAggs.
type ContainerAgg struct {
	Cardinality AggField `json:"cardinality"`
}

// AggField names the field an aggregation works on.
type AggField struct {
	Field string `json:"field"`
}

// HistogramAggs, the struct for `aggs` of type Request, holds a date histogram aggregation
type HistogramAggs struct {
	HistogramAgg HistogramAgg `json:"histogram"`
}

// HistogramAgg is the `date_histogram` aggregation used by HistogramAggs.
type HistogramAgg struct {
	DateHistogram DateHistogram `json:"date_histogram"`
}

// DateHistogram holds the field and the interval of a date histogram aggregation.
type DateHistogram struct {
	Field    string `json:"field"`
	Interval string `json:"interval"`
}

// For more info, refer to https://www.elastic.co/guide/en/elasticsearch/reference/current/getting-started-search-API.html
// Response body from the elasticsearch engine
type Response struct {
	Status       int             `json:"status"`
	Workspace    string          `json:"workspace,omitempty"`
	Shards       Shards          `json:"_shards"`
	Hits         Hits            `json:"hits"`
	// Aggregations is kept raw so that it can be decoded into the struct
	// matching the aggregation that was requested.
	Aggregations json.RawMessage `json:"aggregations"`
}

// Shards is the `_shards` section of a Response.
type Shards struct {
	Total      int64 `json:"total"`
	Successful int64 `json:"successful"`
	Skipped    int64 `json:"skipped"`
	Failed     int64 `json:"failed"`
}

// Hits is the `hits` section of a Response.
type Hits struct {
	// As of ElasticSearch v7.x, hits.total is changed,
	// hence the untyped field.
	Total interface{} `json:"total"`
	Hits  []Hit       `json:"hits"`
}

// Hit is a single entry of Hits.
type Hit struct {
	Source    Source    `json:"_source"`
	HighLight HighLight `json:"highlight"`
	Sort      []int64   `json:"sort"`
}

// Source is the `_source` document of a Hit.
type Source struct {
	Log        string     `json:"log"`
	Time       string     `json:"time"`
	Kubernetes Kubernetes `json:"kubernetes"`
}

// Kubernetes is the `kubernetes` section of a Source document.
type Kubernetes struct {
	Namespace string `json:"namespace_name"`
	Pod       string `json:"pod_name"`
	Container string `json:"container_name"`
	Host      string `json:"host"`
}

// HighLight holds the highlighted fragments of a hit, grouped by field.
type HighLight struct {
	LogHighLights       []string `json:"log,omitempty" description:"log messages to highlight"`
	NamespaceHighLights []string `json:"kubernetes.namespace_name.keyword,omitempty" description:"namespaces to highlight"`
	PodHighLights       []string `json:"kubernetes.pod_name.keyword,omitempty" description:"pods to highlight"`
	ContainerHighLights []string `json:"kubernetes.container_name.keyword,omitempty" description:"containers to highlight"`
}

// LogRecord is a single log entry of a ReadResult.
//
// As HighLight is a struct value, its `omitempty` option has no effect with
// encoding/json: the `highlight` key is always emitted.
type LogRecord struct {
	Time      int64     `json:"time,omitempty" description:"log timestamp"`
	Log       string    `json:"log,omitempty" description:"log message"`
	Namespace string    `json:"namespace,omitempty" description:"namespace"`
	Pod       string    `json:"pod,omitempty" description:"pod name"`
	Container string    `json:"container,omitempty" description:"container name"`
	Host      string    `json:"host,omitempty" description:"node id"`
	HighLight HighLight `json:"highlight,omitempty" description:"highlighted log fragment"`
}

// ReadResult is the `query` part of a QueryResult.
type ReadResult struct {
	Total   int64       `json:"total" description:"total number of matched results"`
	From    int64       `json:"from" description:"the offset from the result set"`
	Size    int64       `json:"size" description:"the amount of hits to be returned"`
	Records []LogRecord `json:"records,omitempty" description:"actual array of results"`
}

// StatisticsResponseAggregations, the struct for `aggregations` of type Response, holds return results from the aggregation StatisticsAggs
type StatisticsResponseAggregations struct {
	ContainerCount ContainerCount `json:"containers"`
}

// ContainerCount holds the `value` of the `containers` aggregation result.
type ContainerCount struct {
	Value int64 `json:"value"`
}

// HistogramAggregations, the struct for `aggregations` of type Response, holds return results from the aggregation HistogramAggs
type HistogramAggregations struct {
	HistogramAggregation HistogramAggregation `json:"histogram"`
}

// HistogramAggregation holds the `buckets` of a histogram aggregation result.
type HistogramAggregation struct {
	Histograms []HistogramStatistics `json:"buckets"`
}

// HistogramStatistics is one bucket of a HistogramAggregation.
type HistogramStatistics struct {
	Time  int64 `json:"key"`
	Count int64 `json:"doc_count"`
}

// HistogramRecord is one entry of a HistogramResult.
type HistogramRecord struct {
	Time  int64 `json:"time" description:"timestamp"`
	Count int64 `json:"count" description:"total number of logs at intervals"`
}

// StatisticsResult is the `statistics` part of a QueryResult.
type StatisticsResult struct {
	Containers int64 `json:"containers" description:"total number of containers"`
	Logs       int64 `json:"logs" description:"total number of logs"`
}

// HistogramResult is the `histogram` part of a QueryResult.
type HistogramResult struct {
	Total      int64             `json:"total" description:"total number of logs"`
	StartTime  int64             `json:"start_time" description:"start time"`
	EndTime    int64             `json:"end_time" description:"end time"`
	Interval   string            `json:"interval" description:"interval"`
	Histograms []HistogramRecord `json:"histograms" description:"actual array of histogram results"`
}

// QueryResult wraps the elasticsearch response. Read, Statistics and Histogram
// are pointers, so the ones left nil are omitted from the JSON.
type QueryResult struct {
	Status     int               `json:"status,omitempty" description:"query status"`
	Error      string            `json:"error,omitempty" description:"debugging information"`
	Read       *ReadResult       `json:"query,omitempty" description:"query results"`
	Statistics *StatisticsResult `json:"statistics,omitempty" description:"statistics results"`
	Histogram  *HistogramResult  `json:"histogram,omitempty" description:"histogram results"`
}
// Split slices str into all substrings separated by sep.
//
// It behaves like strings.Split except for an empty str: strings.Split would
// return a slice holding a single empty string, whereas Split returns nil, so
// that an absent value yields no elements at all.
func Split(str, sep string) []string {
	if str == "" {
		return nil
	}
	return strings.Split(str, sep)
}