ensure db connections are successfully closed when the process exits

richardxz
2018-09-20 05:40:50 -04:00
parent 1389332205
commit dc93d00aed
3 changed files with 46 additions and 13 deletions


@@ -33,6 +33,11 @@ import (
 	"k8s.io/apimachinery/pkg/api/errors"
+	"os"
+	"os/signal"
+	"sync"
+	"syscall"
 	_ "kubesphere.io/kubesphere/pkg/apis/v1alpha"
 	"kubesphere.io/kubesphere/pkg/client"
 	"kubesphere.io/kubesphere/pkg/constants"
@@ -118,7 +123,10 @@ func (server *kubeSphereServer) run() {
 		return
 	}
 
-	go controllers.Run()
+	var wg sync.WaitGroup
+	stopChan := make(chan struct{})
+	wg.Add(1)
+	go controllers.Run(stopChan, &wg)
 
 	registerSwagger()
@@ -147,7 +155,12 @@ func (server *kubeSphereServer) run() {
 		go func() { glog.Fatal(http.ListenAndServe(insecureAddr, nil)) }()
 	}
 
-	select {}
+	sigs := make(chan os.Signal)
+	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
+	<-sigs
+	close(stopChan)
+	wg.Wait()
 }
 
 func Run() {
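Taken together, the server hunks above replace the old select {} (which blocked forever and let the process die without any cleanup) with an explicit shutdown path: trap SIGINT/SIGTERM, close a shared stop channel, and wait on a WaitGroup until every controller has released its database connection. A minimal, self-contained sketch of that pattern follows; the worker function is illustrative, not from this commit, and note that the signal.Notify documentation calls for a buffered channel, which the committed code omits.

    package main

    import (
        "fmt"
        "os"
        "os/signal"
        "sync"
        "syscall"
        "time"
    )

    // worker stands in for one controller goroutine: it runs until stopChan
    // is closed, then cleans up and signals the WaitGroup.
    func worker(stopChan <-chan struct{}, wg *sync.WaitGroup) {
        defer wg.Done()
        for {
            select {
            case <-stopChan:
                fmt.Println("worker: stop requested, closing resources")
                return
            case <-time.After(time.Second):
                // periodic work would happen here
            }
        }
    }

    func main() {
        var wg sync.WaitGroup
        stopChan := make(chan struct{})

        wg.Add(1)
        go worker(stopChan, &wg)

        // signal.Notify documents a buffered channel of at least size 1;
        // the commit uses an unbuffered one, which go vet warns about.
        sigs := make(chan os.Signal, 1)
        signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

        <-sigs          // block until SIGINT or SIGTERM arrives
        close(stopChan) // fan the shutdown out to every goroutine
        wg.Wait()       // exit only after workers have cleaned up
    }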

@@ -23,11 +23,18 @@ import (
 	"strings"
 	"time"
+	"sync"
 
 	"github.com/golang/glog"
 	"github.com/jinzhu/gorm"
 	"github.com/pkg/errors"
 )
+
+const (
+	checkPeriod = 30 * time.Minute
+	sleepPeriod = 15 * time.Second
+)
 
 func listWithConditions(db *gorm.DB, total *int, object, list interface{}, conditions string, paging *Paging, order string) {
 	if len(conditions) == 0 {
 		db.Model(object).Count(total)
@@ -114,13 +121,20 @@ func hasSynced(ctl Controller) bool {
 
 func checkAndResync(ctl Controller, stopChan chan struct{}) {
 	defer close(stopChan)
+	lastTime := time.Now()
 	for {
 		select {
 		case <-ctl.chanStop():
 			return
 		default:
-			time.Sleep(30 * time.Minute)
+			if time.Now().Sub(lastTime) < checkPeriod {
+				time.Sleep(sleepPeriod)
+				break
+			}
+			lastTime = time.Now()
 			if !hasSynced(ctl) {
 				glog.Errorf("the data in db and kubernetes is inconsistent, resync %s controller", ctl.Name())
 				close(stopChan)
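The rewritten loop above fixes a responsiveness problem: the old version slept 30 minutes in a single call, so a close on ctl.chanStop() could go unnoticed for up to half an hour. It now sleeps in sleepPeriod (15 s) steps and runs the expensive consistency check only once checkPeriod has elapsed. A reduced sketch of the technique, with periodicCheck and check as illustrative names; time.Since is the idiomatic spelling of time.Now().Sub:

    package main

    import (
        "fmt"
        "time"
    )

    const (
        checkPeriod = 30 * time.Minute // how often the real check should run
        sleepPeriod = 15 * time.Second // longest the loop is blind to stopChan
    )

    // periodicCheck wakes every sleepPeriod so a close(stopChan) is seen
    // within seconds, but runs the expensive check only once per checkPeriod.
    func periodicCheck(stopChan <-chan struct{}, check func() bool) {
        lastTime := time.Now()
        for {
            select {
            case <-stopChan:
                return
            default:
                if time.Since(lastTime) < checkPeriod {
                    time.Sleep(sleepPeriod)
                    continue
                }
                lastTime = time.Now()
                if !check() {
                    fmt.Println("inconsistency detected, triggering resync")
                    return
                }
            }
        }
    }

    func main() {
        stop := make(chan struct{})
        go func() {
            time.Sleep(100 * time.Millisecond)
            close(stop) // simulate shutdown shortly after start
        }()
        periodicCheck(stop, func() bool { return true })
        fmt.Println("periodicCheck returned within one sleepPeriod")
    }

In the committed code the break inside select exits only the select statement, after which the for loop iterates again, so it has the same effect as the continue used here.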
@@ -131,9 +145,10 @@ func checkAndResync(ctl Controller, stopChan chan struct{}) {
 	}
 }
 
-func listAndWatch(ctl Controller) {
+func listAndWatch(ctl Controller, wg *sync.WaitGroup) {
 	defer handleCrash(ctl)
 	defer ctl.CloseDB()
+	defer wg.Done()
 
 	stopChan := make(chan struct{})
 	go ctl.sync(stopChan)
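listAndWatch above now registers three defers. Since defers run last-in-first-out, wg.Done() as written fires before ctl.CloseDB(), so wg.Wait() in the caller can in principle return a moment before the connection is fully closed; registering wg.Done() first would guarantee the DB is closed strictly before the WaitGroup is released. A runnable sketch of that ordering, where fakeController stands in for the real Controller:

    package main

    import (
        "fmt"
        "sync"
    )

    // fakeController is a stand-in for the real Controller, just enough
    // to show the defer ordering in isolation.
    type fakeController struct{ name string }

    func (c *fakeController) CloseDB() { fmt.Println(c.name, ": db connection closed") }

    func listAndWatch(c *fakeController, wg *sync.WaitGroup) {
        // Defers run last-in-first-out. Registering wg.Done() first (as
        // here) makes it run last, so wg.Wait() cannot return while the
        // DB connection is still open; the committed order is the reverse.
        defer wg.Done()
        defer c.CloseDB()

        fmt.Println(c.name, ": watching until stopped")
    }

    func main() {
        var wg sync.WaitGroup
        wg.Add(1)
        go listAndWatch(&fakeController{name: "deployments"}, &wg)
        wg.Wait() // by this point CloseDB has already run
    }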

@@ -23,6 +23,10 @@ import (
 	"github.com/jinzhu/gorm"
 	"k8s.io/client-go/kubernetes"
+	"os"
+	"sync"
+	"syscall"
 
 	"kubesphere.io/kubesphere/pkg/client"
 )
@@ -33,7 +37,7 @@ type resourceControllers struct {
 
 var ResourceControllers resourceControllers
 
-func (rec *resourceControllers) runContoller(name string, stopChan chan struct{}) {
+func (rec *resourceControllers) runContoller(name string, stopChan chan struct{}, wg *sync.WaitGroup) {
 	var ctl Controller
 	attr := CommonAttribute{DB: client.NewDBClient(), K8sClient: rec.k8sClient, stopChan: stopChan,
 		aliveChan: make(chan struct{}), Name: name}
@@ -75,7 +79,8 @@ func (rec *resourceControllers) runContoller(name string, stopChan chan struct{}
 	}
 
 	rec.Controllers[name] = ctl
-	go listAndWatch(ctl)
+	wg.Add(1)
+	go listAndWatch(ctl, wg)
 }
@@ -94,23 +99,21 @@ func dbHealthCheck(db *gorm.DB) {
 		}
 		if count > 3 {
-			panic(err)
+			syscall.Kill(os.Getpid(), syscall.SIGTERM)
 		}
 	}
 }
 
-func Run() {
-	stopChan := make(chan struct{})
-	defer close(stopChan)
+func Run(stopChan chan struct{}, wg *sync.WaitGroup) {
+	defer wg.Done()
 
 	k8sClient := client.NewK8sClient()
 	ResourceControllers = resourceControllers{k8sClient: k8sClient, Controllers: make(map[string]Controller)}
 
 	for _, item := range []string{Deployments, Statefulsets, Daemonsets, PersistentVolumeClaim, Pods, Services,
 		Ingresses, Roles, ClusterRoles, Namespaces, StorageClasses, Jobs, Cronjobs, Nodes, Replicasets, ControllerRevisions} {
-		ResourceControllers.runContoller(item, stopChan)
+		ResourceControllers.runContoller(item, stopChan, wg)
 	}
 
 	go dbHealthCheck(client.NewDBClient())
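Swapping panic(err) for syscall.Kill(os.Getpid(), syscall.SIGTERM) in dbHealthCheck routes a fatal health-check failure through the same signal handler as an operator-initiated shutdown, so the stop channel is closed and connections are released instead of the stack simply unwinding. A small Unix-only sketch of the self-signal technique; failHard and the log text are illustrative, not from the commit:

    package main

    import (
        "fmt"
        "os"
        "os/signal"
        "syscall"
    )

    // failHard mirrors the technique in dbHealthCheck: instead of
    // panicking (which bypasses the server's shutdown path), send
    // SIGTERM to our own pid so the ordinary signal handler runs the
    // cleanup.
    func failHard(reason string) {
        fmt.Println("fatal:", reason, "- requesting graceful shutdown")
        syscall.Kill(os.Getpid(), syscall.SIGTERM)
    }

    func main() {
        sigs := make(chan os.Signal, 1)
        signal.Notify(sigs, syscall.SIGTERM)

        go failHard("db unreachable after repeated retries") // simulated failure

        <-sigs
        fmt.Println("shutting down cleanly") // close(stopChan) and wg.Wait() would go here
    }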
@@ -118,10 +121,12 @@ func Run() {
 	for {
 		for ctlName, controller := range ResourceControllers.Controllers {
 			select {
+			case <-stopChan:
+				return
 			case _, isClose := <-controller.chanAlive():
 				if !isClose {
 					glog.Errorf("controller %s have stopped, restart it", ctlName)
-					ResourceControllers.runContoller(ctlName, stopChan)
+					ResourceControllers.runContoller(ctlName, stopChan, wg)
 				}
 			default:
 				time.Sleep(5 * time.Second)
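The supervision loop gains a stopChan case so that, during shutdown, dead controllers are no longer resurrected; chanAlive() yields a channel that is closed when a controller's goroutine exits. An illustrative reduction of that loop follows; supervise, aliveChans, and restart are assumed names, not the commit's API:

    package main

    import (
        "fmt"
        "time"
    )

    // supervise reduces the Run loop to its essentials: aliveChans maps a
    // controller name to the channel its goroutine closes when it dies.
    // Dead controllers are restarted until stopChan closes, after which
    // the loop exits instead of resurrecting them.
    func supervise(stopChan <-chan struct{}, aliveChans map[string]chan struct{}, restart func(name string)) {
        for {
            for name, alive := range aliveChans {
                select {
                case <-stopChan:
                    return // shutting down: do not restart anything
                case _, open := <-alive:
                    if !open {
                        fmt.Printf("controller %s has stopped, restarting it\n", name)
                        restart(name)
                        aliveChans[name] = make(chan struct{}) // restart re-arms the channel
                    }
                default:
                    time.Sleep(5 * time.Second)
                }
            }
        }
    }

    func main() {
        stop := make(chan struct{})
        alive := map[string]chan struct{}{"deployments": make(chan struct{})}
        close(alive["deployments"]) // simulate the controller dying

        go func() {
            time.Sleep(time.Second)
            close(stop) // simulate SIGTERM-driven shutdown
        }()

        supervise(stop, alive, func(name string) { /* re-create the controller here */ })
        fmt.Println("supervisor exited")
    }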