Support configuring the maximum number of logs that can be exported (#5794)
* Support configuring the maximum number of logs that can be exported * Update pkg/simple/client/logging/options.go --------- Signed-off-by: wanjunlei <wanjunlei@kubesphere.io> Co-authored-by: wanjunlei <53003665+wanjunlei@users.noreply.github.com> Co-authored-by: Benjamin Huo <huobj@qq.com>
This commit is contained in:
@@ -26,6 +26,7 @@ import (
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
jsoniter "github.com/json-iterator/go"
|
||||
"github.com/opensearch-project/opensearch-go"
|
||||
"github.com/opensearch-project/opensearch-go/opensearchapi"
|
||||
|
||||
@@ -86,9 +87,16 @@ func (o *OpenSearch) Search(indices string, body []byte, scroll bool) ([]byte, e
|
||||
}
|
||||
|
||||
func (o *OpenSearch) Scroll(id string) ([]byte, error) {
|
||||
body, err := jsoniter.Marshal(map[string]string{
|
||||
"scroll_id": id,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
response, err := o.client.Scroll(
|
||||
o.client.Scroll.WithContext(context.Background()),
|
||||
o.client.Scroll.WithScrollID(id),
|
||||
o.client.Scroll.WithBody(bytes.NewBuffer(body)),
|
||||
o.client.Scroll.WithScroll(time.Minute))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -26,6 +26,7 @@ import (
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
jsoniter "github.com/json-iterator/go"
|
||||
"github.com/opensearch-project/opensearch-go/v2"
|
||||
"github.com/opensearch-project/opensearch-go/v2/opensearchapi"
|
||||
|
||||
@@ -86,9 +87,16 @@ func (o *OpenSearch) Search(indices string, body []byte, scroll bool) ([]byte, e
|
||||
}
|
||||
|
||||
func (o *OpenSearch) Scroll(id string) ([]byte, error) {
|
||||
body, err := jsoniter.Marshal(map[string]string{
|
||||
"scroll_id": id,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
response, err := o.client.Scroll(
|
||||
o.client.Scroll.WithContext(context.Background()),
|
||||
o.client.Scroll.WithScrollID(id),
|
||||
o.client.Scroll.WithBody(bytes.NewBuffer(body)),
|
||||
o.client.Scroll.WithScroll(time.Minute))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -28,6 +28,7 @@ import (
|
||||
|
||||
"github.com/elastic/go-elasticsearch/v5"
|
||||
"github.com/elastic/go-elasticsearch/v5/esapi"
|
||||
jsoniter "github.com/json-iterator/go"
|
||||
|
||||
"kubesphere.io/kubesphere/pkg/simple/client/es/versions"
|
||||
)
|
||||
@@ -85,9 +86,16 @@ func (e *Elastic) Search(indices string, body []byte, scroll bool) ([]byte, erro
|
||||
}
|
||||
|
||||
func (e *Elastic) Scroll(id string) ([]byte, error) {
|
||||
body, err := jsoniter.Marshal(map[string]string{
|
||||
"scroll_id": id,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
response, err := e.client.Scroll(
|
||||
e.client.Scroll.WithContext(context.Background()),
|
||||
e.client.Scroll.WithScrollID(id),
|
||||
e.client.Scroll.WithBody(bytes.NewBuffer(body)),
|
||||
e.client.Scroll.WithScroll(time.Minute))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -28,6 +28,7 @@ import (
|
||||
|
||||
"github.com/elastic/go-elasticsearch/v6"
|
||||
"github.com/elastic/go-elasticsearch/v6/esapi"
|
||||
jsoniter "github.com/json-iterator/go"
|
||||
|
||||
"kubesphere.io/kubesphere/pkg/simple/client/es/versions"
|
||||
)
|
||||
@@ -85,9 +86,16 @@ func (e *Elastic) Search(indices string, body []byte, scroll bool) ([]byte, erro
|
||||
}
|
||||
|
||||
func (e *Elastic) Scroll(id string) ([]byte, error) {
|
||||
body, err := jsoniter.Marshal(map[string]string{
|
||||
"scroll_id": id,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
response, err := e.Client.Scroll(
|
||||
e.Client.Scroll.WithContext(context.Background()),
|
||||
e.Client.Scroll.WithScrollID(id),
|
||||
e.Client.Scroll.WithBody(bytes.NewBuffer(body)),
|
||||
e.Client.Scroll.WithScroll(time.Minute))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -28,6 +28,7 @@ import (
|
||||
|
||||
"github.com/elastic/go-elasticsearch/v7"
|
||||
"github.com/elastic/go-elasticsearch/v7/esapi"
|
||||
jsoniter "github.com/json-iterator/go"
|
||||
|
||||
"kubesphere.io/kubesphere/pkg/simple/client/es/versions"
|
||||
)
|
||||
@@ -86,9 +87,16 @@ func (e *Elastic) Search(indices string, body []byte, scroll bool) ([]byte, erro
|
||||
}
|
||||
|
||||
func (e *Elastic) Scroll(id string) ([]byte, error) {
|
||||
body, err := jsoniter.Marshal(map[string]string{
|
||||
"scroll_id": id,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
response, err := e.client.Scroll(
|
||||
e.client.Scroll.WithContext(context.Background()),
|
||||
e.client.Scroll.WithScrollID(id),
|
||||
e.client.Scroll.WithBody(bytes.NewBuffer(body)),
|
||||
e.client.Scroll.WithScroll(time.Minute))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -51,12 +51,15 @@ type Kubernetes struct {
|
||||
|
||||
// Elasticsearch implement logging interface
|
||||
type client struct {
|
||||
c *es.Client
|
||||
c *es.Client
|
||||
ExportLogsLimit int
|
||||
}
|
||||
|
||||
func NewClient(options *logging.Options) (logging.Client, error) {
|
||||
|
||||
c := &client{}
|
||||
c := &client{
|
||||
ExportLogsLimit: options.ExportLogsLimit,
|
||||
}
|
||||
|
||||
var err error
|
||||
c.c, err = es.NewClient(options.Host, options.BasicAuth, options.Username, options.Password, options.IndexPrefix, options.Version)
|
||||
@@ -162,14 +165,8 @@ func (c *client) ExportLogs(sf logging.SearchFilter, w io.Writer) error {
|
||||
data = append(data, c.getSource(hit.Source).Log)
|
||||
}
|
||||
|
||||
// limit to retrieve max 100k records
|
||||
for i := 0; i < 100; i++ {
|
||||
if i != 0 {
|
||||
data, id, err = c.scroll(id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
size := 0
|
||||
for {
|
||||
if len(data) == 0 {
|
||||
return nil
|
||||
}
|
||||
@@ -182,8 +179,17 @@ func (c *client) ExportLogs(sf logging.SearchFilter, w io.Writer) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
size = size + 1000
|
||||
if size >= c.ExportLogsLimit {
|
||||
return nil
|
||||
}
|
||||
|
||||
data, id, err = c.scroll(id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *client) scroll(id string) ([]string, string, error) {
|
||||
|
||||
@@ -22,20 +22,26 @@ import (
|
||||
"kubesphere.io/kubesphere/pkg/utils/reflectutils"
|
||||
)
|
||||
|
||||
const (
|
||||
exportLogsLimitDefault = 100000
|
||||
)
|
||||
|
||||
type Options struct {
|
||||
Host string `json:"host" yaml:"host"`
|
||||
BasicAuth bool `json:"basicAuth" yaml:"basicAuth"`
|
||||
Username string `json:"username" yaml:"username"`
|
||||
Password string `json:"password" yaml:"password"`
|
||||
IndexPrefix string `json:"indexPrefix,omitempty" yaml:"indexPrefix,omitempty"`
|
||||
Version string `json:"version" yaml:"version"`
|
||||
Host string `json:"host" yaml:"host"`
|
||||
BasicAuth bool `json:"basicAuth" yaml:"basicAuth"`
|
||||
Username string `json:"username" yaml:"username"`
|
||||
Password string `json:"password" yaml:"password"`
|
||||
IndexPrefix string `json:"indexPrefix,omitempty" yaml:"indexPrefix,omitempty"`
|
||||
Version string `json:"version" yaml:"version"`
|
||||
ExportLogsLimit int `json:"exportLogsLimit" yaml:"exportLogsLimit"`
|
||||
}
|
||||
|
||||
func NewLoggingOptions() *Options {
|
||||
return &Options{
|
||||
Host: "",
|
||||
IndexPrefix: "fluentbit",
|
||||
Version: "",
|
||||
Host: "",
|
||||
IndexPrefix: "fluentbit",
|
||||
Version: "",
|
||||
ExportLogsLimit: exportLogsLimitDefault,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -76,4 +82,7 @@ func (s *Options) AddFlags(fs *pflag.FlagSet, c *Options) {
|
||||
fs.StringVar(&s.Version, "logging-elasticsearch-version", c.Version, ""+
|
||||
"Elasticsearch major version, e.g. 5/6/7, if left blank, will detect automatically."+
|
||||
"Currently, minimum supported version is 5.x")
|
||||
|
||||
fs.IntVar(&s.ExportLogsLimit, "logging-export-logs-limit", c.ExportLogsLimit, ""+
|
||||
"Maximum lines of logs to export")
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user