diff --git a/cmd/ks-apiserver/app/options/options.go b/cmd/ks-apiserver/app/options/options.go index 3c2d0043c..3f17b6ca4 100644 --- a/cmd/ks-apiserver/app/options/options.go +++ b/cmd/ks-apiserver/app/options/options.go @@ -6,6 +6,7 @@ import ( "k8s.io/klog" genericoptions "kubesphere.io/kubesphere/pkg/server/options" "kubesphere.io/kubesphere/pkg/simple/client/devops" + esclient "kubesphere.io/kubesphere/pkg/simple/client/elasticsearch" "kubesphere.io/kubesphere/pkg/simple/client/k8s" "kubesphere.io/kubesphere/pkg/simple/client/ldap" "kubesphere.io/kubesphere/pkg/simple/client/mysql" @@ -31,6 +32,7 @@ type ServerRunOptions struct { S3Options *s2is3.S3Options RedisOptions *redis.RedisOptions OpenPitrixOptions *openpitrix.OpenPitrixOptions + LoggingOptions *esclient.ElasticSearchOptions } func NewServerRunOptions() *ServerRunOptions { @@ -47,6 +49,7 @@ func NewServerRunOptions() *ServerRunOptions { S3Options: s2is3.NewS3Options(), RedisOptions: redis.NewRedisOptions(), OpenPitrixOptions: openpitrix.NewOpenPitrixOptions(), + LoggingOptions: esclient.NewElasticSearchOptions(), } return &s @@ -65,6 +68,7 @@ func (s *ServerRunOptions) Flags() (fss cliflag.NamedFlagSets) { s.OpenPitrixOptions.AddFlags(fss.FlagSet("openpitrix")) s.ServiceMeshOptions.AddFlags(fss.FlagSet("servicemesh")) s.MonitoringOptions.AddFlags(fss.FlagSet("monitoring")) + s.LoggingOptions.AddFlags(fss.FlagSet("logging")) fs := fss.FlagSet("klog") local := flag.NewFlagSet("klog", flag.ExitOnError) diff --git a/cmd/ks-apiserver/app/options/validation.go b/cmd/ks-apiserver/app/options/validation.go index b1a6d3ea9..85a8862bd 100644 --- a/cmd/ks-apiserver/app/options/validation.go +++ b/cmd/ks-apiserver/app/options/validation.go @@ -15,6 +15,7 @@ func (s *ServerRunOptions) Validate() []error { errors = append(errors, s.S3Options.Validate()...) errors = append(errors, s.RedisOptions.Validate()...) errors = append(errors, s.OpenPitrixOptions.Validate()...) + errors = append(errors, s.LoggingOptions.Validate()...) return errors } diff --git a/cmd/ks-apiserver/app/server.go b/cmd/ks-apiserver/app/server.go index 316445135..a5a10edb1 100644 --- a/cmd/ks-apiserver/app/server.go +++ b/cmd/ks-apiserver/app/server.go @@ -29,7 +29,6 @@ import ( "kubesphere.io/kubesphere/pkg/apiserver/runtime" "kubesphere.io/kubesphere/pkg/apiserver/servicemesh/tracing" "kubesphere.io/kubesphere/pkg/informers" - logging "kubesphere.io/kubesphere/pkg/models/log" "kubesphere.io/kubesphere/pkg/server" apiserverconfig "kubesphere.io/kubesphere/pkg/server/config" "kubesphere.io/kubesphere/pkg/server/filter" @@ -96,8 +95,6 @@ func Run(s *options.ServerRunOptions, stopCh <-chan struct{}) error { initializeServicemeshConfig(s) - initializeESClientConfig() - err = CreateAPIServer(s) if err != nil { return err @@ -126,24 +123,6 @@ func initializeServicemeshConfig(s *options.ServerRunOptions) { kconfig.Set(config) } -func initializeESClientConfig() { - - // List all outputs - outputs, err := logging.GetFluentbitOutputFromConfigMap() - if err != nil { - klog.Errorln(err) - return - } - - // Iterate the outputs to get elasticsearch configs - for _, output := range outputs { - if configs := logging.ParseEsOutputParams(output.Parameters); configs != nil { - configs.WriteESConfigs() - return - } - } -} - // func CreateAPIServer(s *options.ServerRunOptions) error { var err error @@ -188,7 +167,8 @@ func CreateClientSet(conf *apiserverconfig.Config, stopCh <-chan struct{}) error SetOpenPitrixOptions(conf.OpenPitrixOptions). SetPrometheusOptions(conf.MonitoringOptions). SetRedisOptions(conf.RedisOptions). - SetKubeSphereOptions(conf.KubeSphereOptions) + SetKubeSphereOptions(conf.KubeSphereOptions). + SetElasticSearchOptions(conf.LoggingOptions) client.NewClientSetFactory(csop, stopCh) @@ -303,6 +283,7 @@ func Complete(s *options.ServerRunOptions) error { RedisOptions: s.RedisOptions, S3Options: s.S3Options, OpenPitrixOptions: s.OpenPitrixOptions, + LoggingOptions: s.LoggingOptions, }) s = &options.ServerRunOptions{ @@ -317,6 +298,7 @@ func Complete(s *options.ServerRunOptions) error { RedisOptions: conf.RedisOptions, S3Options: conf.S3Options, OpenPitrixOptions: conf.OpenPitrixOptions, + LoggingOptions: conf.LoggingOptions, } return nil diff --git a/pkg/simple/client/elasticsearch/types.go b/pkg/api/logging/v1alpha2/types.go similarity index 99% rename from pkg/simple/client/elasticsearch/types.go rename to pkg/api/logging/v1alpha2/types.go index 0cb5958e8..7a16a0f6b 100644 --- a/pkg/simple/client/elasticsearch/types.go +++ b/pkg/api/logging/v1alpha2/types.go @@ -1,4 +1,4 @@ -package esclient +package v1alpha2 import ( "encoding/json" diff --git a/pkg/apis/logging/v1alpha2/register.go b/pkg/apis/logging/v1alpha2/register.go index b98dce924..f8e9b2d8d 100644 --- a/pkg/apis/logging/v1alpha2/register.go +++ b/pkg/apis/logging/v1alpha2/register.go @@ -21,11 +21,11 @@ import ( "github.com/emicklei/go-restful" "github.com/emicklei/go-restful-openapi" "k8s.io/apimachinery/pkg/runtime/schema" + "kubesphere.io/kubesphere/pkg/api/logging/v1alpha2" "kubesphere.io/kubesphere/pkg/apiserver/logging" "kubesphere.io/kubesphere/pkg/apiserver/runtime" "kubesphere.io/kubesphere/pkg/constants" "kubesphere.io/kubesphere/pkg/models/log" - esclient "kubesphere.io/kubesphere/pkg/simple/client/elasticsearch" fluentbitclient "kubesphere.io/kubesphere/pkg/simple/client/fluentbit" "net/http" ) @@ -66,8 +66,8 @@ func addWebService(c *restful.Container) error { Param(ws.QueryParameter("from", "The offset from the result set. This field returns query results from the specified offset. It requires **operation** is set to query. Defaults to 0 (i.e. from the beginning of the result set).").DataType("integer").DefaultValue("0").Required(false)). Param(ws.QueryParameter("size", "Size of result to return. It requires **operation** is set to query. Defaults to 10 (i.e. 10 log records).").DataType("integer").DefaultValue("10").Required(false)). Metadata(restfulspec.KeyOpenAPITags, []string{constants.LogQueryTag}). - Writes(esclient.QueryResult{}). - Returns(http.StatusOK, RespOK, esclient.QueryResult{})). + Writes(v1alpha2.QueryResult{}). + Returns(http.StatusOK, RespOK, v1alpha2.QueryResult{})). Consumes(restful.MIME_JSON, restful.MIME_XML). Produces(restful.MIME_JSON) @@ -91,8 +91,8 @@ func addWebService(c *restful.Container) error { Param(ws.QueryParameter("from", "The offset from the result set. This field returns query results from the specified offset. It requires **operation** is set to query. Defaults to 0 (i.e. from the beginning of the result set).").DataType("integer").DefaultValue("0").Required(false)). Param(ws.QueryParameter("size", "Size of result to return. It requires **operation** is set to query. Defaults to 10 (i.e. 10 log records).").DataType("integer").DefaultValue("10").Required(false)). Metadata(restfulspec.KeyOpenAPITags, []string{constants.LogQueryTag}). - Writes(esclient.QueryResult{}). - Returns(http.StatusOK, RespOK, esclient.QueryResult{})). + Writes(v1alpha2.QueryResult{}). + Returns(http.StatusOK, RespOK, v1alpha2.QueryResult{})). Consumes(restful.MIME_JSON, restful.MIME_XML). Produces(restful.MIME_JSON) @@ -114,8 +114,8 @@ func addWebService(c *restful.Container) error { Param(ws.QueryParameter("from", "The offset from the result set. This field returns query results from the specified offset. It requires **operation** is set to query. Defaults to 0 (i.e. from the beginning of the result set).").DataType("integer").DefaultValue("0").Required(false)). Param(ws.QueryParameter("size", "Size of result to return. It requires **operation** is set to query. Defaults to 10 (i.e. 10 log records).").DataType("integer").DefaultValue("10").Required(false)). Metadata(restfulspec.KeyOpenAPITags, []string{constants.LogQueryTag}). - Writes(esclient.QueryResult{}). - Returns(http.StatusOK, RespOK, esclient.QueryResult{})). + Writes(v1alpha2.QueryResult{}). + Returns(http.StatusOK, RespOK, v1alpha2.QueryResult{})). Consumes(restful.MIME_JSON, restful.MIME_XML). Produces(restful.MIME_JSON) @@ -136,8 +136,8 @@ func addWebService(c *restful.Container) error { Param(ws.QueryParameter("from", "The offset from the result set. This field returns query results from the specified offset. It requires **operation** is set to query. Defaults to 0 (i.e. from the beginning of the result set).").DataType("integer").DefaultValue("0").Required(false)). Param(ws.QueryParameter("size", "Size of result to return. It requires **operation** is set to query. Defaults to 10 (i.e. 10 log records).").DataType("integer").DefaultValue("10").Required(false)). Metadata(restfulspec.KeyOpenAPITags, []string{constants.LogQueryTag}). - Writes(esclient.QueryResult{}). - Returns(http.StatusOK, RespOK, esclient.QueryResult{})). + Writes(v1alpha2.QueryResult{}). + Returns(http.StatusOK, RespOK, v1alpha2.QueryResult{})). Consumes(restful.MIME_JSON, restful.MIME_XML). Produces(restful.MIME_JSON) @@ -156,8 +156,8 @@ func addWebService(c *restful.Container) error { Param(ws.QueryParameter("from", "The offset from the result set. This field returns query results from the specified offset. It requires **operation** is set to query. Defaults to 0 (i.e. from the beginning of the result set).").DataType("integer").DefaultValue("0").Required(false)). Param(ws.QueryParameter("size", "Size of result to return. It requires **operation** is set to query. Defaults to 10 (i.e. 10 log records).").DataType("integer").DefaultValue("10").Required(false)). Metadata(restfulspec.KeyOpenAPITags, []string{constants.LogQueryTag}). - Writes(esclient.QueryResult{}). - Returns(http.StatusOK, RespOK, esclient.QueryResult{})). + Writes(v1alpha2.QueryResult{}). + Returns(http.StatusOK, RespOK, v1alpha2.QueryResult{})). Consumes(restful.MIME_JSON, restful.MIME_XML). Produces(restful.MIME_JSON) @@ -175,8 +175,8 @@ func addWebService(c *restful.Container) error { Param(ws.QueryParameter("from", "The offset from the result set. This field returns query results from the specified offset. It requires **operation** is set to query. Defaults to 0 (i.e. from the beginning of the result set).").DataType("integer").DefaultValue("0").Required(false)). Param(ws.QueryParameter("size", "Size of result to return. It requires **operation** is set to query. Defaults to 10 (i.e. 10 log records).").DataType("integer").DefaultValue("10").Required(false)). Metadata(restfulspec.KeyOpenAPITags, []string{constants.LogQueryTag}). - Writes(esclient.QueryResult{}). - Returns(http.StatusOK, RespOK, esclient.QueryResult{})). + Writes(v1alpha2.QueryResult{}). + Returns(http.StatusOK, RespOK, v1alpha2.QueryResult{})). Consumes(restful.MIME_JSON, restful.MIME_XML). Produces(restful.MIME_JSON) diff --git a/pkg/apis/tenant/v1alpha2/register.go b/pkg/apis/tenant/v1alpha2/register.go index 482597389..6454c8664 100644 --- a/pkg/apis/tenant/v1alpha2/register.go +++ b/pkg/apis/tenant/v1alpha2/register.go @@ -23,15 +23,14 @@ import ( "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime/schema" devopsv1alpha2 "kubesphere.io/kubesphere/pkg/api/devops/v1alpha2" + "kubesphere.io/kubesphere/pkg/api/logging/v1alpha2" "kubesphere.io/kubesphere/pkg/apis/tenant/v1alpha1" "kubesphere.io/kubesphere/pkg/apiserver/runtime" "kubesphere.io/kubesphere/pkg/apiserver/tenant" "kubesphere.io/kubesphere/pkg/constants" - "kubesphere.io/kubesphere/pkg/server/params" - "kubesphere.io/kubesphere/pkg/simple/client/elasticsearch" - "kubesphere.io/kubesphere/pkg/models" "kubesphere.io/kubesphere/pkg/server/errors" + "kubesphere.io/kubesphere/pkg/server/params" "net/http" ) @@ -177,8 +176,8 @@ func addWebService(c *restful.Container) error { Param(ws.QueryParameter("from", "The offset from the result set. This field returns query results from the specified offset. It requires **operation** is set to query. Defaults to 0 (i.e. from the beginning of the result set).").DataType("integer").DefaultValue("0").Required(false)). Param(ws.QueryParameter("size", "Size of result to return. It requires **operation** is set to query. Defaults to 10 (i.e. 10 log records).").DataType("integer").DefaultValue("10").Required(false)). Metadata(restfulspec.KeyOpenAPITags, []string{constants.TenantResourcesTag}). - Writes(esclient.Response{}). - Returns(http.StatusOK, RespOK, esclient.Response{})). + Writes(v1alpha2.Response{}). + Returns(http.StatusOK, RespOK, v1alpha2.Response{})). Consumes(restful.MIME_JSON, restful.MIME_XML). Produces(restful.MIME_JSON) diff --git a/pkg/apiserver/logging/logging.go b/pkg/apiserver/logging/logging.go index a75e2333a..50348be7d 100644 --- a/pkg/apiserver/logging/logging.go +++ b/pkg/apiserver/logging/logging.go @@ -21,9 +21,10 @@ package logging import ( "github.com/emicklei/go-restful" "k8s.io/klog" + "kubesphere.io/kubesphere/pkg/api/logging/v1alpha2" "kubesphere.io/kubesphere/pkg/models/log" "kubesphere.io/kubesphere/pkg/server/errors" - es "kubesphere.io/kubesphere/pkg/simple/client/elasticsearch" + cs "kubesphere.io/kubesphere/pkg/simple/client" fb "kubesphere.io/kubesphere/pkg/simple/client/fluentbit" "kubesphere.io/kubesphere/pkg/utils/stringutils" "net/http" @@ -32,7 +33,12 @@ import ( ) func LoggingQueryCluster(request *restful.Request, response *restful.Response) { - res := logQuery(log.QueryLevelCluster, request) + res, err := logQuery(log.QueryLevelCluster, request) + if err != nil { + response.WriteHeaderAndEntity(http.StatusServiceUnavailable, err) + return + } + if res.Status != http.StatusOK { response.WriteHeaderAndEntity(res.Status, errors.New(res.Error)) return @@ -42,7 +48,12 @@ func LoggingQueryCluster(request *restful.Request, response *restful.Response) { } func LoggingQueryWorkspace(request *restful.Request, response *restful.Response) { - res := logQuery(log.QueryLevelWorkspace, request) + res, err := logQuery(log.QueryLevelWorkspace, request) + if err != nil { + response.WriteHeaderAndEntity(http.StatusServiceUnavailable, err) + return + } + if res.Status != http.StatusOK { response.WriteHeaderAndEntity(res.Status, errors.New(res.Error)) return @@ -52,7 +63,12 @@ func LoggingQueryWorkspace(request *restful.Request, response *restful.Response) } func LoggingQueryNamespace(request *restful.Request, response *restful.Response) { - res := logQuery(log.QueryLevelNamespace, request) + res, err := logQuery(log.QueryLevelNamespace, request) + if err != nil { + response.WriteHeaderAndEntity(http.StatusServiceUnavailable, err) + return + } + if res.Status != http.StatusOK { response.WriteHeaderAndEntity(res.Status, errors.New(res.Error)) return @@ -62,7 +78,11 @@ func LoggingQueryNamespace(request *restful.Request, response *restful.Response) } func LoggingQueryWorkload(request *restful.Request, response *restful.Response) { - res := logQuery(log.QueryLevelWorkload, request) + res, err := logQuery(log.QueryLevelWorkload, request) + if err != nil { + response.WriteHeaderAndEntity(http.StatusServiceUnavailable, err) + return + } if res.Status != http.StatusOK { response.WriteHeaderAndEntity(res.Status, errors.New(res.Error)) @@ -73,7 +93,12 @@ func LoggingQueryWorkload(request *restful.Request, response *restful.Response) } func LoggingQueryPod(request *restful.Request, response *restful.Response) { - res := logQuery(log.QueryLevelPod, request) + res, err := logQuery(log.QueryLevelPod, request) + if err != nil { + response.WriteHeaderAndEntity(http.StatusServiceUnavailable, err) + return + } + if res.Status != http.StatusOK { response.WriteHeaderAndEntity(res.Status, errors.New(res.Error)) return @@ -82,7 +107,12 @@ func LoggingQueryPod(request *restful.Request, response *restful.Response) { } func LoggingQueryContainer(request *restful.Request, response *restful.Response) { - res := logQuery(log.QueryLevelContainer, request) + res, err := logQuery(log.QueryLevelContainer, request) + if err != nil { + response.WriteHeaderAndEntity(http.StatusServiceUnavailable, err) + return + } + if res.Status != http.StatusOK { response.WriteHeaderAndEntity(res.Status, errors.New(res.Error)) return @@ -158,9 +188,14 @@ func LoggingDeleteFluentbitOutput(request *restful.Request, response *restful.Re response.WriteAsJson(res) } -func logQuery(level log.LogQueryLevel, request *restful.Request) *es.QueryResult { +func logQuery(level log.LogQueryLevel, request *restful.Request) (*v1alpha2.QueryResult, error) { + es, err := cs.ClientSets().ElasticSearch() + if err != nil { + klog.Error(err) + return nil, err + } - var param es.QueryParameters + var param v1alpha2.QueryParameters switch level { case log.QueryLevelCluster: @@ -226,7 +261,6 @@ func logQuery(level log.LogQueryLevel, request *restful.Request) *es.QueryResult param.EndTime = request.QueryParameter("end_time") param.Sort = request.QueryParameter("sort") - var err error param.From, err = strconv.ParseInt(request.QueryParameter("from"), 10, 64) if err != nil { param.From = 0 @@ -236,5 +270,5 @@ func logQuery(level log.LogQueryLevel, request *restful.Request) *es.QueryResult param.Size = 10 } - return es.Query(param) + return es.Query(param), nil } diff --git a/pkg/apiserver/tenant/tenant.go b/pkg/apiserver/tenant/tenant.go index 7b9a6a319..a82d57679 100644 --- a/pkg/apiserver/tenant/tenant.go +++ b/pkg/apiserver/tenant/tenant.go @@ -24,7 +24,8 @@ import ( k8serr "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/util/net" "k8s.io/klog" - "kubesphere.io/kubesphere/pkg/api/devops/v1alpha2" + devopsv1alpha2 "kubesphere.io/kubesphere/pkg/api/devops/v1alpha2" + loggingv1alpha2 "kubesphere.io/kubesphere/pkg/api/logging/v1alpha2" "kubesphere.io/kubesphere/pkg/apis/tenant/v1alpha1" "kubesphere.io/kubesphere/pkg/apiserver/logging" "kubesphere.io/kubesphere/pkg/constants" @@ -36,7 +37,6 @@ import ( "kubesphere.io/kubesphere/pkg/server/errors" "kubesphere.io/kubesphere/pkg/server/params" - "kubesphere.io/kubesphere/pkg/simple/client/elasticsearch" "kubesphere.io/kubesphere/pkg/utils/sliceutil" "net/http" "strings" @@ -286,7 +286,7 @@ func CreateDevopsProject(req *restful.Request, resp *restful.Response) { workspaceName := req.PathParameter("workspace") username := req.HeaderParameter(constants.UserNameHeader) - var devops v1alpha2.DevOpsProject + var devops devopsv1alpha2.DevOpsProject err := req.ReadEntity(&devops) @@ -374,7 +374,7 @@ func LogQuery(req *restful.Request, resp *restful.Response) { // if the user belongs to no namespace // then no log visible if len(namespaces) == 0 { - res := esclient.QueryResult{Status: http.StatusOK} + res := loggingv1alpha2.QueryResult{Status: http.StatusOK} resp.WriteAsJson(res) return } else if len(queryNamespaces) == 1 && queryNamespaces[0] == "" { @@ -382,7 +382,7 @@ func LogQuery(req *restful.Request, resp *restful.Response) { } else { inter := intersection(queryNamespaces, namespaces) if len(inter) == 0 { - res := esclient.QueryResult{Status: http.StatusOK} + res := loggingv1alpha2.QueryResult{Status: http.StatusOK} resp.WriteAsJson(res) return } diff --git a/pkg/models/log/logcrd.go b/pkg/models/log/logcrd.go index 4470fc169..14a21e542 100644 --- a/pkg/models/log/logcrd.go +++ b/pkg/models/log/logcrd.go @@ -26,8 +26,8 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/klog" + "kubesphere.io/kubesphere/pkg/api/logging/v1alpha2" "kubesphere.io/kubesphere/pkg/informers" - es "kubesphere.io/kubesphere/pkg/simple/client/elasticsearch" fb "kubesphere.io/kubesphere/pkg/simple/client/fluentbit" "net/http" "strings" @@ -102,12 +102,6 @@ func FluentbitOutputInsert(output fb.OutputPlugin) *FluentbitOutputsResult { return &result } - // 3. If it's an configs output added, reset configs client configs - configs := ParseEsOutputParams(output.Parameters) - if configs != nil { - configs.WriteESConfigs() - } - result.Status = http.StatusOK return &result } @@ -155,12 +149,6 @@ func FluentbitOutputUpdate(output fb.OutputPlugin, id string) *FluentbitOutputsR return &result } - // 3. If it's an configs output updated, reset configs client configs - configs := ParseEsOutputParams(output.Parameters) - if configs != nil { - configs.WriteESConfigs() - } - result.Status = http.StatusOK return &result } @@ -333,7 +321,7 @@ func syncFluentbitCRDOutputWithConfigMap(outputs []fb.OutputPlugin) error { } // Parse es host, port and index -func ParseEsOutputParams(params []fb.Parameter) *es.Config { +func ParseEsOutputParams(params []fb.Parameter) *v1alpha2.Config { var ( isEsFound bool @@ -377,5 +365,5 @@ func ParseEsOutputParams(params []fb.Parameter) *es.Config { } } - return &es.Config{Host: host, Port: port, Index: index} + return &v1alpha2.Config{Host: host, Port: port, Index: index} } diff --git a/pkg/server/config/config.go b/pkg/server/config/config.go index 9fd4b57b2..2de5266e5 100644 --- a/pkg/server/config/config.go +++ b/pkg/server/config/config.go @@ -7,11 +7,14 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/klog" "kubesphere.io/kubesphere/pkg/apiserver/runtime" + "kubesphere.io/kubesphere/pkg/simple/client/alerting" "kubesphere.io/kubesphere/pkg/simple/client/devops" + "kubesphere.io/kubesphere/pkg/simple/client/elasticsearch" "kubesphere.io/kubesphere/pkg/simple/client/k8s" "kubesphere.io/kubesphere/pkg/simple/client/kubesphere" "kubesphere.io/kubesphere/pkg/simple/client/ldap" "kubesphere.io/kubesphere/pkg/simple/client/mysql" + "kubesphere.io/kubesphere/pkg/simple/client/notification" "kubesphere.io/kubesphere/pkg/simple/client/openpitrix" "kubesphere.io/kubesphere/pkg/simple/client/prometheus" "kubesphere.io/kubesphere/pkg/simple/client/redis" @@ -136,22 +139,33 @@ type Config struct { S3Options *s2is3.S3Options `json:"s3,omitempty" yaml:"s3,omitempty" mapstructure:"s3"` OpenPitrixOptions *openpitrix.OpenPitrixOptions `json:"openpitrix,omitempty" yaml:"openpitrix,omitempty" mapstructure:"openpitrix"` MonitoringOptions *prometheus.PrometheusOptions `json:"monitoring,omitempty" yaml:"monitoring,omitempty" mapstructure:"monitoring"` - KubeSphereOptions *kubesphere.KubeSphereOptions `json:"-" yaml:"kubesphere,omitempty" mapstructure:"kubesphere"` + LoggingOptions *esclient.ElasticSearchOptions `json:"logging,omitempty" yaml:"logging,omitempty" mapstructure:"logging"` + + // Options below are only loaded from configuration file, no command line flags for these options now. + KubeSphereOptions *kubesphere.KubeSphereOptions `json:"-" yaml:"kubesphere,omitempty" mapstructure:"kubesphere"` + + // Options used for enabling components, not actually used now. Once we switch Alerting/Notification API to kubesphere, + // we can add these options to kubesphere command lines + AlertingOptions *alerting.AlertingOptions `json:"alerting,omitempty" yaml:"alerting,omitempty" mapstructure:"alerting"` + NotificationOptions *notification.NotificationOptions `json:"notification,omitempty" yaml:"notification,omitempty" mapstructure:"notification"` } func newConfig() *Config { return &Config{ - MySQLOptions: mysql.NewMySQLOptions(), - DevopsOptions: devops.NewDevopsOptions(), - SonarQubeOptions: sonarqube.NewSonarQubeOptions(), - KubernetesOptions: k8s.NewKubernetesOptions(), - ServiceMeshOptions: servicemesh.NewServiceMeshOptions(), - LdapOptions: ldap.NewLdapOptions(), - RedisOptions: redis.NewRedisOptions(), - S3Options: s2is3.NewS3Options(), - OpenPitrixOptions: openpitrix.NewOpenPitrixOptions(), - MonitoringOptions: prometheus.NewPrometheusOptions(), - KubeSphereOptions: kubesphere.NewKubeSphereOptions(), + MySQLOptions: mysql.NewMySQLOptions(), + DevopsOptions: devops.NewDevopsOptions(), + SonarQubeOptions: sonarqube.NewSonarQubeOptions(), + KubernetesOptions: k8s.NewKubernetesOptions(), + ServiceMeshOptions: servicemesh.NewServiceMeshOptions(), + LdapOptions: ldap.NewLdapOptions(), + RedisOptions: redis.NewRedisOptions(), + S3Options: s2is3.NewS3Options(), + OpenPitrixOptions: openpitrix.NewOpenPitrixOptions(), + MonitoringOptions: prometheus.NewPrometheusOptions(), + KubeSphereOptions: kubesphere.NewKubeSphereOptions(), + AlertingOptions: alerting.NewAlertingOptions(), + NotificationOptions: notification.NewNotificationOptions(), + LoggingOptions: esclient.NewElasticSearchOptions(), } } @@ -162,6 +176,10 @@ func Get() *Config { func (c *Config) Apply(conf *Config) { shadowConfig = conf + if conf.LoggingOptions != nil { + conf.LoggingOptions.ApplyTo(c.LoggingOptions) + } + if conf.KubeSphereOptions != nil { conf.KubeSphereOptions.ApplyTo(c.KubeSphereOptions) } @@ -246,4 +264,16 @@ func (c *Config) stripEmptyOptions() { c.S3Options = nil } + if c.AlertingOptions != nil && c.AlertingOptions.Endpoint == "" { + c.AlertingOptions = nil + } + + if c.LoggingOptions != nil && c.LoggingOptions.Host == "" { + c.LoggingOptions = nil + } + + if c.NotificationOptions != nil && c.NotificationOptions.Endpoint == "" { + c.NotificationOptions = nil + } + } diff --git a/pkg/server/config/config_test.go b/pkg/server/config/config_test.go index 89e84cfa6..de2c04e2d 100644 --- a/pkg/server/config/config_test.go +++ b/pkg/server/config/config_test.go @@ -4,11 +4,14 @@ import ( "fmt" "gopkg.in/yaml.v2" "io/ioutil" + "kubesphere.io/kubesphere/pkg/simple/client/alerting" "kubesphere.io/kubesphere/pkg/simple/client/devops" + esclient "kubesphere.io/kubesphere/pkg/simple/client/elasticsearch" "kubesphere.io/kubesphere/pkg/simple/client/k8s" "kubesphere.io/kubesphere/pkg/simple/client/kubesphere" "kubesphere.io/kubesphere/pkg/simple/client/ldap" "kubesphere.io/kubesphere/pkg/simple/client/mysql" + "kubesphere.io/kubesphere/pkg/simple/client/notification" "kubesphere.io/kubesphere/pkg/simple/client/openpitrix" "kubesphere.io/kubesphere/pkg/simple/client/prometheus" "kubesphere.io/kubesphere/pkg/simple/client/redis" @@ -83,10 +86,24 @@ func newTestConfig() *Config { Endpoint: "http://prometheus.kubesphere-monitoring-system.svc", SecondaryEndpoint: "http://prometheus.kubesphere-monitoring-system.svc", }, + LoggingOptions: &esclient.ElasticSearchOptions{ + Host: "http://elasticsearch-logging.kubesphere-logging-system.svc:9200", + LogstashFormat: false, + Index: "", + LogstashPrefix: "elk", + Match: "kube.*", + Version: "6", + }, KubeSphereOptions: &kubesphere.KubeSphereOptions{ APIServer: "http://ks-apiserver.kubesphere-system.svc", AccountServer: "http://ks-account.kubesphere-system.svc", }, + AlertingOptions: &alerting.AlertingOptions{ + Endpoint: "http://alerting.kubesphere-alerting-system.svc:9200", + }, + NotificationOptions: ¬ification.NotificationOptions{ + Endpoint: "http://notification.kubesphere-alerting-system.svc:9200", + }, } return conf } diff --git a/pkg/simple/client/alerting/options.go b/pkg/simple/client/alerting/options.go new file mode 100644 index 000000000..67414c92d --- /dev/null +++ b/pkg/simple/client/alerting/options.go @@ -0,0 +1,17 @@ +package alerting + +type AlertingOptions struct { + Endpoint string +} + +func NewAlertingOptions() *AlertingOptions { + return &AlertingOptions{ + Endpoint: "", + } +} + +func (s *AlertingOptions) ApplyTo(options *AlertingOptions) { + if s.Endpoint != "" { + options.Endpoint = s.Endpoint + } +} diff --git a/pkg/simple/client/elasticsearch/esclient.go b/pkg/simple/client/elasticsearch/esclient.go index 2bf2218bc..2d9af100e 100644 --- a/pkg/simple/client/elasticsearch/esclient.go +++ b/pkg/simple/client/elasticsearch/esclient.go @@ -13,13 +13,17 @@ limitations under the License. package esclient import ( + "context" "encoding/json" "fmt" "k8s.io/klog" + "kubesphere.io/kubesphere/pkg/api/logging/v1alpha2" + v5 "kubesphere.io/kubesphere/pkg/simple/client/elasticsearch/versions/v5" + v6 "kubesphere.io/kubesphere/pkg/simple/client/elasticsearch/versions/v6" + v7 "kubesphere.io/kubesphere/pkg/simple/client/elasticsearch/versions/v7" "net/http" "strconv" "strings" - "sync" "time" "github.com/json-iterator/go" @@ -49,48 +53,109 @@ const ( fieldContainerNameKeyword = "kubernetes.container_name.keyword" ) -var jsonIter = jsoniter.ConfigCompatibleWithStandardLibrary - -var ( - mu sync.Mutex - config *Config - - client Client +const ( + ElasticV5 = "5" + ElasticV6 = "6" + ElasticV7 = "7" ) -func (cfg *Config) WriteESConfigs() { - mu.Lock() - defer mu.Unlock() +var jsonIter = jsoniter.ConfigCompatibleWithStandardLibrary - config = cfg - if err := detectVersionMajor(config); err != nil { - klog.Errorln(err) - client = nil - return - } - - client = NewForConfig(config) +type ElasticSearchClient struct { + client Client } -func createQueryRequest(param QueryParameters) (int, []byte, error) { - var request Request - var mainBoolQuery BoolFilter +func NewLoggingClient(options *ElasticSearchOptions) (*ElasticSearchClient, error) { + version := "6" + esClient := &ElasticSearchClient{} + + if options.Version == "" { + var err error + version, err = detectVersionMajor(options.Host) + if err != nil { + return nil, err + } + } + + if options.LogstashFormat { + if options.LogstashPrefix != "" { + options.Index = options.LogstashPrefix + } else { + options.Index = "logstash" + } + } + + switch version { + case ElasticV5: + esClient.client = v5.New(options.Host, options.Index) + case ElasticV6: + esClient.client = v6.New(options.Host, options.Index) + case ElasticV7: + esClient.client = v7.New(options.Host, options.Index) + default: + return nil, fmt.Errorf("unsupported elasticsearch version %s", version) + } + + return esClient, nil +} + +func (c *ElasticSearchClient) ES() *Client { + return &c.client +} + +func detectVersionMajor(host string) (string, error) { + + // Info APIs are backward compatible with versions of v5.x, v6.x and v7.x + es := v6.New(host, "") + res, err := es.Client.Info( + es.Client.Info.WithContext(context.Background()), + ) + if err != nil { + return "", err + } + + defer res.Body.Close() + + var b map[string]interface{} + if err = json.NewDecoder(res.Body).Decode(&b); err != nil { + return "", err + } + if res.IsError() { + // Print the response status and error information. + e, _ := b["error"].(map[string]interface{}) + return "", fmt.Errorf("[%s] %s: %s", res.Status(), e["type"], e["reason"]) + } + + // get the major version + version, _ := b["version"].(map[string]interface{}) + number, _ := version["number"].(string) + if number == "" { + return "", fmt.Errorf("failed to detect elastic version number") + } + + v := strings.Split(number, ".")[0] + return v, nil +} + +func createQueryRequest(param v1alpha2.QueryParameters) (int, []byte, error) { + var request v1alpha2.Request + var mainBoolQuery v1alpha2.BoolFilter if len(param.NamespaceWithCreationTime) != 0 { - var boolShould BoolShould + var boolShould v1alpha2.BoolShould for namespace, creationTime := range param.NamespaceWithCreationTime { - var boolFilter BoolFilter + var boolFilter v1alpha2.BoolFilter - matchPhrase := MatchPhrase{MatchPhrase: map[string]string{fieldNamespaceNameKeyword: namespace}} - rangeQuery := RangeQuery{RangeSpec{TimeRange{creationTime, ""}}} + matchPhrase := v1alpha2.MatchPhrase{MatchPhrase: map[string]string{fieldNamespaceNameKeyword: namespace}} + rangeQuery := v1alpha2.RangeQuery{RangeSpec: v1alpha2.RangeSpec{TimeRange: v1alpha2.TimeRange{Gte: creationTime, Lte: ""}}} boolFilter.Filter = append(boolFilter.Filter, matchPhrase) boolFilter.Filter = append(boolFilter.Filter, rangeQuery) - boolShould.Should = append(boolShould.Should, BoolQuery{Bool: boolFilter}) + boolShould.Should = append(boolShould.Should, v1alpha2.BoolQuery{Bool: boolFilter}) } boolShould.MinimumShouldMatch = 1 - mainBoolQuery.Filter = append(mainBoolQuery.Filter, BoolQuery{Bool: boolShould}) + mainBoolQuery.Filter = append(mainBoolQuery.Filter, v1alpha2.BoolQuery{Bool: boolShould}) } if param.WorkloadFilter != nil { boolQuery := makeBoolShould(regexpQuery, fieldPodNameKeyword, param.WorkloadFilter) @@ -122,15 +187,15 @@ func createQueryRequest(param QueryParameters) (int, []byte, error) { mainBoolQuery.Filter = append(mainBoolQuery.Filter, boolQuery) } - rangeQuery := RangeQuery{RangeSpec{TimeRange{param.StartTime, param.EndTime}}} + rangeQuery := v1alpha2.RangeQuery{RangeSpec: v1alpha2.RangeSpec{TimeRange: v1alpha2.TimeRange{Gte: param.StartTime, Lte: param.EndTime}}} mainBoolQuery.Filter = append(mainBoolQuery.Filter, rangeQuery) var operation int if param.Operation == "statistics" { operation = OperationStatistics - containerAgg := AggField{"kubernetes.docker_id.keyword"} - statisticAggs := StatisticsAggs{ContainerAgg{containerAgg}} + containerAgg := v1alpha2.AggField{Field: "kubernetes.docker_id.keyword"} + statisticAggs := v1alpha2.StatisticsAggs{ContainerAgg: v1alpha2.ContainerAgg{Cardinality: containerAgg}} request.Aggs = statisticAggs request.Size = 0 } else if param.Operation == "histogram" { @@ -142,7 +207,7 @@ func createQueryRequest(param QueryParameters) (int, []byte, error) { interval = "15m" } param.Interval = interval - request.Aggs = HistogramAggs{HistogramAgg{DateHistogram{"time", interval}}} + request.Aggs = v1alpha2.HistogramAggs{HistogramAgg: v1alpha2.HistogramAgg{DateHistogram: v1alpha2.DateHistogram{Field: "time", Interval: interval}}} request.Size = 0 } else { operation = OperationQuery @@ -154,25 +219,25 @@ func createQueryRequest(param QueryParameters) (int, []byte, error) { } else { order = "desc" } - request.Sorts = append(request.Sorts, Sort{Order{order}}) + request.Sorts = append(request.Sorts, v1alpha2.Sort{Order: v1alpha2.Order{Order: order}}) - var mainHighLight MainHighLight - mainHighLight.Fields = append(mainHighLight.Fields, LogHighLightField{}) - mainHighLight.Fields = append(mainHighLight.Fields, NamespaceHighLightField{}) - mainHighLight.Fields = append(mainHighLight.Fields, PodHighLightField{}) - mainHighLight.Fields = append(mainHighLight.Fields, ContainerHighLightField{}) + var mainHighLight v1alpha2.MainHighLight + mainHighLight.Fields = append(mainHighLight.Fields, v1alpha2.LogHighLightField{}) + mainHighLight.Fields = append(mainHighLight.Fields, v1alpha2.NamespaceHighLightField{}) + mainHighLight.Fields = append(mainHighLight.Fields, v1alpha2.PodHighLightField{}) + mainHighLight.Fields = append(mainHighLight.Fields, v1alpha2.ContainerHighLightField{}) mainHighLight.FragmentSize = 0 request.MainHighLight = mainHighLight } - request.MainQuery = BoolQuery{mainBoolQuery} + request.MainQuery = v1alpha2.BoolQuery{Bool: mainBoolQuery} queryRequest, err := json.Marshal(request) return operation, queryRequest, err } -func makeBoolShould(queryType int, field string, list []string) BoolQuery { +func makeBoolShould(queryType int, field string, list []string) v1alpha2.BoolQuery { var should []interface{} for _, phrase := range list { @@ -180,18 +245,18 @@ func makeBoolShould(queryType int, field string, list []string) BoolQuery { switch queryType { case matchPhrase: - q = MatchPhrase{MatchPhrase: map[string]string{field: phrase}} + q = v1alpha2.MatchPhrase{MatchPhrase: map[string]string{field: phrase}} case matchPhrasePrefix: - q = MatchPhrasePrefix{MatchPhrasePrefix: map[string]string{field: phrase}} + q = v1alpha2.MatchPhrasePrefix{MatchPhrasePrefix: map[string]string{field: phrase}} case regexpQuery: - q = RegexpQuery{Regexp: map[string]string{field: makePodNameRegexp(phrase)}} + q = v1alpha2.RegexpQuery{Regexp: map[string]string{field: makePodNameRegexp(phrase)}} } should = append(should, q) } - return BoolQuery{ - Bool: BoolShould{ + return v1alpha2.BoolQuery{ + Bool: v1alpha2.BoolShould{ Should: should, MinimumShouldMatch: 1, }, @@ -244,10 +309,10 @@ func calcTimestamp(input string) int64 { return ret } -func parseQueryResult(operation int, param QueryParameters, body []byte) *QueryResult { - var queryResult QueryResult +func (c *ElasticSearchClient) parseQueryResult(operation int, param v1alpha2.QueryParameters, body []byte) *v1alpha2.QueryResult { + var queryResult v1alpha2.QueryResult - var response Response + var response v1alpha2.Response err := jsonIter.Unmarshal(body, &response) if err != nil { klog.Errorln(err) @@ -273,12 +338,12 @@ func parseQueryResult(operation int, param QueryParameters, body []byte) *QueryR switch operation { case OperationQuery: - var readResult ReadResult - readResult.Total = client.GetTotalHitCount(response.Hits.Total) + var readResult v1alpha2.ReadResult + readResult.Total = c.client.GetTotalHitCount(response.Hits.Total) readResult.From = param.From readResult.Size = param.Size for _, hit := range response.Hits.Hits { - var logRecord LogRecord + var logRecord v1alpha2.LogRecord logRecord.Time = calcTimestamp(hit.Source.Time) logRecord.Log = hit.Source.Log logRecord.Namespace = hit.Source.Kubernetes.Namespace @@ -291,7 +356,7 @@ func parseQueryResult(operation int, param QueryParameters, body []byte) *QueryR queryResult.Read = &readResult case OperationStatistics: - var statisticsResponse StatisticsResponseAggregations + var statisticsResponse v1alpha2.StatisticsResponseAggregations err := jsonIter.Unmarshal(response.Aggregations, &statisticsResponse) if err != nil && response.Aggregations != nil { klog.Errorln(err) @@ -299,17 +364,17 @@ func parseQueryResult(operation int, param QueryParameters, body []byte) *QueryR queryResult.Error = err.Error() return &queryResult } - queryResult.Statistics = &StatisticsResult{Containers: statisticsResponse.ContainerCount.Value, Logs: client.GetTotalHitCount(response.Hits.Total)} + queryResult.Statistics = &v1alpha2.StatisticsResult{Containers: statisticsResponse.ContainerCount.Value, Logs: c.client.GetTotalHitCount(response.Hits.Total)} case OperationHistogram: - var histogramResult HistogramResult - histogramResult.Total = client.GetTotalHitCount(response.Hits.Total) + var histogramResult v1alpha2.HistogramResult + histogramResult.Total = c.client.GetTotalHitCount(response.Hits.Total) histogramResult.StartTime = calcTimestamp(param.StartTime) histogramResult.EndTime = calcTimestamp(param.EndTime) histogramResult.Interval = param.Interval - var histogramAggregations HistogramAggregations - err := jsonIter.Unmarshal(response.Aggregations, &histogramAggregations) + var histogramAggregations v1alpha2.HistogramAggregations + err = jsonIter.Unmarshal(response.Aggregations, &histogramAggregations) if err != nil && response.Aggregations != nil { klog.Errorln(err) queryResult.Status = http.StatusInternalServerError @@ -317,7 +382,7 @@ func parseQueryResult(operation int, param QueryParameters, body []byte) *QueryR return &queryResult } for _, histogram := range histogramAggregations.HistogramAggregation.Histograms { - var histogramRecord HistogramRecord + var histogramRecord v1alpha2.HistogramRecord histogramRecord.Time = histogram.Time histogramRecord.Count = histogram.Count @@ -332,30 +397,30 @@ func parseQueryResult(operation int, param QueryParameters, body []byte) *QueryR return &queryResult } -func Query(param QueryParameters) *QueryResult { +func (c *ElasticSearchClient) Query(param v1alpha2.QueryParameters) *v1alpha2.QueryResult { - var queryResult = new(QueryResult) + var queryResult = new(v1alpha2.QueryResult) if param.NamespaceNotFound { - queryResult = new(QueryResult) + queryResult = new(v1alpha2.QueryResult) queryResult.Status = http.StatusOK switch param.Operation { case "statistics": - queryResult.Statistics = new(StatisticsResult) + queryResult.Statistics = new(v1alpha2.StatisticsResult) case "histogram": - queryResult.Histogram = &HistogramResult{ + queryResult.Histogram = &v1alpha2.HistogramResult{ StartTime: calcTimestamp(param.StartTime), EndTime: calcTimestamp(param.EndTime), Interval: param.Interval} default: - queryResult.Read = new(ReadResult) + queryResult.Read = new(v1alpha2.ReadResult) } return queryResult } - if client == nil { + if c.client == nil { queryResult.Status = http.StatusBadRequest - queryResult.Error = fmt.Sprintf("Invalid elasticsearch address: host=%s, port=%s", config.Host, config.Port) + queryResult.Error = "can not create elastic search client" return queryResult } @@ -367,16 +432,16 @@ func Query(param QueryParameters) *QueryResult { return queryResult } - body, err := client.Search(query) + body, err := c.client.Search(query) if err != nil { klog.Errorln(err) - queryResult = new(QueryResult) + queryResult = new(v1alpha2.QueryResult) queryResult.Status = http.StatusInternalServerError queryResult.Error = err.Error() return queryResult } - queryResult = parseQueryResult(operation, param, body) + queryResult = c.parseQueryResult(operation, param, body) return queryResult } diff --git a/pkg/simple/client/elasticsearch/interface.go b/pkg/simple/client/elasticsearch/interface.go index 291a8cc73..ddbf9b3d8 100644 --- a/pkg/simple/client/elasticsearch/interface.go +++ b/pkg/simple/client/elasticsearch/interface.go @@ -1,72 +1,7 @@ package esclient -import ( - "context" - "encoding/json" - "fmt" - v5 "kubesphere.io/kubesphere/pkg/simple/client/elasticsearch/versions/v5" - v6 "kubesphere.io/kubesphere/pkg/simple/client/elasticsearch/versions/v6" - v7 "kubesphere.io/kubesphere/pkg/simple/client/elasticsearch/versions/v7" - "strings" -) - -const ( - ElasticV5 = "5" - ElasticV6 = "6" - ElasticV7 = "7" -) - type Client interface { // Perform Search API Search(body []byte) ([]byte, error) GetTotalHitCount(v interface{}) int64 } - -func NewForConfig(cfg *Config) Client { - address := fmt.Sprintf("http://%s:%s", cfg.Host, cfg.Port) - index := cfg.Index - switch cfg.VersionMajor { - case ElasticV5: - return v5.New(address, index) - case ElasticV6: - return v6.New(address, index) - case ElasticV7: - return v7.New(address, index) - default: - return nil - } -} - -func detectVersionMajor(cfg *Config) error { - - // Info APIs are backward compatible with versions of v5.x, v6.x and v7.x - address := fmt.Sprintf("http://%s:%s", cfg.Host, cfg.Port) - es := v6.New(address, "") - res, err := es.Client.Info( - es.Client.Info.WithContext(context.Background()), - ) - if err != nil { - return err - } - defer res.Body.Close() - - var b map[string]interface{} - if err := json.NewDecoder(res.Body).Decode(&b); err != nil { - return err - } - if res.IsError() { - // Print the response status and error information. - e, _ := b["error"].(map[string]interface{}) - return fmt.Errorf("[%s] %s: %s", res.Status(), e["type"], e["reason"]) - } - - // get the major version - version, _ := b["version"].(map[string]interface{}) - number, _ := version["number"].(string) - if number == "" { - return fmt.Errorf("failed to detect elastic version number") - } - - cfg.VersionMajor = strings.Split(number, ".")[0] - return nil -} diff --git a/pkg/simple/client/elasticsearch/options.go b/pkg/simple/client/elasticsearch/options.go new file mode 100644 index 000000000..946d5dbeb --- /dev/null +++ b/pkg/simple/client/elasticsearch/options.go @@ -0,0 +1,63 @@ +package esclient + +import ( + "github.com/spf13/pflag" + "kubesphere.io/kubesphere/pkg/utils/reflectutils" +) + +type ElasticSearchOptions struct { + Host string `json:"host,omitempty" yaml:"host,omitempty"` + LogstashFormat bool `json:"logstashFormat,omitempty" yaml:"logstashFormat,omitempty"` + Index string `json:",omitempty" yaml:",omitempty"` + LogstashPrefix string `json:"logstashPrefix,omitempty" yaml:"logstashPrefix,omitempty"` + Match string `json:",omitempty" yaml:",omitempty"` + Version string `json:",omitempty" yaml:",omitempty"` +} + +func NewElasticSearchOptions() *ElasticSearchOptions { + return &ElasticSearchOptions{ + Host: "", + LogstashFormat: false, + Index: "fluentbit", + LogstashPrefix: "", + Match: "kube.*", + Version: "6", + } +} + +func (s *ElasticSearchOptions) ApplyTo(options *ElasticSearchOptions) { + if s.Host != "" { + reflectutils.Override(options, s) + } +} + +func (s *ElasticSearchOptions) Validate() []error { + errs := []error{} + + return errs +} + +func (s *ElasticSearchOptions) AddFlags(fs *pflag.FlagSet) { + fs.StringVar(&s.Host, "elasticsearch-host", s.Host, ""+ + "ElasticSearch logging service host. KubeSphere is using elastic as log store, "+ + "if this filed left blank, KubeSphere will use kubernetes builtin log API instead, and"+ + " the following elastic search options will be ignored.") + + fs.BoolVar(&s.LogstashFormat, "logstash-format", s.LogstashFormat, ""+ + "Whether to toggle logstash format compatibility.") + + fs.StringVar(&s.LogstashPrefix, "logstash-prefix", s.LogstashPrefix, ""+ + "If logstash-format is enabled, the Index name is composed using a prefix and the date,"+ + "e.g: If logstash-prefix is equals to 'mydata' your index will become 'mydata-YYYY.MM.DD'."+ + "The last string appended belongs to the date when the data is being generated.") + + fs.StringVar(&s.Match, "elasticsearch-match", s.Match, ""+ + "The regex match for index, eg. kube.*") + + fs.StringVar(&s.Index, "elasticsearch-index", s.Index, ""+ + "Index name.") + + fs.StringVar(&s.Version, "elasticsearch-version", s.Version, ""+ + "ElasticSearch major version, e.g. 5/6/7, if left blank, will detect automatically."+ + "Currently, minimum supported version is 5.x") +} diff --git a/pkg/simple/client/elasticsearch/versions/v5/elasticsearch.go b/pkg/simple/client/elasticsearch/versions/v5/elasticsearch.go index a66f60c3e..6fd96fc4e 100644 --- a/pkg/simple/client/elasticsearch/versions/v5/elasticsearch.go +++ b/pkg/simple/client/elasticsearch/versions/v5/elasticsearch.go @@ -14,16 +14,16 @@ type Elastic struct { index string } -func New(address string, index string) Elastic { +func New(address string, index string) *Elastic { client, _ := elasticsearch.NewClient(elasticsearch.Config{ Addresses: []string{address}, }) - return Elastic{client: client, index: index} + return &Elastic{client: client, index: index} } -func (e Elastic) Search(body []byte) ([]byte, error) { +func (e *Elastic) Search(body []byte) ([]byte, error) { response, err := e.client.Search( e.client.Search.WithContext(context.Background()), @@ -49,7 +49,7 @@ func (e Elastic) Search(body []byte) ([]byte, error) { return ioutil.ReadAll(response.Body) } -func (e Elastic) GetTotalHitCount(v interface{}) int64 { +func (e *Elastic) GetTotalHitCount(v interface{}) int64 { f, _ := v.(float64) return int64(f) } diff --git a/pkg/simple/client/elasticsearch/versions/v6/elasticsearch.go b/pkg/simple/client/elasticsearch/versions/v6/elasticsearch.go index 2c116ed2a..577447747 100644 --- a/pkg/simple/client/elasticsearch/versions/v6/elasticsearch.go +++ b/pkg/simple/client/elasticsearch/versions/v6/elasticsearch.go @@ -14,16 +14,16 @@ type Elastic struct { index string } -func New(address string, index string) Elastic { +func New(address string, index string) *Elastic { client, _ := elasticsearch.NewClient(elasticsearch.Config{ Addresses: []string{address}, }) - return Elastic{Client: client, index: index} + return &Elastic{Client: client, index: index} } -func (e Elastic) Search(body []byte) ([]byte, error) { +func (e *Elastic) Search(body []byte) ([]byte, error) { response, err := e.Client.Search( e.Client.Search.WithContext(context.Background()), @@ -49,7 +49,7 @@ func (e Elastic) Search(body []byte) ([]byte, error) { return ioutil.ReadAll(response.Body) } -func (e Elastic) GetTotalHitCount(v interface{}) int64 { +func (e *Elastic) GetTotalHitCount(v interface{}) int64 { f, _ := v.(float64) return int64(f) } diff --git a/pkg/simple/client/elasticsearch/versions/v7/elasticsearch.go b/pkg/simple/client/elasticsearch/versions/v7/elasticsearch.go index 27634b4ed..bc1293baf 100644 --- a/pkg/simple/client/elasticsearch/versions/v7/elasticsearch.go +++ b/pkg/simple/client/elasticsearch/versions/v7/elasticsearch.go @@ -14,16 +14,16 @@ type Elastic struct { index string } -func New(address string, index string) Elastic { +func New(address string, index string) *Elastic { client, _ := elasticsearch.NewClient(elasticsearch.Config{ Addresses: []string{address}, }) - return Elastic{client: client, index: index} + return &Elastic{client: client, index: index} } -func (e Elastic) Search(body []byte) ([]byte, error) { +func (e *Elastic) Search(body []byte) ([]byte, error) { response, err := e.client.Search( e.client.Search.WithContext(context.Background()), @@ -50,7 +50,7 @@ func (e Elastic) Search(body []byte) ([]byte, error) { return ioutil.ReadAll(response.Body) } -func (e Elastic) GetTotalHitCount(v interface{}) int64 { +func (e *Elastic) GetTotalHitCount(v interface{}) int64 { m, _ := v.(map[string]interface{}) f, _ := m["value"].(float64) return int64(f) diff --git a/pkg/simple/client/factory.go b/pkg/simple/client/factory.go index 165b607d5..a605f29a5 100644 --- a/pkg/simple/client/factory.go +++ b/pkg/simple/client/factory.go @@ -4,6 +4,7 @@ import ( "fmt" goredis "github.com/go-redis/redis" "kubesphere.io/kubesphere/pkg/simple/client/devops" + esclient "kubesphere.io/kubesphere/pkg/simple/client/elasticsearch" "kubesphere.io/kubesphere/pkg/simple/client/k8s" "kubesphere.io/kubesphere/pkg/simple/client/kubesphere" "kubesphere.io/kubesphere/pkg/simple/client/ldap" @@ -25,30 +26,32 @@ func (e ClientSetNotEnabledError) Error() string { } type ClientSetOptions struct { - mySQLOptions *mysql.MySQLOptions - redisOptions *redis.RedisOptions - kubernetesOptions *k8s.KubernetesOptions - devopsOptions *devops.DevopsOptions - sonarqubeOptions *sonarqube.SonarQubeOptions - ldapOptions *ldap.LdapOptions - s3Options *s2is3.S3Options - openPitrixOptions *openpitrix.OpenPitrixOptions - prometheusOptions *prometheus.PrometheusOptions - kubesphereOptions *kubesphere.KubeSphereOptions + mySQLOptions *mysql.MySQLOptions + redisOptions *redis.RedisOptions + kubernetesOptions *k8s.KubernetesOptions + devopsOptions *devops.DevopsOptions + sonarqubeOptions *sonarqube.SonarQubeOptions + ldapOptions *ldap.LdapOptions + s3Options *s2is3.S3Options + openPitrixOptions *openpitrix.OpenPitrixOptions + prometheusOptions *prometheus.PrometheusOptions + kubesphereOptions *kubesphere.KubeSphereOptions + elasticSearhOptions *esclient.ElasticSearchOptions } func NewClientSetOptions() *ClientSetOptions { return &ClientSetOptions{ - mySQLOptions: mysql.NewMySQLOptions(), - redisOptions: redis.NewRedisOptions(), - kubernetesOptions: k8s.NewKubernetesOptions(), - ldapOptions: ldap.NewLdapOptions(), - devopsOptions: devops.NewDevopsOptions(), - sonarqubeOptions: sonarqube.NewSonarQubeOptions(), - s3Options: s2is3.NewS3Options(), - openPitrixOptions: openpitrix.NewOpenPitrixOptions(), - prometheusOptions: prometheus.NewPrometheusOptions(), - kubesphereOptions: kubesphere.NewKubeSphereOptions(), + mySQLOptions: mysql.NewMySQLOptions(), + redisOptions: redis.NewRedisOptions(), + kubernetesOptions: k8s.NewKubernetesOptions(), + ldapOptions: ldap.NewLdapOptions(), + devopsOptions: devops.NewDevopsOptions(), + sonarqubeOptions: sonarqube.NewSonarQubeOptions(), + s3Options: s2is3.NewS3Options(), + openPitrixOptions: openpitrix.NewOpenPitrixOptions(), + prometheusOptions: prometheus.NewPrometheusOptions(), + kubesphereOptions: kubesphere.NewKubeSphereOptions(), + elasticSearhOptions: esclient.NewElasticSearchOptions(), } } @@ -102,6 +105,11 @@ func (c *ClientSetOptions) SetKubeSphereOptions(options *kubesphere.KubeSphereOp return c } +func (c *ClientSetOptions) SetElasticSearchOptions(options *esclient.ElasticSearchOptions) *ClientSetOptions { + c.elasticSearhOptions = options + return c +} + // ClientSet provide best of effort service to initialize clients, // but there is no guarantee to return a valid client instance, // so do validity check before use @@ -111,15 +119,16 @@ type ClientSet struct { mySQLClient *mysql.MySQLClient - k8sClient *k8s.KubernetesClient - ldapClient *ldap.LdapClient - devopsClient *devops.DevopsClient - sonarQubeClient *sonarqube.SonarQubeClient - redisClient *redis.RedisClient - s3Client *s2is3.S3Client - prometheusClient *prometheus.PrometheusClient - openpitrixClient *openpitrix.OpenPitrixClient - kubesphereClient *kubesphere.KubeSphereClient + k8sClient *k8s.KubernetesClient + ldapClient *ldap.LdapClient + devopsClient *devops.DevopsClient + sonarQubeClient *sonarqube.SonarQubeClient + redisClient *redis.RedisClient + s3Client *s2is3.S3Client + prometheusClient *prometheus.PrometheusClient + openpitrixClient *openpitrix.OpenPitrixClient + kubesphereClient *kubesphere.KubeSphereClient + elasticSearchClient *esclient.ElasticSearchClient } var mutex sync.Mutex @@ -335,3 +344,27 @@ func (cs *ClientSet) Prometheus() (*prometheus.PrometheusClient, error) { func (cs *ClientSet) KubeSphere() *kubesphere.KubeSphereClient { return cs.kubesphereClient } + +func (cs *ClientSet) ElasticSearch() (*esclient.ElasticSearchClient, error) { + var err error + + if cs.csoptions.elasticSearhOptions == nil || cs.csoptions.elasticSearhOptions.Host == "" { + return nil, ClientSetNotEnabledError{} + } + + if cs.elasticSearchClient != nil { + return cs.elasticSearchClient, nil + } else { + mutex.Lock() + defer mutex.Unlock() + + if cs.elasticSearchClient == nil { + cs.elasticSearchClient, err = esclient.NewLoggingClient(cs.csoptions.elasticSearhOptions) + if err != nil { + return nil, err + } + } + + return cs.elasticSearchClient, nil + } +} diff --git a/pkg/simple/client/notification/options.go b/pkg/simple/client/notification/options.go new file mode 100644 index 000000000..5c8a6efa0 --- /dev/null +++ b/pkg/simple/client/notification/options.go @@ -0,0 +1,17 @@ +package notification + +type NotificationOptions struct { + Endpoint string +} + +func NewNotificationOptions() *NotificationOptions { + return &NotificationOptions{ + Endpoint: "", + } +} + +func (s *NotificationOptions) ApplyTo(options *NotificationOptions) { + if s.Endpoint != "" { + options.Endpoint = s.Endpoint + } +}