From dc93d00aed7955754b1f38864e476ab901e24a5f Mon Sep 17 00:00:00 2001 From: richardxz Date: Thu, 20 Sep 2018 05:40:50 -0400 Subject: [PATCH] ensure db connections are successfully closed when the process exits --- pkg/app/app.go | 17 +++++++++++++++-- pkg/models/controllers/common.go | 19 +++++++++++++++++-- pkg/models/controllers/run.go | 23 ++++++++++++++--------- 3 files changed, 46 insertions(+), 13 deletions(-) diff --git a/pkg/app/app.go b/pkg/app/app.go index b33abb4a8..78cac589a 100644 --- a/pkg/app/app.go +++ b/pkg/app/app.go @@ -33,6 +33,11 @@ import ( "k8s.io/apimachinery/pkg/api/errors" + "os" + "os/signal" + "sync" + "syscall" + _ "kubesphere.io/kubesphere/pkg/apis/v1alpha" "kubesphere.io/kubesphere/pkg/client" "kubesphere.io/kubesphere/pkg/constants" @@ -118,7 +123,10 @@ func (server *kubeSphereServer) run() { return } - go controllers.Run() + var wg sync.WaitGroup + stopChan := make(chan struct{}) + wg.Add(1) + go controllers.Run(stopChan, &wg) registerSwagger() @@ -147,7 +155,12 @@ func (server *kubeSphereServer) run() { go func() { glog.Fatal(http.ListenAndServe(insecureAddr, nil)) }() } - select {} + sigs := make(chan os.Signal) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + + <-sigs + close(stopChan) + wg.Wait() } func Run() { diff --git a/pkg/models/controllers/common.go b/pkg/models/controllers/common.go index 3bd269305..589bb13f8 100644 --- a/pkg/models/controllers/common.go +++ b/pkg/models/controllers/common.go @@ -23,11 +23,18 @@ import ( "strings" "time" + "sync" + "github.com/golang/glog" "github.com/jinzhu/gorm" "github.com/pkg/errors" ) +const ( + checkPeriod = 30 * time.Minute + sleepPeriod = 15 * time.Second +) + func listWithConditions(db *gorm.DB, total *int, object, list interface{}, conditions string, paging *Paging, order string) { if len(conditions) == 0 { db.Model(object).Count(total) @@ -114,13 +121,20 @@ func hasSynced(ctl Controller) bool { func checkAndResync(ctl Controller, stopChan chan struct{}) { defer
close(stopChan) + + lastTime := time.Now() + for { select { case <-ctl.chanStop(): return default: - time.Sleep(30 * time.Minute) + if time.Now().Sub(lastTime) < checkPeriod { + time.Sleep(sleepPeriod) + break + } + lastTime = time.Now() if !hasSynced(ctl) { glog.Errorf("the data in db and kubernetes is inconsistent, resync %s controller", ctl.Name()) close(stopChan) @@ -131,9 +145,10 @@ func checkAndResync(ctl Controller, stopChan chan struct{}) { } } -func listAndWatch(ctl Controller) { +func listAndWatch(ctl Controller, wg *sync.WaitGroup) { defer handleCrash(ctl) defer ctl.CloseDB() + defer wg.Done() stopChan := make(chan struct{}) go ctl.sync(stopChan) diff --git a/pkg/models/controllers/run.go b/pkg/models/controllers/run.go index 36eff2001..c9ba0322f 100644 --- a/pkg/models/controllers/run.go +++ b/pkg/models/controllers/run.go @@ -23,6 +23,10 @@ import ( "github.com/jinzhu/gorm" "k8s.io/client-go/kubernetes" + "os" + "sync" + "syscall" + "kubesphere.io/kubesphere/pkg/client" ) @@ -33,7 +37,7 @@ type resourceControllers struct { var ResourceControllers resourceControllers -func (rec *resourceControllers) runContoller(name string, stopChan chan struct{}) { +func (rec *resourceControllers) runContoller(name string, stopChan chan struct{}, wg *sync.WaitGroup) { var ctl Controller attr := CommonAttribute{DB: client.NewDBClient(), K8sClient: rec.k8sClient, stopChan: stopChan, aliveChan: make(chan struct{}), Name: name} @@ -75,7 +79,8 @@ func (rec *resourceControllers) runContoller(name string, stopChan chan struct{} } rec.Controllers[name] = ctl - go listAndWatch(ctl) + wg.Add(1) + go listAndWatch(ctl, wg) } @@ -94,23 +99,21 @@ func dbHealthCheck(db *gorm.DB) { } if count > 3 { - panic(err) + syscall.Kill(os.Getpid(), syscall.SIGTERM) } } } -func Run() { - - stopChan := make(chan struct{}) - defer close(stopChan) +func Run(stopChan chan struct{}, wg *sync.WaitGroup) { + defer wg.Done() k8sClient := client.NewK8sClient() ResourceControllers = 
resourceControllers{k8sClient: k8sClient, Controllers: make(map[string]Controller)} for _, item := range []string{Deployments, Statefulsets, Daemonsets, PersistentVolumeClaim, Pods, Services, Ingresses, Roles, ClusterRoles, Namespaces, StorageClasses, Jobs, Cronjobs, Nodes, Replicasets, ControllerRevisions} { - ResourceControllers.runContoller(item, stopChan) + ResourceControllers.runContoller(item, stopChan, wg) } go dbHealthCheck(client.NewDBClient()) @@ -118,10 +121,12 @@ func Run() { for { for ctlName, controller := range ResourceControllers.Controllers { select { + case <-stopChan: + return case _, isClose := <-controller.chanAlive(): if !isClose { glog.Errorf("controller %s have stopped, restart it", ctlName) - ResourceControllers.runContoller(ctlName, stopChan) + ResourceControllers.runContoller(ctlName, stopChan, wg) } default: time.Sleep(5 * time.Second)