diff --git a/pkg/models/devops/s2ibinary_handler.go b/pkg/models/devops/s2ibinary_handler.go index be60b7ddf..22c99d46d 100644 --- a/pkg/models/devops/s2ibinary_handler.go +++ b/pkg/models/devops/s2ibinary_handler.go @@ -104,7 +104,7 @@ func (s *s2iBinaryUploader) UploadS2iBinary(namespace, name, md5 string, fileHea copy.Spec.FileName = fileHeader.Filename copy.Spec.DownloadURL = fmt.Sprintf(GetS2iBinaryURL, namespace, name, copy.Spec.FileName) - err = s.s3Client.Upload(fmt.Sprintf("%s-%s", namespace, name), copy.Spec.FileName, binFile) + err = s.s3Client.Upload(fmt.Sprintf("%s-%s", namespace, name), copy.Spec.FileName, binFile, int(fileHeader.Size)) if err != nil { if aerr, ok := err.(awserr.Error); ok { switch aerr.Code() { diff --git a/pkg/models/openpitrix/applications.go b/pkg/models/openpitrix/applications.go index 3191f6990..269d948e4 100644 --- a/pkg/models/openpitrix/applications.go +++ b/pkg/models/openpitrix/applications.go @@ -116,7 +116,7 @@ func (c *applicationOperator) createApp(app *v1alpha1.HelmApplication, iconData if len(iconData) != 0 { // save icon attachment iconId := idutils.GetUuid(v1alpha1.HelmAttachmentPrefix) - err = c.backingStoreClient.Upload(iconId, iconId, bytes.NewBuffer(iconData)) + err = c.backingStoreClient.Upload(iconId, iconId, bytes.NewBuffer(iconData), len(iconData)) if err != nil { klog.Errorf("save icon attachment failed, error: %s", err) return nil, err @@ -499,7 +499,7 @@ func (c *applicationOperator) modifyAppAttachment(app *v1alpha1.HelmApplication, // add attachment to app add := idutils.GetUuid("att-") *attachments = append(*attachments, add) - err = c.backingStoreClient.Upload(add, add, bytes.NewBuffer(request.AttachmentContent)) + err = c.backingStoreClient.Upload(add, add, bytes.NewBuffer(request.AttachmentContent), len(request.AttachmentContent)) if err != nil { return "", err } else { @@ -518,7 +518,7 @@ func (c *applicationOperator) modifyAppAttachment(app *v1alpha1.HelmApplication, } if len(request.AttachmentContent) != 0 { add := idutils.GetUuid("att-") - err = c.backingStoreClient.Upload(add, add, bytes.NewBuffer(request.AttachmentContent)) + err = c.backingStoreClient.Upload(add, add, bytes.NewBuffer(request.AttachmentContent), len(request.AttachmentContent)) if err != nil { return "", err } else { diff --git a/pkg/models/openpitrix/applicationversions.go b/pkg/models/openpitrix/applicationversions.go index 39ed52bf2..69339fdef 100644 --- a/pkg/models/openpitrix/applicationversions.go +++ b/pkg/models/openpitrix/applicationversions.go @@ -200,7 +200,7 @@ func (c *applicationOperator) ModifyAppVersion(id string, request *ModifyAppVers spec.Created = &now // 3. save chart data to s3 storage, just overwrite the legacy data - err = c.backingStoreClient.Upload(dataKeyInStorage(versionCopy.GetWorkspace(), versionCopy.Name), versionCopy.Name, bytes.NewReader(request.Package)) + err = c.backingStoreClient.Upload(dataKeyInStorage(versionCopy.GetWorkspace(), versionCopy.Name), versionCopy.Name, bytes.NewBuffer(request.Package), len(request.Package)) if err != nil { klog.Errorf("upload chart for app version: %s/%s failed, error: %s", versionCopy.GetWorkspace(), versionCopy.GetTrueName(), err) @@ -488,7 +488,7 @@ func (c *applicationOperator) createApplicationVersion(ver *v1alpha1.HelmApplica klog.Errorf("decode error: %s", err) return nil, err } else { - err = c.backingStoreClient.Upload(dataKeyInStorage(ver.GetWorkspace(), ver.Name), ver.Name, bytes.NewReader(ver.Spec.Data)) + err = c.backingStoreClient.Upload(dataKeyInStorage(ver.GetWorkspace(), ver.Name), ver.Name, bytes.NewBuffer(ver.Spec.Data), len(ver.Spec.Data)) if err != nil { klog.Errorf("upload chart for app version: %s/%s failed, error: %s", ver.GetWorkspace(), ver.GetTrueName(), err) diff --git a/pkg/models/openpitrix/attachments.go b/pkg/models/openpitrix/attachments.go index 39627c32a..c9a0ef55d 100644 --- a/pkg/models/openpitrix/attachments.go +++ b/pkg/models/openpitrix/attachments.go @@ -15,7 +15,6 @@ package openpitrix import ( "bytes" - "github.com/go-openapi/strfmt" "k8s.io/klog" @@ -66,7 +65,7 @@ func (c *attachmentOperator) CreateAttachment(data []byte) (*Attachment, error) } id := idutils.GetUuid36(v1alpha1.HelmAttachmentPrefix) - err := c.backingStoreClient.Upload(id, id, bytes.NewBuffer(data)) + err := c.backingStoreClient.Upload(id, id, bytes.NewBuffer(data), len(data)) if err != nil { klog.Errorf("upload attachment failed, err: %s", err) return nil, err diff --git a/pkg/simple/client/s3/fake/fakes3.go b/pkg/simple/client/s3/fake/fakes3.go index 84f0b088b..36cd9583a 100644 --- a/pkg/simple/client/s3/fake/fakes3.go +++ b/pkg/simple/client/s3/fake/fakes3.go @@ -43,7 +43,7 @@ type Object struct { Body io.Reader } -func (s *FakeS3) Upload(key, fileName string, body io.Reader) error { +func (s *FakeS3) Upload(key, fileName string, body io.Reader, size int) error { s.Storage[key] = &Object{ Key: key, FileName: fileName, diff --git a/pkg/simple/client/s3/fake/fakes3_test.go b/pkg/simple/client/s3/fake/fakes3_test.go index 4fcc540ff..948f410af 100644 --- a/pkg/simple/client/s3/fake/fakes3_test.go +++ b/pkg/simple/client/s3/fake/fakes3_test.go @@ -25,7 +25,7 @@ func TestFakeS3(t *testing.T) { s3 := NewFakeS3() key := "hello" fileName := "world" - err := s3.Upload(key, fileName, nil) + err := s3.Upload(key, fileName, nil, 0) if err != nil { t.Fatal(err) } diff --git a/pkg/simple/client/s3/interface.go b/pkg/simple/client/s3/interface.go index 681c9dc54..54a6dffdf 100644 --- a/pkg/simple/client/s3/interface.go +++ b/pkg/simple/client/s3/interface.go @@ -16,16 +16,14 @@ limitations under the License. package s3 -import ( - "io" -) +import "io" type Interface interface { //read the content, caller should close the io.ReadCloser. Read(key string) ([]byte, error) // Upload uploads a object to storage and returns object location if succeeded - Upload(key, fileName string, body io.Reader) error + Upload(key, fileName string, body io.Reader, size int) error GetDownloadURL(key string, fileName string) (string, error) diff --git a/pkg/simple/client/s3/s3.go b/pkg/simple/client/s3/s3.go index 0d418acf8..75a9caa96 100644 --- a/pkg/simple/client/s3/s3.go +++ b/pkg/simple/client/s3/s3.go @@ -19,6 +19,7 @@ package s3 import ( "fmt" "io" + "math" "time" "code.cloudfoundry.org/bytefmt" @@ -36,10 +37,40 @@ type Client struct { bucket string } -func (s *Client) Upload(key, fileName string, body io.Reader) error { +const ( + DefaultPartSize = 5 * bytefmt.MEGABYTE + // MinConcurrency is the minimum concurrency when uploading a part to Amazon S3, + // it's also the default value of Concurrency in aws-sdk-go. + MinConcurrency = 5 + // MaxConcurrency is the maximum concurrency to limit the goroutines. + MaxConcurrency = 128 +) + +// calculateConcurrency calculates the concurrency for better performance, +// make the concurrency in range [5, 128]. +func calculateConcurrency(size int) int { + if size <= 0 { + return MinConcurrency + } + c := int(math.Ceil(float64(size) / float64(DefaultPartSize))) + if c < MinConcurrency { + return MinConcurrency + } else if c > MaxConcurrency { + return MaxConcurrency + } + return c +} + +// Upload use Multipart upload to upload a single object as a set of parts. +// If the data length is known to be large, it is recommended to pass in the data length, +// it will helps to calculate concurrency. Otherwise, `size` can be 0, +// use 5 as default upload concurrency, same as aws-sdk-go. +// See https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpuoverview.html for more details. +func (s *Client) Upload(key, fileName string, body io.Reader, size int) error { uploader := s3manager.NewUploader(s.s3Session, func(uploader *s3manager.Uploader) { - uploader.PartSize = 5 * bytefmt.MEGABYTE + uploader.PartSize = DefaultPartSize uploader.LeavePartsOnError = true + uploader.Concurrency = calculateConcurrency(size) }) _, err := uploader.Upload(&s3manager.UploadInput{ Bucket: aws.String(s.bucket), diff --git a/pkg/simple/client/s3/s3_test.go b/pkg/simple/client/s3/s3_test.go new file mode 100644 index 000000000..fb1385d9d --- /dev/null +++ b/pkg/simple/client/s3/s3_test.go @@ -0,0 +1,14 @@ +package s3 + +import ( + "testing" + + "gotest.tools/assert" +) + +func TestCalculateConcurrency(t *testing.T) { + assert.Equal(t, 5, calculateConcurrency(1*1024*1024)) + assert.Equal(t, 5, calculateConcurrency(5*1024*1024)) + assert.Equal(t, 20, calculateConcurrency(99*1024*1024)) + assert.Equal(t, 128, calculateConcurrency(129*5*1024*1024)) +}