Files
kubesphere/pkg/models/devops/s2ibinary_handler.go
2021-03-30 13:44:24 +08:00

206 lines
6.1 KiB
Go

/*
Copyright 2020 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package devops
import (
"context"
"fmt"
"mime/multipart"
"net/http"
"reflect"
"code.cloudfoundry.org/bytefmt"
"github.com/aws/aws-sdk-go/aws/awserr"
awsS3 "github.com/aws/aws-sdk-go/service/s3"
"github.com/emicklei/go-restful"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/util/retry"
"k8s.io/klog"
"kubesphere.io/kubesphere/pkg/apis/devops/v1alpha1"
"kubesphere.io/kubesphere/pkg/client/clientset/versioned"
"kubesphere.io/kubesphere/pkg/client/informers/externalversions"
"kubesphere.io/kubesphere/pkg/simple/client/s3"
)
const (
GetS2iBinaryURL = "http://ks-apiserver.kubesphere-system.svc/kapis/devops.kubesphere.io/v1alpha2/namespaces/%s/s2ibinaries/%s/file/%s"
)
type S2iBinaryUploader interface {
UploadS2iBinary(namespace, name, md5 string, header *multipart.FileHeader) (*v1alpha1.S2iBinary, error)
DownloadS2iBinary(namespace, name, fileName string) (string, error)
}
type s2iBinaryUploader struct {
client versioned.Interface
informers externalversions.SharedInformerFactory
s3Client s3.Interface
}
func NewS2iBinaryUploader(client versioned.Interface, informers externalversions.SharedInformerFactory, s3Client s3.Interface) S2iBinaryUploader {
return &s2iBinaryUploader{
client: client,
informers: informers,
s3Client: s3Client,
}
}
func (s *s2iBinaryUploader) UploadS2iBinary(namespace, name, md5 string, fileHeader *multipart.FileHeader) (*v1alpha1.S2iBinary, error) {
binFile, err := fileHeader.Open()
if err != nil {
klog.Errorf("%+v", err)
return nil, err
}
defer binFile.Close()
origin, err := s.informers.Devops().V1alpha1().S2iBinaries().Lister().S2iBinaries(namespace).Get(name)
if err != nil {
klog.Errorf("%+v", err)
return nil, err
}
//Check file is uploading
if origin.Status.Phase == v1alpha1.StatusUploading {
err := restful.NewError(http.StatusConflict, "file is uploading, please try later")
klog.Error(err)
return nil, err
}
copy := origin.DeepCopy()
copy.Spec.MD5 = md5
copy.Spec.Size = bytefmt.ByteSize(uint64(fileHeader.Size))
copy.Spec.FileName = fileHeader.Filename
copy.Spec.DownloadURL = fmt.Sprintf(GetS2iBinaryURL, namespace, name, copy.Spec.FileName)
if origin.Status.Phase == v1alpha1.StatusReady && reflect.DeepEqual(origin, copy) {
return origin, nil
}
//Set status Uploading to lock resource
uploading, err := s.SetS2iBinaryStatus(copy, v1alpha1.StatusUploading)
if err != nil {
err := restful.NewError(http.StatusConflict, fmt.Sprintf("could not set status: %+v", err))
klog.Error(err)
return nil, err
}
copy = uploading.DeepCopy()
copy.Spec.MD5 = md5
copy.Spec.Size = bytefmt.ByteSize(uint64(fileHeader.Size))
copy.Spec.FileName = fileHeader.Filename
copy.Spec.DownloadURL = fmt.Sprintf(GetS2iBinaryURL, namespace, name, copy.Spec.FileName)
err = s.s3Client.Upload(fmt.Sprintf("%s-%s", namespace, name), copy.Spec.FileName, binFile)
if err != nil {
if aerr, ok := err.(awserr.Error); ok {
switch aerr.Code() {
case awsS3.ErrCodeNoSuchBucket:
klog.Error(err)
_, serr := s.SetS2iBinaryStatusWithRetry(copy, origin.Status.Phase)
if serr != nil {
klog.Error(serr)
}
return nil, err
default:
klog.Error(err)
_, serr := s.SetS2iBinaryStatusWithRetry(copy, v1alpha1.StatusUploadFailed)
if serr != nil {
klog.Error(serr)
}
return nil, err
}
}
klog.Error(err)
return nil, err
}
if copy.Spec.UploadTimeStamp == nil {
copy.Spec.UploadTimeStamp = new(metav1.Time)
}
*copy.Spec.UploadTimeStamp = metav1.Now()
copy, err = s.client.DevopsV1alpha1().S2iBinaries(namespace).Update(context.Background(), copy, metav1.UpdateOptions{})
if err != nil {
klog.Error(err)
return nil, err
}
copy, err = s.SetS2iBinaryStatusWithRetry(copy, v1alpha1.StatusReady)
if err != nil {
klog.Error(err)
return nil, err
}
return copy, nil
}
func (s *s2iBinaryUploader) DownloadS2iBinary(namespace, name, fileName string) (string, error) {
origin, err := s.informers.Devops().V1alpha1().S2iBinaries().Lister().S2iBinaries(namespace).Get(name)
if err != nil {
klog.Errorf("%+v", err)
return "", err
}
if origin.Spec.FileName != fileName {
err := fmt.Errorf("could not fould file %s", fileName)
klog.Error(err)
return "", err
}
if origin.Status.Phase != v1alpha1.StatusReady {
err := restful.NewError(http.StatusBadRequest, "file is not ready, please try later")
klog.Error(err)
return "", err
}
return s.s3Client.GetDownloadURL(fmt.Sprintf("%s-%s", namespace, name), fileName)
}
func (s *s2iBinaryUploader) SetS2iBinaryStatus(s2ibin *v1alpha1.S2iBinary, status string) (*v1alpha1.S2iBinary, error) {
copy := s2ibin.DeepCopy()
copy.Status.Phase = status
copy, err := s.client.DevopsV1alpha1().S2iBinaries(s2ibin.Namespace).Update(context.Background(), copy, metav1.UpdateOptions{})
if err != nil {
klog.Error(err)
return nil, err
}
return copy, nil
}
func (s *s2iBinaryUploader) SetS2iBinaryStatusWithRetry(s2ibin *v1alpha1.S2iBinary, status string) (*v1alpha1.S2iBinary, error) {
var bin *v1alpha1.S2iBinary
var err error
err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
bin, err = s.client.DevopsV1alpha1().S2iBinaries(s2ibin.Namespace).Get(context.Background(), s2ibin.Name, metav1.GetOptions{})
if err != nil {
klog.Error(err)
return err
}
bin.Status.Phase = status
bin, err = s.client.DevopsV1alpha1().S2iBinaries(s2ibin.Namespace).Update(context.Background(), bin, metav1.UpdateOptions{})
if err != nil {
klog.Error(err)
return err
}
return nil
})
if err != nil {
klog.Error(err)
return nil, err
}
return bin, nil
}