473 lines
15 KiB
Go
473 lines
15 KiB
Go
/*
|
|
Copyright 2018 The KubeSphere Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package log
|
|
|
|
import (
|
|
"github.com/jinzhu/gorm"
|
|
"github.com/json-iterator/go"
|
|
"k8s.io/apimachinery/pkg/runtime"
|
|
"k8s.io/client-go/rest"
|
|
es "kubesphere.io/kubesphere/pkg/simple/client/elasticsearch"
|
|
fb "kubesphere.io/kubesphere/pkg/simple/client/fluentbit"
|
|
"kubesphere.io/kubesphere/pkg/simple/client/mysql"
|
|
"net/http"
|
|
"strings"
|
|
"time"
|
|
|
|
_ "github.com/go-sql-driver/mysql"
|
|
)
|
|
|
|
var jsonIter = jsoniter.ConfigCompatibleWithStandardLibrary
|
|
|
|
func createCRDClientSet() (*rest.RESTClient, *runtime.Scheme, error) {
|
|
config, err := fb.GetClientConfig("")
|
|
if err != nil {
|
|
//panic(err.Error())
|
|
return nil, nil, err
|
|
}
|
|
|
|
// Create a new clientset which include our CRD schema
|
|
return fb.NewFluentbitCRDClient(config)
|
|
}
|
|
|
|
func getParameterValue(parameters []fb.Parameter, name string) string {
|
|
var value string
|
|
|
|
value = ""
|
|
for _, parameter := range parameters {
|
|
if parameter.Name == name {
|
|
value = parameter.Value
|
|
}
|
|
}
|
|
|
|
return value
|
|
}
|
|
|
|
func getFilters(result *FluentbitFiltersResult, Filters []fb.Plugin) {
|
|
for _, filter := range Filters {
|
|
if strings.Compare(filter.Name, "fluentbit-filter-input-regex") == 0 {
|
|
parameters := strings.Split(getParameterValue(filter.Parameters, "Regex"), " ")
|
|
field := strings.TrimSuffix(strings.TrimPrefix(parameters[0], "kubernetes_"), "_name")
|
|
expression := parameters[1]
|
|
result.Filters = append(result.Filters, FluentbitFilter{"Regex", field, expression})
|
|
}
|
|
if strings.Compare(filter.Name, "fluentbit-filter-input-exclude") == 0 {
|
|
parameters := strings.Split(getParameterValue(filter.Parameters, "Exclude"), " ")
|
|
field := strings.TrimSuffix(strings.TrimPrefix(parameters[0], "kubernetes_"), "_name")
|
|
expression := parameters[1]
|
|
result.Filters = append(result.Filters, FluentbitFilter{"Exclude", field, expression})
|
|
}
|
|
}
|
|
}
|
|
|
|
func FluentbitFiltersQuery() *FluentbitFiltersResult {
|
|
var result FluentbitFiltersResult
|
|
|
|
crdcs, scheme, err := createCRDClientSet()
|
|
if err != nil {
|
|
result.Status = http.StatusInternalServerError
|
|
return &result
|
|
}
|
|
|
|
// Create a CRD client interface
|
|
crdclient := fb.CrdClient(crdcs, scheme, "kubesphere-logging-system")
|
|
|
|
item, err := crdclient.Get("fluent-bit")
|
|
if err != nil {
|
|
result.Status = http.StatusInternalServerError
|
|
return &result
|
|
}
|
|
|
|
getFilters(&result, item.Spec.Filter)
|
|
|
|
result.Status = http.StatusOK
|
|
|
|
return &result
|
|
}
|
|
|
|
func FluentbitFiltersUpdate(filters *[]FluentbitFilter) *FluentbitFiltersResult {
|
|
var result FluentbitFiltersResult
|
|
|
|
//Generate filter plugin config
|
|
var filter []fb.Plugin
|
|
|
|
var para_kubernetes []fb.Parameter
|
|
para_kubernetes = append(para_kubernetes, fb.Parameter{Name: "Name", Value: "kubernetes"})
|
|
para_kubernetes = append(para_kubernetes, fb.Parameter{Name: "Match", Value: "kube.*"})
|
|
para_kubernetes = append(para_kubernetes, fb.Parameter{Name: "Kube_URL", Value: "https://kubernetes.default.svc:443"})
|
|
para_kubernetes = append(para_kubernetes, fb.Parameter{Name: "Kube_CA_File", Value: "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"})
|
|
para_kubernetes = append(para_kubernetes, fb.Parameter{Name: "Kube_Token_File", Value: "/var/run/secrets/kubernetes.io/serviceaccount/token"})
|
|
filter = append(filter, fb.Plugin{Type: "fluentbit_filter", Name: "fluentbit-filter-kubernetes", Parameters: para_kubernetes})
|
|
|
|
var para_lift []fb.Parameter
|
|
para_lift = append(para_lift, fb.Parameter{Name: "Name", Value: "nest"})
|
|
para_lift = append(para_lift, fb.Parameter{Name: "Match", Value: "kube.*"})
|
|
para_lift = append(para_lift, fb.Parameter{Name: "Operation", Value: "lift"})
|
|
para_lift = append(para_lift, fb.Parameter{Name: "Nested_under", Value: "kubernetes"})
|
|
para_lift = append(para_lift, fb.Parameter{Name: "Prefix_with", Value: "kubernetes_"})
|
|
filter = append(filter, fb.Plugin{Type: "fluentbit_filter", Name: "fluentbit-filter-input-lift", Parameters: para_lift})
|
|
|
|
var para_remove_stream []fb.Parameter
|
|
para_remove_stream = append(para_remove_stream, fb.Parameter{Name: "Name", Value: "modify"})
|
|
para_remove_stream = append(para_remove_stream, fb.Parameter{Name: "Match", Value: "kube.*"})
|
|
para_remove_stream = append(para_remove_stream, fb.Parameter{Name: "Remove", Value: "stream"})
|
|
filter = append(filter, fb.Plugin{Type: "fluentbit_filter", Name: "fluentbit-filter-input-remove-stream", Parameters: para_remove_stream})
|
|
|
|
var para_remove_labels []fb.Parameter
|
|
para_remove_labels = append(para_remove_labels, fb.Parameter{Name: "Name", Value: "modify"})
|
|
para_remove_labels = append(para_remove_labels, fb.Parameter{Name: "Match", Value: "kube.*"})
|
|
para_remove_labels = append(para_remove_labels, fb.Parameter{Name: "Remove", Value: "kubernetes_labels"})
|
|
filter = append(filter, fb.Plugin{Type: "fluentbit_filter", Name: "fluentbit-filter-input-remove-labels", Parameters: para_remove_labels})
|
|
|
|
var para_remove_annotations []fb.Parameter
|
|
para_remove_annotations = append(para_remove_annotations, fb.Parameter{Name: "Name", Value: "modify"})
|
|
para_remove_annotations = append(para_remove_annotations, fb.Parameter{Name: "Match", Value: "kube.*"})
|
|
para_remove_annotations = append(para_remove_annotations, fb.Parameter{Name: "Remove", Value: "kubernetes_annotations"})
|
|
filter = append(filter, fb.Plugin{Type: "fluentbit_filter", Name: "fluentbit-filter-input-remove-annotations", Parameters: para_remove_annotations})
|
|
|
|
var para_remove_pod_id []fb.Parameter
|
|
para_remove_pod_id = append(para_remove_pod_id, fb.Parameter{Name: "Name", Value: "modify"})
|
|
para_remove_pod_id = append(para_remove_pod_id, fb.Parameter{Name: "Match", Value: "kube.*"})
|
|
para_remove_pod_id = append(para_remove_pod_id, fb.Parameter{Name: "Remove", Value: "kubernetes_pod_id"})
|
|
filter = append(filter, fb.Plugin{Type: "fluentbit_filter", Name: "fluentbit-filter-input-remove-podid", Parameters: para_remove_pod_id})
|
|
|
|
var para_remove_docker_id []fb.Parameter
|
|
para_remove_docker_id = append(para_remove_docker_id, fb.Parameter{Name: "Name", Value: "modify"})
|
|
para_remove_docker_id = append(para_remove_docker_id, fb.Parameter{Name: "Match", Value: "kube.*"})
|
|
para_remove_docker_id = append(para_remove_docker_id, fb.Parameter{Name: "Remove", Value: "kubernetes_docker_id"})
|
|
filter = append(filter, fb.Plugin{Type: "fluentbit_filter", Name: "fluentbit-filter-input-remove-dockerid", Parameters: para_remove_docker_id})
|
|
|
|
if len(*filters) > 0 {
|
|
for _, item := range *filters {
|
|
if strings.Compare(item.Type, "Regex") == 0 {
|
|
field := "kubernetes_" + strings.TrimSpace(item.Field) + "_name"
|
|
expression := strings.TrimSpace(item.Expression)
|
|
|
|
var para_regex []fb.Parameter
|
|
para_regex = append(para_regex, fb.Parameter{Name: "Name", Value: "grep"})
|
|
para_regex = append(para_regex, fb.Parameter{Name: "Match", Value: "kube.*"})
|
|
para_regex = append(para_regex, fb.Parameter{Name: "Regex", Value: field + " " + expression})
|
|
filter = append(filter, fb.Plugin{Type: "fluentbit_filter", Name: "fluentbit-filter-input-regex", Parameters: para_regex})
|
|
}
|
|
|
|
if strings.Compare(item.Type, "Exclude") == 0 {
|
|
field := "kubernetes_" + strings.TrimSpace(item.Field) + "_name"
|
|
expression := strings.TrimSpace(item.Expression)
|
|
|
|
var para_exclude []fb.Parameter
|
|
para_exclude = append(para_exclude, fb.Parameter{Name: "Name", Value: "grep"})
|
|
para_exclude = append(para_exclude, fb.Parameter{Name: "Match", Value: "kube.*"})
|
|
para_exclude = append(para_exclude, fb.Parameter{Name: "Exclude", Value: field + " " + expression})
|
|
filter = append(filter, fb.Plugin{Type: "fluentbit_filter", Name: "fluentbit-filter-input-exclude", Parameters: para_exclude})
|
|
}
|
|
}
|
|
}
|
|
|
|
var para_nest []fb.Parameter
|
|
para_nest = append(para_nest, fb.Parameter{Name: "Name", Value: "nest"})
|
|
para_nest = append(para_nest, fb.Parameter{Name: "Match", Value: "kube.*"})
|
|
para_nest = append(para_nest, fb.Parameter{Name: "Operation", Value: "nest"})
|
|
para_nest = append(para_nest, fb.Parameter{Name: "Wildcard", Value: "kubernetes_*"})
|
|
para_nest = append(para_nest, fb.Parameter{Name: "Nested_under", Value: "kubernetes"})
|
|
para_nest = append(para_nest, fb.Parameter{Name: "Remove_prefix", Value: "kubernetes_"})
|
|
filter = append(filter, fb.Plugin{Type: "fluentbit_filter", Name: "fluentbit-filter-input-nest", Parameters: para_nest})
|
|
|
|
crdcs, scheme, err := createCRDClientSet()
|
|
if err != nil {
|
|
result.Status = http.StatusInternalServerError
|
|
return &result
|
|
}
|
|
|
|
// Create a CRD client interface
|
|
crdclient := fb.CrdClient(crdcs, scheme, "kubesphere-logging-system")
|
|
|
|
var item *fb.FluentBit
|
|
var err_read error
|
|
|
|
item, err_read = crdclient.Get("fluent-bit")
|
|
if err_read != nil {
|
|
result.Status = http.StatusInternalServerError
|
|
return &result
|
|
}
|
|
|
|
item.Spec.Filter = filter
|
|
|
|
itemnew, err := crdclient.Update("fluent-bit", item)
|
|
if err != nil {
|
|
result.Status = http.StatusInternalServerError
|
|
return &result
|
|
}
|
|
|
|
getFilters(&result, itemnew.Spec.Filter)
|
|
result.Status = http.StatusOK
|
|
|
|
return &result
|
|
}
|
|
|
|
func FluentbitOutputsQuery() *FluentbitOutputsResult {
|
|
var result FluentbitOutputsResult
|
|
|
|
// Retrieve outputs from DB
|
|
db := mysql.Client()
|
|
|
|
var outputs []OutputDBBinding
|
|
|
|
err := db.Find(&outputs).Error
|
|
if err != nil {
|
|
result.Status = http.StatusInternalServerError
|
|
return &result
|
|
}
|
|
|
|
var unmarshaledOutputs []fb.OutputPlugin
|
|
|
|
for _, output := range outputs {
|
|
var params []fb.Parameter
|
|
|
|
err = jsonIter.UnmarshalFromString(output.Parameters, ¶ms)
|
|
if err != nil {
|
|
result.Status = http.StatusInternalServerError
|
|
return &result
|
|
}
|
|
|
|
unmarshaledOutputs = append(unmarshaledOutputs,
|
|
fb.OutputPlugin{Plugin: fb.Plugin{Type: output.Type, Name: output.Name, Parameters: params},
|
|
Id: output.Id, Enable: output.Enable, Updatetime: output.Updatetime})
|
|
}
|
|
|
|
result.Outputs = unmarshaledOutputs
|
|
result.Status = http.StatusOK
|
|
|
|
return &result
|
|
}
|
|
|
|
func FluentbitOutputInsert(output fb.OutputPlugin) *FluentbitOutputsResult {
|
|
var result FluentbitOutputsResult
|
|
|
|
params, err := jsoniter.MarshalToString(output.Parameters)
|
|
if err != nil {
|
|
result.Status = http.StatusBadRequest
|
|
return &result
|
|
}
|
|
|
|
// 1. Update DB
|
|
db := mysql.Client()
|
|
|
|
marshaledOutput := OutputDBBinding{Type: output.Type, Name: output.Name,
|
|
Parameters: params, Enable: output.Enable, Updatetime: time.Now()}
|
|
err = db.Create(&marshaledOutput).Error
|
|
if err != nil {
|
|
result.Status = http.StatusInternalServerError
|
|
return &result
|
|
}
|
|
|
|
// 2. Keep CRD in inline with DB
|
|
err = syncFluentbitCRDOutputWithDB(db)
|
|
if err != nil {
|
|
result.Status = http.StatusInternalServerError
|
|
return &result
|
|
}
|
|
|
|
// 3. If it's an configs output added, reset configs client configs
|
|
configs := ParseEsOutputParams(output.Parameters)
|
|
if configs != nil {
|
|
configs.WriteESConfigs()
|
|
}
|
|
|
|
result.Status = http.StatusOK
|
|
return &result
|
|
}
|
|
|
|
func FluentbitOutputUpdate(output fb.OutputPlugin, id string) *FluentbitOutputsResult {
|
|
var result FluentbitOutputsResult
|
|
|
|
// 1. Update DB
|
|
db := mysql.Client()
|
|
|
|
params, err := jsoniter.MarshalToString(output.Parameters)
|
|
if err != nil {
|
|
result.Status = http.StatusBadRequest
|
|
return &result
|
|
}
|
|
|
|
var marshaledOutput OutputDBBinding
|
|
err = db.Where("id = ?", id).First(&marshaledOutput).Error
|
|
if err != nil {
|
|
result.Status = http.StatusInternalServerError
|
|
return &result
|
|
}
|
|
|
|
marshaledOutput.Name = output.Name
|
|
marshaledOutput.Type = output.Type
|
|
marshaledOutput.Parameters = params
|
|
marshaledOutput.Enable = output.Enable
|
|
|
|
err = db.Save(&marshaledOutput).Error
|
|
if err != nil {
|
|
result.Status = http.StatusInternalServerError
|
|
return &result
|
|
}
|
|
|
|
// 2. Keep CRD in inline with DB
|
|
err = syncFluentbitCRDOutputWithDB(db)
|
|
if err != nil {
|
|
result.Status = http.StatusBadRequest
|
|
return &result
|
|
}
|
|
|
|
// 3. If it's an configs output updated, reset configs client configs
|
|
configs := ParseEsOutputParams(output.Parameters)
|
|
if configs != nil {
|
|
configs.WriteESConfigs()
|
|
}
|
|
|
|
result.Status = http.StatusOK
|
|
return &result
|
|
}
|
|
|
|
func FluentbitOutputDelete(id string) *FluentbitOutputsResult {
|
|
var result FluentbitOutputsResult
|
|
|
|
// 1. Remove the record from DB
|
|
db := mysql.Client()
|
|
|
|
err := db.Where("id = ?", id).Delete(&OutputDBBinding{}).Error
|
|
if err != nil {
|
|
result.Status = http.StatusInternalServerError
|
|
return &result
|
|
}
|
|
|
|
// 2. Keep CRD in inline with DB
|
|
err = syncFluentbitCRDOutputWithDB(db)
|
|
if err != nil {
|
|
result.Status = http.StatusBadRequest
|
|
return &result
|
|
}
|
|
|
|
result.Status = http.StatusOK
|
|
return &result
|
|
}
|
|
|
|
func syncFluentbitCRDOutputWithDB(db *gorm.DB) error {
|
|
var outputs []OutputDBBinding
|
|
|
|
err := db.Where("enable is true").Find(&outputs).Error
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var unmarshaledOutputs []fb.Plugin
|
|
|
|
for _, output := range outputs {
|
|
var params []fb.Parameter
|
|
|
|
err = jsonIter.UnmarshalFromString(output.Parameters, ¶ms)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
unmarshaledOutputs = append(unmarshaledOutputs, fb.Plugin{Type: output.Type, Name: output.Name, Parameters: params})
|
|
}
|
|
// Empty output is not allowed, must specify a null-type output
|
|
if len(unmarshaledOutputs) == 0 {
|
|
unmarshaledOutputs = []fb.Plugin{
|
|
{
|
|
Type: "fluentbit_output",
|
|
Name: "fluentbit-output-null",
|
|
Parameters: []fb.Parameter{
|
|
{
|
|
Name: "Name",
|
|
Value: "null",
|
|
},
|
|
{
|
|
Name: "Match",
|
|
Value: "*",
|
|
},
|
|
},
|
|
},
|
|
}
|
|
}
|
|
|
|
crdcs, scheme, err := createCRDClientSet()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Create a CRD client interface
|
|
crdclient := fb.CrdClient(crdcs, scheme, "kubesphere-logging-system")
|
|
|
|
fluentbit, err := crdclient.Get("fluent-bit")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
fluentbit.Spec.Output = unmarshaledOutputs
|
|
_, err = crdclient.Update("fluent-bit", fluentbit)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Parse es host, port and index
|
|
func ParseEsOutputParams(params []fb.Parameter) *es.ESConfigs {
|
|
|
|
var (
|
|
isEsFound bool
|
|
|
|
host = "127.0.0.1"
|
|
port = "9200"
|
|
index = "logstash"
|
|
logstashFormat string
|
|
logstashPrefix string
|
|
)
|
|
|
|
for _, param := range params {
|
|
switch param.Name {
|
|
case "Name":
|
|
if param.Value == "es" {
|
|
isEsFound = true
|
|
}
|
|
case "Host":
|
|
host = param.Value
|
|
case "Port":
|
|
port = param.Value
|
|
case "Index":
|
|
index = param.Value
|
|
case "Logstash_Format":
|
|
logstashFormat = strings.ToLower(param.Value)
|
|
case "Logstash_Prefix":
|
|
logstashPrefix = param.Value
|
|
}
|
|
}
|
|
|
|
if !isEsFound {
|
|
return nil
|
|
}
|
|
|
|
// If Logstash_Format is On/True, ignore Index
|
|
if logstashFormat == "on" || logstashFormat == "true" {
|
|
if logstashPrefix != "" {
|
|
index = logstashPrefix
|
|
} else {
|
|
index = "logstash"
|
|
}
|
|
}
|
|
|
|
return &es.ESConfigs{Host: host, Port: port, Index: index}
|
|
}
|