reset to master

Signed-off-by: runzexia <runzexia@yunify.com>
This commit is contained in:
runzexia
2019-08-27 14:54:01 +08:00
parent c6440ec6e5
commit f00917b025
506 changed files with 79327 additions and 52528 deletions

View File

@@ -0,0 +1,30 @@
/*
Copyright 2019 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package install
import (
k8sruntime "k8s.io/apimachinery/pkg/runtime"
urlruntime "k8s.io/apimachinery/pkg/util/runtime"
devopsv1alpha1 "kubesphere.io/kubesphere/pkg/apis/devops/v1alpha1"
)
func Install(scheme *k8sruntime.Scheme) {
urlruntime.Must(devopsv1alpha1.AddToScheme(scheme))
urlruntime.Must(scheme.SetVersionPriority(devopsv1alpha1.SchemeGroupVersion))
}

View File

@@ -20,6 +20,23 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
const (
ResourceKindS2iBinary = "S2iBinary"
ResourceSingularServicePolicy = "s2ibinary"
ResourcePluralServicePolicy = "s2ibinaries"
)
const (
StatusUploading = "Uploading"
StatusReady = "Ready"
StatusUnableToDownload = "UnableToDownload"
)
const (
S2iBinaryFinalizerName = "s2ibinary.finalizers.kubesphere.io"
S2iBinaryLabelKey = "s2ibinary-name.kubesphere.io"
)
// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized.

View File

@@ -22,6 +22,7 @@ import (
"github.com/emicklei/go-restful"
"github.com/emicklei/go-restful-openapi"
"k8s.io/apimachinery/pkg/runtime/schema"
v1alpha1devops "kubesphere.io/kubesphere/pkg/apis/devops/v1alpha1"
devopsapi "kubesphere.io/kubesphere/pkg/apiserver/devops"
"kubesphere.io/kubesphere/pkg/apiserver/runtime"
"kubesphere.io/kubesphere/pkg/constants"
@@ -593,6 +594,25 @@ The last one is encrypted info, such as the password of the username-password ty
Returns(http.StatusOK, RespOK, devops.Crumb{}).
Writes(devops.Crumb{}))
webservice.Route(webservice.PUT("/namespaces/{namespace}/s2ibinaries/{s2ibinary}/file").
To(devopsapi.UploadS2iBinary).
Consumes("multipart/form-data").
Produces(restful.MIME_JSON).
Doc("Upload S2iBinary file").
Param(webservice.PathParameter("namespace", "the name of namespaces")).
Param(webservice.PathParameter("s2ibinary", "the name of s2ibinary")).
Param(webservice.FormParameter("s2ibinary", "file to upload")).
Param(webservice.FormParameter("md5", "md5 of file")).
Returns(http.StatusOK, RespOK, v1alpha1devops.S2iBinary{}))
webservice.Route(webservice.GET("/namespaces/{namespace}/s2ibinaries/{s2ibinary}/file/{file}").
To(devopsapi.DownloadS2iBinary).
Produces(restful.MIME_OCTET).
Doc("Download S2iBinary file").
Param(webservice.PathParameter("namespace", "the name of namespaces")).
Param(webservice.PathParameter("s2ibinary", "the name of s2ibinary")).
Returns(http.StatusOK, RespOK, nil))
// TODO are not used in this version. will be added in 2.1.0
//// match /job/init-job/descriptorByName/org.jenkinsci.plugins.workflow.cps.CpsFlowDefinition/checkScriptCompile
//webservice.Route(webservice.POST("/devops/check/scriptcompile").

View File

@@ -0,0 +1,87 @@
package devops
import (
"code.cloudfoundry.org/bytefmt"
"fmt"
"github.com/emicklei/go-restful"
"github.com/golang/glog"
"kubesphere.io/kubesphere/pkg/errors"
"kubesphere.io/kubesphere/pkg/models/devops"
"kubesphere.io/kubesphere/pkg/utils/hashutil"
"net/http"
)
func UploadS2iBinary(req *restful.Request, resp *restful.Response) {
ns := req.PathParameter("namespace")
name := req.PathParameter("s2ibinary")
err := req.Request.ParseMultipartForm(bytefmt.MEGABYTE * 20)
if err != nil {
glog.Errorf("%+v", err)
errors.ParseSvcErr(restful.NewError(http.StatusBadRequest, err.Error()), resp)
return
}
if len(req.Request.MultipartForm.File) == 0 {
err := restful.NewError(http.StatusBadRequest, "could not get file from form")
glog.Errorf("%+v", err)
errors.ParseSvcErr(restful.NewError(http.StatusBadRequest, err.Error()), resp)
return
}
if len(req.Request.MultipartForm.File["s2ibinary"]) == 0 {
err := restful.NewError(http.StatusBadRequest, "could not get file from form")
glog.Errorf("%+v", err)
errors.ParseSvcErr(err, resp)
return
}
if len(req.Request.MultipartForm.File["s2ibinary"]) > 1 {
err := restful.NewError(http.StatusBadRequest, "s2ibinary should only have one file")
glog.Errorf("%+v", err)
errors.ParseSvcErr(err, resp)
return
}
defer req.Request.MultipartForm.RemoveAll()
file, err := req.Request.MultipartForm.File["s2ibinary"][0].Open()
if err != nil {
glog.Error(err)
errors.ParseSvcErr(err, resp)
return
}
filemd5, err := hashutil.GetMD5(file)
if err != nil {
glog.Error(err)
errors.ParseSvcErr(err, resp)
return
}
md5, ok := req.Request.MultipartForm.Value["md5"]
if ok && len(req.Request.MultipartForm.Value["md5"]) > 0 {
if md5[0] != filemd5 {
err := restful.NewError(http.StatusBadRequest, fmt.Sprintf("md5 not match, origin: %+v, calculate: %+v", md5[0], filemd5))
glog.Error(err)
errors.ParseSvcErr(err, resp)
return
}
}
s2ibin, err := devops.UploadS2iBinary(ns, name, filemd5, req.Request.MultipartForm.File["s2ibinary"][0])
if err != nil {
glog.Errorf("%+v", err)
errors.ParseSvcErr(err, resp)
return
}
resp.WriteAsJson(s2ibin)
}
func DownloadS2iBinary(req *restful.Request, resp *restful.Response) {
ns := req.PathParameter("namespace")
name := req.PathParameter("s2ibinary")
fileName := req.PathParameter("file")
url, err := devops.DownloadS2iBinary(ns, name, fileName)
if err != nil {
glog.Errorf("%+v", err)
errors.ParseSvcErr(err, resp)
return
}
http.Redirect(resp.ResponseWriter, req.Request, url, http.StatusFound)
return
}

View File

@@ -0,0 +1,11 @@
approvers:
- runzexia
- soulseen
reviewers:
- runzexia
- soulseen
labels:
- area/controller
- area/devops

View File

@@ -0,0 +1,256 @@
package s2ibinary
import (
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/s3"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/util/metrics"
"kubesphere.io/kubesphere/pkg/simple/client/s2is3"
"kubesphere.io/kubesphere/pkg/utils/sliceutil"
logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
"time"
devopsv1alpha1 "kubesphere.io/kubesphere/pkg/apis/devops/v1alpha1"
devopsclient "kubesphere.io/kubesphere/pkg/client/clientset/versioned"
devopsinformers "kubesphere.io/kubesphere/pkg/client/informers/externalversions/devops/v1alpha1"
devopslisters "kubesphere.io/kubesphere/pkg/client/listers/devops/v1alpha1"
)
var log = logf.Log.WithName("s2ibinary-controller")
type S2iBinaryController struct {
client clientset.Interface
devopsClient devopsclient.Interface
eventBroadcaster record.EventBroadcaster
eventRecorder record.EventRecorder
s2iBinaryLister devopslisters.S2iBinaryLister
s2iBinarySynced cache.InformerSynced
workqueue workqueue.RateLimitingInterface
workerLoopPeriod time.Duration
}
func NewController(devopsclientset devopsclient.Interface,
client clientset.Interface,
s2ibinInformer devopsinformers.S2iBinaryInformer) *S2iBinaryController {
broadcaster := record.NewBroadcaster()
broadcaster.StartLogging(func(format string, args ...interface{}) {
log.Info(fmt.Sprintf(format, args))
})
broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})
recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "s2ibinary-controller"})
if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("s2ibinary_controller", client.CoreV1().RESTClient().GetRateLimiter())
}
v := &S2iBinaryController{
client: client,
devopsClient: devopsclientset,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "s2ibinary"),
s2iBinaryLister: s2ibinInformer.Lister(),
s2iBinarySynced: s2ibinInformer.Informer().HasSynced,
workerLoopPeriod: time.Second,
}
v.eventBroadcaster = broadcaster
v.eventRecorder = recorder
s2ibinInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: v.enqueueFoo,
UpdateFunc: func(oldObj, newObj interface{}) {
old := oldObj.(*devopsv1alpha1.S2iBinary)
new := newObj.(*devopsv1alpha1.S2iBinary)
if old.ResourceVersion == new.ResourceVersion {
return
}
v.enqueueFoo(newObj)
},
DeleteFunc: v.enqueueFoo,
})
return v
}
// enqueueFoo takes a Foo resource and converts it into a namespace/name
// string which is then put onto the work workqueue. This method should *not* be
// passed resources of any type other than Foo.
func (c *S2iBinaryController) enqueueFoo(obj interface{}) {
var key string
var err error
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
utilruntime.HandleError(err)
return
}
c.workqueue.Add(key)
}
func (c *S2iBinaryController) processNextWorkItem() bool {
obj, shutdown := c.workqueue.Get()
if shutdown {
return false
}
// We wrap this block in a func so we can defer c.workqueue.Done.
err := func(obj interface{}) error {
// We call Done here so the workqueue knows we have finished
// processing this item. We also must remember to call Forget if we
// do not want this work item being re-queued. For example, we do
// not call Forget if a transient error occurs, instead the item is
// put back on the workqueue and attempted again after a back-off
// period.
defer c.workqueue.Done(obj)
var key string
var ok bool
// We expect strings to come off the workqueue. These are of the
// form namespace/name. We do this as the delayed nature of the
// workqueue means the items in the informer cache may actually be
// more up to date that when the item was initially put onto the
// workqueue.
if key, ok = obj.(string); !ok {
// As the item in the workqueue is actually invalid, we call
// Forget here else we'd go into a loop of attempting to
// process a work item that is invalid.
c.workqueue.Forget(obj)
utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
return nil
}
// Run the syncHandler, passing it the namespace/name string of the
// Foo resource to be synced.
if err := c.syncHandler(key); err != nil {
// Put the item back on the workqueue to handle any transient errors.
c.workqueue.AddRateLimited(key)
return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
}
// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
c.workqueue.Forget(obj)
klog.Infof("Successfully synced '%s'", key)
return nil
}(obj)
if err != nil {
log.Error(err, "could not reconcile s2ibinary")
utilruntime.HandleError(err)
return true
}
return true
}
func (c *S2iBinaryController) worker() {
for c.processNextWorkItem() {
}
}
func (c *S2iBinaryController) Start(stopCh <-chan struct{}) error {
return c.Run(1, stopCh)
}
func (c *S2iBinaryController) Run(workers int, stopCh <-chan struct{}) error {
defer utilruntime.HandleCrash()
defer c.workqueue.ShutDown()
log.Info("starting s2ibinary controller")
defer log.Info("shutting down s2ibinary controller")
if !cache.WaitForCacheSync(stopCh, c.s2iBinarySynced) {
return fmt.Errorf("failed to wait for caches to sync")
}
for i := 0; i < workers; i++ {
go wait.Until(c.worker, c.workerLoopPeriod, stopCh)
}
<-stopCh
return nil
}
// syncHandler compares the actual state with the desired, and attempts to
// converge the two. It then updates the Status block of the Foo resource
// with the current status of the resource.
func (c *S2iBinaryController) syncHandler(key string) error {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
log.Error(err, fmt.Sprintf("could not split s2ibin meta %s ", key))
return nil
}
s2ibin, err := c.s2iBinaryLister.S2iBinaries(namespace).Get(name)
if err != nil {
if errors.IsNotFound(err) {
log.Info(fmt.Sprintf("s2ibin '%s' in work queue no longer exists ", key))
return nil
}
log.Error(err, fmt.Sprintf("could not get s2ibin %s ", key))
return err
}
if s2ibin.ObjectMeta.DeletionTimestamp.IsZero() {
if !sliceutil.HasString(s2ibin.ObjectMeta.Finalizers, devopsv1alpha1.S2iBinaryFinalizerName) {
s2ibin.ObjectMeta.Finalizers = append(s2ibin.ObjectMeta.Finalizers, devopsv1alpha1.S2iBinaryFinalizerName)
_, err := c.devopsClient.DevopsV1alpha1().S2iBinaries(namespace).Update(s2ibin)
if err != nil {
log.Error(err, fmt.Sprintf("failed to update s2ibin %s ", key))
return err
}
}
} else {
if sliceutil.HasString(s2ibin.ObjectMeta.Finalizers, devopsv1alpha1.S2iBinaryFinalizerName) {
if err := c.DeleteBinaryInS3(s2ibin); err != nil {
log.Error(err, fmt.Sprintf("failed to delete resource %s in s3", key))
return err
}
s2ibin.ObjectMeta.Finalizers = sliceutil.RemoveString(s2ibin.ObjectMeta.Finalizers, func(item string) bool {
return item == devopsv1alpha1.S2iBinaryFinalizerName
})
_, err := c.devopsClient.DevopsV1alpha1().S2iBinaries(namespace).Update(s2ibin)
if err != nil {
log.Error(err, fmt.Sprintf("failed to update s2ibin %s ", key))
return err
}
}
}
return nil
}
func (c *S2iBinaryController) DeleteBinaryInS3(s2ibin *devopsv1alpha1.S2iBinary) error {
s3client := s2is3.Client()
input := &s3.DeleteObjectInput{
Bucket: s2is3.Bucket(),
Key: aws.String(fmt.Sprintf("%s-%s", s2ibin.Namespace, s2ibin.Name)),
}
_, err := s3client.DeleteObject(input)
if err != nil {
if aerr, ok := err.(awserr.Error); ok {
switch aerr.Code() {
case s3.ErrCodeNoSuchKey:
return nil
default:
log.Error(err, fmt.Sprintf("failed to delete s2ibin %s/%s in s3", s2ibin.Namespace, s2ibin.Name))
return err
}
} else {
log.Error(err, fmt.Sprintf("failed to delete s2ibin %s/%s in s3", s2ibin.Namespace, s2ibin.Name))
return err
}
}
return nil
}

View File

@@ -0,0 +1,11 @@
approvers:
- runzexia
- soulseen
reviewers:
- runzexia
- soulseen
labels:
- area/controller
- area/devops

View File

@@ -0,0 +1,269 @@
package s2irun
import (
"fmt"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/util/metrics"
"kubesphere.io/kubesphere/pkg/utils/sliceutil"
logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
"time"
s2iv1alpha1 "github.com/kubesphere/s2ioperator/pkg/apis/devops/v1alpha1"
s2iclient "github.com/kubesphere/s2ioperator/pkg/client/clientset/versioned"
s2iinformers "github.com/kubesphere/s2ioperator/pkg/client/informers/externalversions/devops/v1alpha1"
s2ilisters "github.com/kubesphere/s2ioperator/pkg/client/listers/devops/v1alpha1"
devopsv1alpha1 "kubesphere.io/kubesphere/pkg/apis/devops/v1alpha1"
devopsclient "kubesphere.io/kubesphere/pkg/client/clientset/versioned"
devopsinformers "kubesphere.io/kubesphere/pkg/client/informers/externalversions/devops/v1alpha1"
devopslisters "kubesphere.io/kubesphere/pkg/client/listers/devops/v1alpha1"
)
var log = logf.Log.WithName("s2irun-controller")
type S2iRunController struct {
client clientset.Interface
s2iClient s2iclient.Interface
devopsClient devopsclient.Interface
eventBroadcaster record.EventBroadcaster
eventRecorder record.EventRecorder
s2iRunLister s2ilisters.S2iRunLister
s2iRunSynced cache.InformerSynced
s2iBinaryLister devopslisters.S2iBinaryLister
s2iBinarySynced cache.InformerSynced
workqueue workqueue.RateLimitingInterface
workerLoopPeriod time.Duration
}
func NewController(devopsclientset devopsclient.Interface, s2iclientset s2iclient.Interface,
client clientset.Interface,
s2ibinInformer devopsinformers.S2iBinaryInformer, s2iRunInformer s2iinformers.S2iRunInformer) *S2iRunController {
broadcaster := record.NewBroadcaster()
broadcaster.StartLogging(func(format string, args ...interface{}) {
log.Info(fmt.Sprintf(format, args))
})
broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})
recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "s2irun-controller"})
if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("s2irun_controller", client.CoreV1().RESTClient().GetRateLimiter())
}
v := &S2iRunController{
client: client,
devopsClient: devopsclientset,
s2iClient: s2iclientset,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "s2irun"),
s2iBinaryLister: s2ibinInformer.Lister(),
s2iBinarySynced: s2ibinInformer.Informer().HasSynced,
s2iRunLister: s2iRunInformer.Lister(),
s2iRunSynced: s2iRunInformer.Informer().HasSynced,
workerLoopPeriod: time.Second,
}
v.eventBroadcaster = broadcaster
v.eventRecorder = recorder
s2iRunInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: v.enqueueFoo,
UpdateFunc: func(oldObj, newObj interface{}) {
old := oldObj.(*s2iv1alpha1.S2iRun)
new := newObj.(*s2iv1alpha1.S2iRun)
if old.ResourceVersion == new.ResourceVersion {
return
}
v.enqueueFoo(newObj)
},
DeleteFunc: v.enqueueFoo,
})
return v
}
// enqueueFoo takes a Foo resource and converts it into a namespace/name
// string which is then put onto the work workqueue. This method should *not* be
// passed resources of any type other than Foo.
func (c *S2iRunController) enqueueFoo(obj interface{}) {
var key string
var err error
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
utilruntime.HandleError(err)
return
}
c.workqueue.Add(key)
}
func (c *S2iRunController) processNextWorkItem() bool {
obj, shutdown := c.workqueue.Get()
if shutdown {
return false
}
// We wrap this block in a func so we can defer c.workqueue.Done.
err := func(obj interface{}) error {
// We call Done here so the workqueue knows we have finished
// processing this item. We also must remember to call Forget if we
// do not want this work item being re-queued. For example, we do
// not call Forget if a transient error occurs, instead the item is
// put back on the workqueue and attempted again after a back-off
// period.
defer c.workqueue.Done(obj)
var key string
var ok bool
// We expect strings to come off the workqueue. These are of the
// form namespace/name. We do this as the delayed nature of the
// workqueue means the items in the informer cache may actually be
// more up to date that when the item was initially put onto the
// workqueue.
if key, ok = obj.(string); !ok {
// As the item in the workqueue is actually invalid, we call
// Forget here else we'd go into a loop of attempting to
// process a work item that is invalid.
c.workqueue.Forget(obj)
utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
return nil
}
// Run the syncHandler, passing it the namespace/name string of the
// Foo resource to be synced.
if err := c.syncHandler(key); err != nil {
// Put the item back on the workqueue to handle any transient errors.
c.workqueue.AddRateLimited(key)
return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
}
// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
c.workqueue.Forget(obj)
klog.Infof("Successfully synced '%s'", key)
return nil
}(obj)
if err != nil {
log.Error(err, "could not reconcile s2irun")
utilruntime.HandleError(err)
return true
}
return true
}
func (c *S2iRunController) worker() {
for c.processNextWorkItem() {
}
}
func (c *S2iRunController) Start(stopCh <-chan struct{}) error {
return c.Run(1, stopCh)
}
func (c *S2iRunController) Run(workers int, stopCh <-chan struct{}) error {
defer utilruntime.HandleCrash()
defer c.workqueue.ShutDown()
log.Info("starting s2irun controller")
defer log.Info("shutting down s2irun controller")
if !cache.WaitForCacheSync(stopCh, c.s2iBinarySynced) {
return fmt.Errorf("failed to wait for caches to sync")
}
for i := 0; i < workers; i++ {
go wait.Until(c.worker, c.workerLoopPeriod, stopCh)
}
<-stopCh
return nil
}
// syncHandler compares the actual state with the desired, and attempts to
// converge the two. It then updates the Status block of the Foo resource
// with the current status of the resource.
func (c *S2iRunController) syncHandler(key string) error {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
log.Error(err, fmt.Sprintf("could not split s2irun meta %s ", key))
return nil
}
s2irun, err := c.s2iRunLister.S2iRuns(namespace).Get(name)
if err != nil {
if errors.IsNotFound(err) {
log.Info(fmt.Sprintf("s2irun '%s' in work queue no longer exists ", key))
return nil
}
log.Error(err, fmt.Sprintf("could not get s2irun %s ", key))
return err
}
if s2irun.Labels != nil {
_, ok := s2irun.Labels[devopsv1alpha1.S2iBinaryLabelKey]
if ok {
if s2irun.ObjectMeta.DeletionTimestamp.IsZero() {
if !sliceutil.HasString(s2irun.ObjectMeta.Finalizers, devopsv1alpha1.S2iBinaryFinalizerName) {
s2irun.ObjectMeta.Finalizers = append(s2irun.ObjectMeta.Finalizers, devopsv1alpha1.S2iBinaryFinalizerName)
_, err := c.s2iClient.DevopsV1alpha1().S2iRuns(namespace).Update(s2irun)
if err != nil {
log.Error(err, fmt.Sprintf("failed to update s2irun %s", key))
return err
}
}
} else {
if sliceutil.HasString(s2irun.ObjectMeta.Finalizers, devopsv1alpha1.S2iBinaryFinalizerName) {
if err := c.DeleteS2iBinary(s2irun); err != nil {
log.Error(err, fmt.Sprintf("failed to delete s2ibin %s in", key))
return err
}
s2irun.ObjectMeta.Finalizers = sliceutil.RemoveString(s2irun.ObjectMeta.Finalizers, func(item string) bool {
return item == devopsv1alpha1.S2iBinaryFinalizerName
})
_, err := c.s2iClient.DevopsV1alpha1().S2iRuns(namespace).Update(s2irun)
if err != nil {
log.Error(err, fmt.Sprintf("failed to update s2irun %s ", key))
return err
}
}
}
}
}
return nil
}
func (c *S2iRunController) DeleteS2iBinary(s2irun *s2iv1alpha1.S2iRun) error {
s2iBinName := s2irun.Labels[devopsv1alpha1.S2iBinaryLabelKey]
s2iBin, err := c.s2iBinaryLister.S2iBinaries(s2irun.Namespace).Get(s2iBinName)
if err != nil {
if errors.IsNotFound(err) {
log.Info(fmt.Sprintf("s2ibin '%s/%s' has been delted ", s2irun.Namespace, s2iBinName))
return nil
}
log.Error(err, fmt.Sprintf("failed to get s2ibin %s/%s ", s2irun.Namespace, s2iBinName))
return err
}
err = c.devopsClient.DevopsV1alpha1().S2iBinaries(s2iBin.Namespace).Delete(s2iBinName, nil)
if err != nil {
if errors.IsNotFound(err) {
log.Info(fmt.Sprintf("s2ibin '%s/%s' has been delted ", s2irun.Namespace, s2iBinName))
return nil
}
log.Error(err, fmt.Sprintf("failed to delete s2ibin %s/%s ", s2irun.Namespace, s2iBinName))
return err
}
return nil
}

View File

@@ -0,0 +1,198 @@
package devops
import (
"code.cloudfoundry.org/bytefmt"
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/emicklei/go-restful"
"github.com/golang/glog"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/util/retry"
"kubesphere.io/kubesphere/pkg/apis/devops/v1alpha1"
"kubesphere.io/kubesphere/pkg/informers"
"kubesphere.io/kubesphere/pkg/simple/client/k8s"
"kubesphere.io/kubesphere/pkg/simple/client/s2is3"
"mime/multipart"
"net/http"
"reflect"
"time"
)
const (
GetS2iBinaryURL = "http://ks-apiserver.kubesphere-system.svc/kapis/devops.kubesphere.io/v1alpha2/namespaces/%s/s2ibinaries/%s/file/%s"
)
func UploadS2iBinary(namespace, name, md5 string, fileHeader *multipart.FileHeader) (*v1alpha1.S2iBinary, error) {
binFile, err := fileHeader.Open()
if err != nil {
glog.Errorf("%+v", err)
return nil, err
}
defer binFile.Close()
origin, err := informers.KsSharedInformerFactory().Devops().V1alpha1().S2iBinaries().Lister().S2iBinaries(namespace).Get(name)
if err != nil {
glog.Errorf("%+v", err)
return nil, err
}
//Check file is uploading
if origin.Status.Phase == v1alpha1.StatusUploading {
err := restful.NewError(http.StatusConflict, "file is uploading, please try later")
glog.Error(err)
return nil, err
}
copy := origin.DeepCopy()
copy.Spec.MD5 = md5
copy.Spec.Size = bytefmt.ByteSize(uint64(fileHeader.Size))
copy.Spec.FileName = fileHeader.Filename
copy.Spec.DownloadURL = fmt.Sprintf(GetS2iBinaryURL, namespace, name, copy.Spec.FileName)
if origin.Status.Phase == v1alpha1.StatusReady && reflect.DeepEqual(origin, copy) {
return origin, nil
}
//Set status Uploading to lock resource
origin, err = SetS2iBinaryStatus(origin, v1alpha1.StatusUploading)
if err != nil {
err := restful.NewError(http.StatusConflict, fmt.Sprintf("could not set status: %+v", err))
glog.Error(err)
return nil, err
}
copy = origin.DeepCopy()
copy.Spec.MD5 = md5
copy.Spec.Size = bytefmt.ByteSize(uint64(fileHeader.Size))
copy.Spec.FileName = fileHeader.Filename
copy.Spec.DownloadURL = fmt.Sprintf(GetS2iBinaryURL, namespace, name, copy.Spec.FileName)
s3session := s2is3.Session()
if s3session == nil {
err := fmt.Errorf("could not connect to s2i s3")
glog.Error(err)
return nil, err
}
uploader := s3manager.NewUploader(s3session, func(uploader *s3manager.Uploader) {
uploader.PartSize = 5 * bytefmt.MEGABYTE
uploader.LeavePartsOnError = true
})
_, err = uploader.Upload(&s3manager.UploadInput{
Bucket: s2is3.Bucket(),
Key: aws.String(fmt.Sprintf("%s-%s", namespace, name)),
Body: binFile,
ContentMD5: aws.String(md5),
ContentDisposition: aws.String(fmt.Sprintf("attachment; filename=\"%s\"", copy.Spec.FileName)),
})
if err != nil {
if aerr, ok := err.(awserr.Error); ok {
switch aerr.Code() {
case s3.ErrCodeNoSuchBucket:
glog.Error(err)
_, serr := SetS2iBinaryStatusWithRetry(origin, origin.Status.Phase)
if serr != nil {
glog.Error(serr)
}
return nil, err
default:
glog.Error(err)
_, serr := SetS2iBinaryStatusWithRetry(origin, v1alpha1.StatusUnableToDownload)
if serr != nil {
glog.Error(serr)
}
return nil, err
}
}
glog.Error(err)
return nil, err
}
if copy.Spec.UploadTimeStamp == nil {
copy.Spec.UploadTimeStamp = new(metav1.Time)
}
*copy.Spec.UploadTimeStamp = metav1.Now()
resp, err := k8s.KsClient().DevopsV1alpha1().S2iBinaries(namespace).Update(copy)
if err != nil {
glog.Error(err)
return nil, err
}
resp, err = SetS2iBinaryStatusWithRetry(resp, v1alpha1.StatusReady)
if err != nil {
glog.Error(err)
return nil, err
}
return resp, nil
}
func DownloadS2iBinary(namespace, name, fileName string) (string, error) {
origin, err := informers.KsSharedInformerFactory().Devops().V1alpha1().S2iBinaries().Lister().S2iBinaries(namespace).Get(name)
if err != nil {
glog.Errorf("%+v", err)
return "", err
}
if origin.Spec.FileName != fileName {
err := fmt.Errorf("could not fould file %s", fileName)
glog.Error(err)
return "", err
}
if origin.Status.Phase != v1alpha1.StatusReady {
err := restful.NewError(http.StatusBadRequest, "file is not ready, please try later")
glog.Error(err)
return "", err
}
s3Client := s2is3.Client()
if s3Client == nil {
err := fmt.Errorf("could not get s3 client")
glog.Error(err)
return "", err
}
req, _ := s3Client.GetObjectRequest(&s3.GetObjectInput{
Bucket: s2is3.Bucket(),
Key: aws.String(fmt.Sprintf("%s-%s", namespace, name)),
ResponseContentDisposition: aws.String(fmt.Sprintf("attachment; filename=\"%s\"", origin.Spec.FileName)),
})
url, err := req.Presign(5 * time.Minute)
if err != nil {
glog.Error(err)
return "", err
}
return url, nil
}
func SetS2iBinaryStatus(s2ibin *v1alpha1.S2iBinary, status string) (*v1alpha1.S2iBinary, error) {
copy := s2ibin.DeepCopy()
copy.Status.Phase = status
copy, err := k8s.KsClient().DevopsV1alpha1().S2iBinaries(s2ibin.Namespace).Update(copy)
if err != nil {
glog.Error(err)
return nil, err
}
return copy, nil
}
func SetS2iBinaryStatusWithRetry(s2ibin *v1alpha1.S2iBinary, status string) (*v1alpha1.S2iBinary, error) {
var bin *v1alpha1.S2iBinary
var err error
err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
bin, err = informers.KsSharedInformerFactory().Devops().V1alpha1().S2iBinaries().Lister().S2iBinaries(s2ibin.Namespace).Get(s2ibin.Name)
if err != nil {
glog.Error(err)
return err
}
bin.Status.Phase = status
bin, err = k8s.KsClient().DevopsV1alpha1().S2iBinaries(s2ibin.Namespace).Update(bin)
if err != nil {
glog.Error(err)
return err
}
return nil
})
if err != nil {
glog.Error(err)
return nil, err
}
return bin, nil
}

View File

@@ -38,7 +38,7 @@ var (
)
func init() {
flag.StringVar(&kubeConfigFile, "kubeconfig", "", "path to kubeconfig file")
flag.StringVar(&kubeConfigFile, "kubeconfig-path", "", "path to kubeconfig file")
flag.StringVar(&MasterURL, "master-url", "", "kube-apiserver url, only needed when out of cluster")
}

View File

@@ -0,0 +1,69 @@
package s2is3
import (
"flag"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/golang/glog"
)
var (
s3Region string
s3Endpoint string
s3DisableSSL bool
s3ForcePathStyle bool
s3AccessKeyID string
s3SecretAccessKey string
s3SessionToken string
s3Bucket string
)
var s2iS3 *s3.S3
var s2iS3Session *session.Session
func init() {
flag.StringVar(&s3Region, "s2i-s3-region", "us-east-1", "region of s2i s3")
flag.StringVar(&s3Endpoint, "s2i-s3-endpoint", "http://ks-minio.kubesphere-system.svc", "endpoint of s2i s3")
flag.StringVar(&s3AccessKeyID, "s2i-s3-access-key-id", "AKIAIOSFODNN7EXAMPLE", "access key of s2i s3")
flag.StringVar(&s3SecretAccessKey, "s2i-s3-secret-access-key", "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", "secret access key of s2i s3")
flag.StringVar(&s3SessionToken, "s2i-s3-session-token", "", "session token of s2i s3")
flag.StringVar(&s3Bucket, "s2i-s3-bucket", "s2i-binaries", "bucket name of s2i s3")
flag.BoolVar(&s3DisableSSL, "s2i-s3-disable-SSL", true, "disable ssl")
flag.BoolVar(&s3ForcePathStyle, "s2i-s3-force-path-style", true, "force path style")
}
func Client() *s3.S3 {
if s2iS3 != nil {
return s2iS3
}
creds := credentials.NewStaticCredentials(
s3AccessKeyID, s3SecretAccessKey, s3SessionToken,
)
config := &aws.Config{
Region: aws.String(s3Region),
Endpoint: aws.String(s3Endpoint),
DisableSSL: aws.Bool(s3DisableSSL),
S3ForcePathStyle: aws.Bool(s3ForcePathStyle),
Credentials: creds,
}
sess, err := session.NewSession(config)
if err != nil {
glog.Errorf("failed to connect to s2i s3: %+v", err)
return nil
}
s2iS3Session = sess
s2iS3 = s3.New(sess)
return s2iS3
}
func Session() *session.Session {
if s2iS3Session != nil {
return s2iS3Session
}
Client()
return s2iS3Session
}
func Bucket() *string {
return aws.String(s3Bucket)
}

31
pkg/utils/hashutil/MD5.go Normal file
View File

@@ -0,0 +1,31 @@
package hashutil
import (
"code.cloudfoundry.org/bytefmt"
"encoding/hex"
"github.com/golang/glog"
"io"
"kubesphere.io/kubesphere/pkg/utils/readerutils"
)
func GetMD5(reader io.ReadCloser) (string, error) {
md5reader := readerutils.NewMD5Reader(reader)
data := make([]byte, bytefmt.KILOBYTE)
for {
_, err := md5reader.Read(data)
if err != nil {
if err == io.EOF {
break
}
glog.Error(err)
return "", err
}
}
err := reader.Close()
if err != nil {
return "", err
}
return hex.EncodeToString(md5reader.MD5()), nil
}

View File

@@ -0,0 +1,31 @@
package readerutils
import (
"crypto/md5"
"hash"
"io"
)
type MD5Reader struct {
md5 hash.Hash
body io.Reader
}
func (reader *MD5Reader) Read(b []byte) (int, error) {
n, err := reader.body.Read(b)
if err != nil {
return n, err
}
return reader.md5.Write(b[:n])
}
func (reader *MD5Reader) MD5() []byte {
return reader.md5.Sum(nil)
}
func NewMD5Reader(reader io.Reader) *MD5Reader {
return &MD5Reader{
md5: md5.New(),
body: reader,
}
}