Merge pull request #172 from richardxz/master
ensure db connections are successfully closed when process exit
This commit is contained in:
@@ -33,6 +33,11 @@ import (
|
||||
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
|
||||
"os"
|
||||
"os/signal"
|
||||
"sync"
|
||||
"syscall"
|
||||
|
||||
_ "kubesphere.io/kubesphere/pkg/apis/v1alpha"
|
||||
"kubesphere.io/kubesphere/pkg/client"
|
||||
"kubesphere.io/kubesphere/pkg/constants"
|
||||
@@ -118,7 +123,10 @@ func (server *kubeSphereServer) run() {
|
||||
return
|
||||
}
|
||||
|
||||
go controllers.Run()
|
||||
var wg sync.WaitGroup
|
||||
stopChan := make(chan struct{})
|
||||
wg.Add(1)
|
||||
go controllers.Run(stopChan, &wg)
|
||||
|
||||
registerSwagger()
|
||||
|
||||
@@ -147,7 +155,12 @@ func (server *kubeSphereServer) run() {
|
||||
go func() { glog.Fatal(http.ListenAndServe(insecureAddr, nil)) }()
|
||||
}
|
||||
|
||||
select {}
|
||||
sigs := make(chan os.Signal)
|
||||
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
|
||||
|
||||
<-sigs
|
||||
close(stopChan)
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func Run() {
|
||||
|
||||
@@ -23,11 +23,18 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"sync"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"github.com/jinzhu/gorm"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
const (
|
||||
checkPeriod = 30 * time.Minute
|
||||
sleepPeriod = 15 * time.Second
|
||||
)
|
||||
|
||||
func listWithConditions(db *gorm.DB, total *int, object, list interface{}, conditions string, paging *Paging, order string) {
|
||||
if len(conditions) == 0 {
|
||||
db.Model(object).Count(total)
|
||||
@@ -114,13 +121,20 @@ func hasSynced(ctl Controller) bool {
|
||||
|
||||
func checkAndResync(ctl Controller, stopChan chan struct{}) {
|
||||
defer close(stopChan)
|
||||
|
||||
lastTime := time.Now()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctl.chanStop():
|
||||
return
|
||||
default:
|
||||
time.Sleep(30 * time.Minute)
|
||||
if time.Now().Sub(lastTime) < checkPeriod {
|
||||
time.Sleep(sleepPeriod)
|
||||
break
|
||||
}
|
||||
|
||||
lastTime = time.Now()
|
||||
if !hasSynced(ctl) {
|
||||
glog.Errorf("the data in db and kubernetes is inconsistent, resync %s controller", ctl.Name())
|
||||
close(stopChan)
|
||||
@@ -131,9 +145,10 @@ func checkAndResync(ctl Controller, stopChan chan struct{}) {
|
||||
}
|
||||
}
|
||||
|
||||
func listAndWatch(ctl Controller) {
|
||||
func listAndWatch(ctl Controller, wg *sync.WaitGroup) {
|
||||
defer handleCrash(ctl)
|
||||
defer ctl.CloseDB()
|
||||
defer wg.Done()
|
||||
stopChan := make(chan struct{})
|
||||
|
||||
go ctl.sync(stopChan)
|
||||
|
||||
@@ -23,6 +23,10 @@ import (
|
||||
"github.com/jinzhu/gorm"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
|
||||
"os"
|
||||
"sync"
|
||||
"syscall"
|
||||
|
||||
"kubesphere.io/kubesphere/pkg/client"
|
||||
)
|
||||
|
||||
@@ -33,7 +37,7 @@ type resourceControllers struct {
|
||||
|
||||
var ResourceControllers resourceControllers
|
||||
|
||||
func (rec *resourceControllers) runContoller(name string, stopChan chan struct{}) {
|
||||
func (rec *resourceControllers) runContoller(name string, stopChan chan struct{}, wg *sync.WaitGroup) {
|
||||
var ctl Controller
|
||||
attr := CommonAttribute{DB: client.NewDBClient(), K8sClient: rec.k8sClient, stopChan: stopChan,
|
||||
aliveChan: make(chan struct{}), Name: name}
|
||||
@@ -75,7 +79,8 @@ func (rec *resourceControllers) runContoller(name string, stopChan chan struct{}
|
||||
}
|
||||
|
||||
rec.Controllers[name] = ctl
|
||||
go listAndWatch(ctl)
|
||||
wg.Add(1)
|
||||
go listAndWatch(ctl, wg)
|
||||
|
||||
}
|
||||
|
||||
@@ -94,23 +99,21 @@ func dbHealthCheck(db *gorm.DB) {
|
||||
}
|
||||
|
||||
if count > 3 {
|
||||
panic(err)
|
||||
syscall.Kill(os.Getpid(), syscall.SIGTERM)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func Run() {
|
||||
|
||||
stopChan := make(chan struct{})
|
||||
defer close(stopChan)
|
||||
func Run(stopChan chan struct{}, wg *sync.WaitGroup) {
|
||||
defer wg.Done()
|
||||
|
||||
k8sClient := client.NewK8sClient()
|
||||
ResourceControllers = resourceControllers{k8sClient: k8sClient, Controllers: make(map[string]Controller)}
|
||||
|
||||
for _, item := range []string{Deployments, Statefulsets, Daemonsets, PersistentVolumeClaim, Pods, Services,
|
||||
Ingresses, Roles, ClusterRoles, Namespaces, StorageClasses, Jobs, Cronjobs, Nodes, Replicasets, ControllerRevisions} {
|
||||
ResourceControllers.runContoller(item, stopChan)
|
||||
ResourceControllers.runContoller(item, stopChan, wg)
|
||||
}
|
||||
|
||||
go dbHealthCheck(client.NewDBClient())
|
||||
@@ -118,10 +121,12 @@ func Run() {
|
||||
for {
|
||||
for ctlName, controller := range ResourceControllers.Controllers {
|
||||
select {
|
||||
case <-stopChan:
|
||||
return
|
||||
case _, isClose := <-controller.chanAlive():
|
||||
if !isClose {
|
||||
glog.Errorf("controller %s have stopped, restart it", ctlName)
|
||||
ResourceControllers.runContoller(ctlName, stopChan)
|
||||
ResourceControllers.runContoller(ctlName, stopChan, wg)
|
||||
}
|
||||
default:
|
||||
time.Sleep(5 * time.Second)
|
||||
|
||||
Reference in New Issue
Block a user