Files
kubesphere/pkg/models/controllers/common.go

143 lines
3.1 KiB
Go

/*
Copyright 2018 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controllers
import (
"fmt"
"io/ioutil"
"net/http"
"strings"
"time"
"github.com/golang/glog"
"github.com/jinzhu/gorm"
"github.com/pkg/errors"
)
func listWithConditions(db *gorm.DB, total *int, object, list interface{}, conditions string, paging *Paging, order string) {
if len(conditions) == 0 {
db.Model(object).Count(total)
} else {
db.Model(object).Where(conditions).Count(total)
}
if paging != nil {
if len(conditions) > 0 {
db.Where(conditions).Order(order).Limit(paging.Limit).Offset(paging.Offset).Find(list)
} else {
db.Order(order).Limit(paging.Limit).Offset(paging.Offset).Find(list)
}
} else {
if len(conditions) > 0 {
db.Where(conditions).Order(order).Find(list)
} else {
db.Order(order).Find(list)
}
}
}
func countWithConditions(db *gorm.DB, conditions string, object interface{}) int {
var count int
if len(conditions) == 0 {
db.Model(object).Count(&count)
} else {
db.Model(object).Where(conditions).Count(&count)
}
return count
}
func makeHttpRequest(method, url, data string) ([]byte, error) {
var req *http.Request
var err error
if method == "GET" {
req, err = http.NewRequest(method, url, nil)
} else {
req, err = http.NewRequest(method, url, strings.NewReader(data))
}
if err != nil {
glog.Error(err)
return nil, err
}
httpClient := &http.Client{}
resp, err := httpClient.Do(req)
if err != nil {
err := fmt.Errorf("Request to %s failed, method: %s, reason: %s ", url, method, err)
glog.Error(err)
return nil, err
}
body, err := ioutil.ReadAll(resp.Body)
defer resp.Body.Close()
if resp.StatusCode >= http.StatusBadRequest {
err = errors.New(string(body))
}
return body, err
}
func handleCrash(ctl Controller) {
close(ctl.chanAlive())
if err := recover(); err != nil {
glog.Errorf("panic occur in %s controller's listAndWatch function, reason: %s", ctl.Name(), err)
return
}
}
func hasSynced(ctl Controller) bool {
totalInDb := ctl.CountWithConditions("")
totalInK8s := ctl.total()
if totalInDb == totalInK8s {
return true
}
return false
}
func checkAndResync(ctl Controller, stopChan chan struct{}) {
defer close(stopChan)
for {
select {
case <-ctl.chanStop():
return
default:
time.Sleep(30 * time.Minute)
if !hasSynced(ctl) {
glog.Errorf("the data in db and kubernetes is inconsistent, resync %s controller", ctl.Name())
close(stopChan)
stopChan = make(chan struct{})
go ctl.sync(stopChan)
}
}
}
}
func listAndWatch(ctl Controller) {
defer handleCrash(ctl)
stopChan := make(chan struct{})
go ctl.sync(stopChan)
checkAndResync(ctl, stopChan)
}