diff --git a/pkg/simple/client/es/versions/opensearchv1/opensearchv1.go b/pkg/simple/client/es/versions/opensearchv1/opensearchv1.go index 3537284ff..3df28f3fe 100644 --- a/pkg/simple/client/es/versions/opensearchv1/opensearchv1.go +++ b/pkg/simple/client/es/versions/opensearchv1/opensearchv1.go @@ -26,6 +26,7 @@ import ( "net/http" "time" + jsoniter "github.com/json-iterator/go" "github.com/opensearch-project/opensearch-go" "github.com/opensearch-project/opensearch-go/opensearchapi" @@ -86,9 +87,16 @@ func (o *OpenSearch) Search(indices string, body []byte, scroll bool) ([]byte, e } func (o *OpenSearch) Scroll(id string) ([]byte, error) { + body, err := jsoniter.Marshal(map[string]string{ + "scroll_id": id, + }) + if err != nil { + return nil, err + } + response, err := o.client.Scroll( o.client.Scroll.WithContext(context.Background()), - o.client.Scroll.WithScrollID(id), + o.client.Scroll.WithBody(bytes.NewBuffer(body)), o.client.Scroll.WithScroll(time.Minute)) if err != nil { return nil, err diff --git a/pkg/simple/client/es/versions/opensearchv2/opensearchv2.go b/pkg/simple/client/es/versions/opensearchv2/opensearchv2.go index eceb34118..0912ba252 100644 --- a/pkg/simple/client/es/versions/opensearchv2/opensearchv2.go +++ b/pkg/simple/client/es/versions/opensearchv2/opensearchv2.go @@ -26,6 +26,7 @@ import ( "net/http" "time" + jsoniter "github.com/json-iterator/go" "github.com/opensearch-project/opensearch-go/v2" "github.com/opensearch-project/opensearch-go/v2/opensearchapi" @@ -86,9 +87,16 @@ func (o *OpenSearch) Search(indices string, body []byte, scroll bool) ([]byte, e } func (o *OpenSearch) Scroll(id string) ([]byte, error) { + body, err := jsoniter.Marshal(map[string]string{ + "scroll_id": id, + }) + if err != nil { + return nil, err + } + response, err := o.client.Scroll( o.client.Scroll.WithContext(context.Background()), - o.client.Scroll.WithScrollID(id), + o.client.Scroll.WithBody(bytes.NewBuffer(body)), o.client.Scroll.WithScroll(time.Minute)) if err != nil { return nil, err diff --git a/pkg/simple/client/es/versions/v5/v5.go b/pkg/simple/client/es/versions/v5/v5.go index 3880f9e59..2cee38b23 100644 --- a/pkg/simple/client/es/versions/v5/v5.go +++ b/pkg/simple/client/es/versions/v5/v5.go @@ -28,6 +28,7 @@ import ( "github.com/elastic/go-elasticsearch/v5" "github.com/elastic/go-elasticsearch/v5/esapi" + jsoniter "github.com/json-iterator/go" "kubesphere.io/kubesphere/pkg/simple/client/es/versions" ) @@ -85,9 +86,16 @@ func (e *Elastic) Search(indices string, body []byte, scroll bool) ([]byte, erro } func (e *Elastic) Scroll(id string) ([]byte, error) { + body, err := jsoniter.Marshal(map[string]string{ + "scroll_id": id, + }) + if err != nil { + return nil, err + } + response, err := e.client.Scroll( e.client.Scroll.WithContext(context.Background()), - e.client.Scroll.WithScrollID(id), + e.client.Scroll.WithBody(bytes.NewBuffer(body)), e.client.Scroll.WithScroll(time.Minute)) if err != nil { return nil, err diff --git a/pkg/simple/client/es/versions/v6/v6.go b/pkg/simple/client/es/versions/v6/v6.go index 84d818f14..45c054d75 100644 --- a/pkg/simple/client/es/versions/v6/v6.go +++ b/pkg/simple/client/es/versions/v6/v6.go @@ -28,6 +28,7 @@ import ( "github.com/elastic/go-elasticsearch/v6" "github.com/elastic/go-elasticsearch/v6/esapi" + jsoniter "github.com/json-iterator/go" "kubesphere.io/kubesphere/pkg/simple/client/es/versions" ) @@ -85,9 +86,16 @@ func (e *Elastic) Search(indices string, body []byte, scroll bool) ([]byte, erro } func (e *Elastic) Scroll(id string) ([]byte, error) { + body, err := jsoniter.Marshal(map[string]string{ + "scroll_id": id, + }) + if err != nil { + return nil, err + } + response, err := e.Client.Scroll( e.Client.Scroll.WithContext(context.Background()), - e.Client.Scroll.WithScrollID(id), + e.Client.Scroll.WithBody(bytes.NewBuffer(body)), e.Client.Scroll.WithScroll(time.Minute)) if err != nil { return nil, err diff --git a/pkg/simple/client/es/versions/v7/v7.go b/pkg/simple/client/es/versions/v7/v7.go index bcf50c488..6b4166e35 100644 --- a/pkg/simple/client/es/versions/v7/v7.go +++ b/pkg/simple/client/es/versions/v7/v7.go @@ -28,6 +28,7 @@ import ( "github.com/elastic/go-elasticsearch/v7" "github.com/elastic/go-elasticsearch/v7/esapi" + jsoniter "github.com/json-iterator/go" "kubesphere.io/kubesphere/pkg/simple/client/es/versions" ) @@ -86,9 +87,16 @@ func (e *Elastic) Search(indices string, body []byte, scroll bool) ([]byte, erro } func (e *Elastic) Scroll(id string) ([]byte, error) { + body, err := jsoniter.Marshal(map[string]string{ + "scroll_id": id, + }) + if err != nil { + return nil, err + } + response, err := e.client.Scroll( e.client.Scroll.WithContext(context.Background()), - e.client.Scroll.WithScrollID(id), + e.client.Scroll.WithBody(bytes.NewBuffer(body)), e.client.Scroll.WithScroll(time.Minute)) if err != nil { return nil, err diff --git a/pkg/simple/client/logging/elasticsearch/elasticsearch.go b/pkg/simple/client/logging/elasticsearch/elasticsearch.go index b49cfe9a0..fcb46948b 100644 --- a/pkg/simple/client/logging/elasticsearch/elasticsearch.go +++ b/pkg/simple/client/logging/elasticsearch/elasticsearch.go @@ -51,12 +51,15 @@ type Kubernetes struct { // Elasticsearch implement logging interface type client struct { - c *es.Client + c *es.Client + ExportLogsLimit int } func NewClient(options *logging.Options) (logging.Client, error) { - c := &client{} + c := &client{ + ExportLogsLimit: options.ExportLogsLimit, + } var err error c.c, err = es.NewClient(options.Host, options.BasicAuth, options.Username, options.Password, options.IndexPrefix, options.Version) @@ -162,14 +165,8 @@ func (c *client) ExportLogs(sf logging.SearchFilter, w io.Writer) error { data = append(data, c.getSource(hit.Source).Log) } - // limit to retrieve max 100k records - for i := 0; i < 100; i++ { - if i != 0 { - data, id, err = c.scroll(id) - if err != nil { - return err - } - } + size := 0 + for { if len(data) == 0 { return nil } @@ -182,8 +179,17 @@ func (c *client) ExportLogs(sf logging.SearchFilter, w io.Writer) error { if err != nil { return err } + + size = size + 1000 + if size >= c.ExportLogsLimit { + return nil + } + + data, id, err = c.scroll(id) + if err != nil { + return err + } } - return nil } func (c *client) scroll(id string) ([]string, string, error) { diff --git a/pkg/simple/client/logging/options.go b/pkg/simple/client/logging/options.go index 7cd5bb605..4df07bb90 100644 --- a/pkg/simple/client/logging/options.go +++ b/pkg/simple/client/logging/options.go @@ -22,20 +22,26 @@ import ( "kubesphere.io/kubesphere/pkg/utils/reflectutils" ) +const ( + exportLogsLimitDefault = 100000 +) + type Options struct { - Host string `json:"host" yaml:"host"` - BasicAuth bool `json:"basicAuth" yaml:"basicAuth"` - Username string `json:"username" yaml:"username"` - Password string `json:"password" yaml:"password"` - IndexPrefix string `json:"indexPrefix,omitempty" yaml:"indexPrefix,omitempty"` - Version string `json:"version" yaml:"version"` + Host string `json:"host" yaml:"host"` + BasicAuth bool `json:"basicAuth" yaml:"basicAuth"` + Username string `json:"username" yaml:"username"` + Password string `json:"password" yaml:"password"` + IndexPrefix string `json:"indexPrefix,omitempty" yaml:"indexPrefix,omitempty"` + Version string `json:"version" yaml:"version"` + ExportLogsLimit int `json:"exportLogsLimit" yaml:"exportLogsLimit"` } func NewLoggingOptions() *Options { return &Options{ - Host: "", - IndexPrefix: "fluentbit", - Version: "", + Host: "", + IndexPrefix: "fluentbit", + Version: "", + ExportLogsLimit: exportLogsLimitDefault, } } @@ -76,4 +82,7 @@ func (s *Options) AddFlags(fs *pflag.FlagSet, c *Options) { fs.StringVar(&s.Version, "logging-elasticsearch-version", c.Version, ""+ "Elasticsearch major version, e.g. 5/6/7, if left blank, will detect automatically."+ "Currently, minimum supported version is 5.x") + + fs.IntVar(&s.ExportLogsLimit, "logging-export-logs-limit", c.ExportLogsLimit, ""+ + "Maximum lines of logs to export") }