Improve the s3 uploader for better performance.
Signed-off-by: dkkb <dabkb@aol.com>
This commit is contained in:
@@ -104,7 +104,7 @@ func (s *s2iBinaryUploader) UploadS2iBinary(namespace, name, md5 string, fileHea
|
||||
copy.Spec.FileName = fileHeader.Filename
|
||||
copy.Spec.DownloadURL = fmt.Sprintf(GetS2iBinaryURL, namespace, name, copy.Spec.FileName)
|
||||
|
||||
err = s.s3Client.Upload(fmt.Sprintf("%s-%s", namespace, name), copy.Spec.FileName, binFile)
|
||||
err = s.s3Client.Upload(fmt.Sprintf("%s-%s", namespace, name), copy.Spec.FileName, binFile, int(fileHeader.Size))
|
||||
if err != nil {
|
||||
if aerr, ok := err.(awserr.Error); ok {
|
||||
switch aerr.Code() {
|
||||
|
||||
@@ -116,7 +116,7 @@ func (c *applicationOperator) createApp(app *v1alpha1.HelmApplication, iconData
|
||||
if len(iconData) != 0 {
|
||||
// save icon attachment
|
||||
iconId := idutils.GetUuid(v1alpha1.HelmAttachmentPrefix)
|
||||
err = c.backingStoreClient.Upload(iconId, iconId, bytes.NewBuffer(iconData))
|
||||
err = c.backingStoreClient.Upload(iconId, iconId, bytes.NewBuffer(iconData), len(iconData))
|
||||
if err != nil {
|
||||
klog.Errorf("save icon attachment failed, error: %s", err)
|
||||
return nil, err
|
||||
@@ -499,7 +499,7 @@ func (c *applicationOperator) modifyAppAttachment(app *v1alpha1.HelmApplication,
|
||||
// add attachment to app
|
||||
add := idutils.GetUuid("att-")
|
||||
*attachments = append(*attachments, add)
|
||||
err = c.backingStoreClient.Upload(add, add, bytes.NewBuffer(request.AttachmentContent))
|
||||
err = c.backingStoreClient.Upload(add, add, bytes.NewBuffer(request.AttachmentContent), len(request.AttachmentContent))
|
||||
if err != nil {
|
||||
return "", err
|
||||
} else {
|
||||
@@ -518,7 +518,7 @@ func (c *applicationOperator) modifyAppAttachment(app *v1alpha1.HelmApplication,
|
||||
}
|
||||
if len(request.AttachmentContent) != 0 {
|
||||
add := idutils.GetUuid("att-")
|
||||
err = c.backingStoreClient.Upload(add, add, bytes.NewBuffer(request.AttachmentContent))
|
||||
err = c.backingStoreClient.Upload(add, add, bytes.NewBuffer(request.AttachmentContent), len(request.AttachmentContent))
|
||||
if err != nil {
|
||||
return "", err
|
||||
} else {
|
||||
|
||||
@@ -200,7 +200,7 @@ func (c *applicationOperator) ModifyAppVersion(id string, request *ModifyAppVers
|
||||
spec.Created = &now
|
||||
|
||||
// 3. save chart data to s3 storage, just overwrite the legacy data
|
||||
err = c.backingStoreClient.Upload(dataKeyInStorage(versionCopy.GetWorkspace(), versionCopy.Name), versionCopy.Name, bytes.NewReader(request.Package))
|
||||
err = c.backingStoreClient.Upload(dataKeyInStorage(versionCopy.GetWorkspace(), versionCopy.Name), versionCopy.Name, bytes.NewBuffer(request.Package), len(request.Package))
|
||||
if err != nil {
|
||||
klog.Errorf("upload chart for app version: %s/%s failed, error: %s", versionCopy.GetWorkspace(),
|
||||
versionCopy.GetTrueName(), err)
|
||||
@@ -488,7 +488,7 @@ func (c *applicationOperator) createApplicationVersion(ver *v1alpha1.HelmApplica
|
||||
klog.Errorf("decode error: %s", err)
|
||||
return nil, err
|
||||
} else {
|
||||
err = c.backingStoreClient.Upload(dataKeyInStorage(ver.GetWorkspace(), ver.Name), ver.Name, bytes.NewReader(ver.Spec.Data))
|
||||
err = c.backingStoreClient.Upload(dataKeyInStorage(ver.GetWorkspace(), ver.Name), ver.Name, bytes.NewBuffer(ver.Spec.Data), len(ver.Spec.Data))
|
||||
if err != nil {
|
||||
klog.Errorf("upload chart for app version: %s/%s failed, error: %s", ver.GetWorkspace(),
|
||||
ver.GetTrueName(), err)
|
||||
|
||||
@@ -15,7 +15,6 @@ package openpitrix
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
|
||||
"github.com/go-openapi/strfmt"
|
||||
"k8s.io/klog"
|
||||
|
||||
@@ -66,7 +65,7 @@ func (c *attachmentOperator) CreateAttachment(data []byte) (*Attachment, error)
|
||||
}
|
||||
id := idutils.GetUuid36(v1alpha1.HelmAttachmentPrefix)
|
||||
|
||||
err := c.backingStoreClient.Upload(id, id, bytes.NewBuffer(data))
|
||||
err := c.backingStoreClient.Upload(id, id, bytes.NewBuffer(data), len(data))
|
||||
if err != nil {
|
||||
klog.Errorf("upload attachment failed, err: %s", err)
|
||||
return nil, err
|
||||
|
||||
@@ -43,7 +43,7 @@ type Object struct {
|
||||
Body io.Reader
|
||||
}
|
||||
|
||||
func (s *FakeS3) Upload(key, fileName string, body io.Reader) error {
|
||||
func (s *FakeS3) Upload(key, fileName string, body io.Reader, size int) error {
|
||||
s.Storage[key] = &Object{
|
||||
Key: key,
|
||||
FileName: fileName,
|
||||
|
||||
@@ -25,7 +25,7 @@ func TestFakeS3(t *testing.T) {
|
||||
s3 := NewFakeS3()
|
||||
key := "hello"
|
||||
fileName := "world"
|
||||
err := s3.Upload(key, fileName, nil)
|
||||
err := s3.Upload(key, fileName, nil, 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -16,16 +16,14 @@ limitations under the License.
|
||||
|
||||
package s3
|
||||
|
||||
import (
|
||||
"io"
|
||||
)
|
||||
import "io"
|
||||
|
||||
type Interface interface {
|
||||
//read the content, caller should close the io.ReadCloser.
|
||||
Read(key string) ([]byte, error)
|
||||
|
||||
// Upload uploads a object to storage and returns object location if succeeded
|
||||
Upload(key, fileName string, body io.Reader) error
|
||||
Upload(key, fileName string, body io.Reader, size int) error
|
||||
|
||||
GetDownloadURL(key string, fileName string) (string, error)
|
||||
|
||||
|
||||
@@ -19,6 +19,7 @@ package s3
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
"time"
|
||||
|
||||
"code.cloudfoundry.org/bytefmt"
|
||||
@@ -36,10 +37,40 @@ type Client struct {
|
||||
bucket string
|
||||
}
|
||||
|
||||
func (s *Client) Upload(key, fileName string, body io.Reader) error {
|
||||
const (
|
||||
DefaultPartSize = 5 * bytefmt.MEGABYTE
|
||||
// MinConcurrency is the minimum concurrency when uploading a part to Amazon S3,
|
||||
// it's also the default value of Concurrency in aws-sdk-go.
|
||||
MinConcurrency = 5
|
||||
// MaxConcurrency is the maximum concurrency to limit the goroutines.
|
||||
MaxConcurrency = 128
|
||||
)
|
||||
|
||||
// calculateConcurrency calculates the concurrency for better performance,
|
||||
// make the concurrency in range [5, 128].
|
||||
func calculateConcurrency(size int) int {
|
||||
if size <= 0 {
|
||||
return MinConcurrency
|
||||
}
|
||||
c := int(math.Ceil(float64(size) / float64(DefaultPartSize)))
|
||||
if c < MinConcurrency {
|
||||
return MinConcurrency
|
||||
} else if c > MaxConcurrency {
|
||||
return MaxConcurrency
|
||||
}
|
||||
return c
|
||||
}
|
||||
|
||||
// Upload use Multipart upload to upload a single object as a set of parts.
|
||||
// If the data length is known to be large, it is recommended to pass in the data length,
|
||||
// it will helps to calculate concurrency. Otherwise, `size` can be 0,
|
||||
// use 5 as default upload concurrency, same as aws-sdk-go.
|
||||
// See https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpuoverview.html for more details.
|
||||
func (s *Client) Upload(key, fileName string, body io.Reader, size int) error {
|
||||
uploader := s3manager.NewUploader(s.s3Session, func(uploader *s3manager.Uploader) {
|
||||
uploader.PartSize = 5 * bytefmt.MEGABYTE
|
||||
uploader.PartSize = DefaultPartSize
|
||||
uploader.LeavePartsOnError = true
|
||||
uploader.Concurrency = calculateConcurrency(size)
|
||||
})
|
||||
_, err := uploader.Upload(&s3manager.UploadInput{
|
||||
Bucket: aws.String(s.bucket),
|
||||
|
||||
14
pkg/simple/client/s3/s3_test.go
Normal file
14
pkg/simple/client/s3/s3_test.go
Normal file
@@ -0,0 +1,14 @@
|
||||
package s3
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"gotest.tools/assert"
|
||||
)
|
||||
|
||||
func TestCalculateConcurrency(t *testing.T) {
|
||||
assert.Equal(t, 5, calculateConcurrency(1*1024*1024))
|
||||
assert.Equal(t, 5, calculateConcurrency(5*1024*1024))
|
||||
assert.Equal(t, 20, calculateConcurrency(99*1024*1024))
|
||||
assert.Equal(t, 128, calculateConcurrency(129*5*1024*1024))
|
||||
}
|
||||
Reference in New Issue
Block a user