400
vendor/sigs.k8s.io/controller-runtime/pkg/manager/internal.go
generated
vendored
400
vendor/sigs.k8s.io/controller-runtime/pkg/manager/internal.go
generated
vendored
@@ -18,15 +18,18 @@ package manager
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/go-logr/logr"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
utilerrors "k8s.io/apimachinery/pkg/util/errors"
|
||||
"k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/tools/leaderelection"
|
||||
"k8s.io/client-go/tools/leaderelection/resourcelock"
|
||||
@@ -44,12 +47,14 @@ import (
|
||||
|
||||
const (
|
||||
// Values taken from: https://github.com/kubernetes/apiserver/blob/master/pkg/apis/config/v1alpha1/defaults.go
|
||||
defaultLeaseDuration = 15 * time.Second
|
||||
defaultRenewDeadline = 10 * time.Second
|
||||
defaultRetryPeriod = 2 * time.Second
|
||||
defaultLeaseDuration = 15 * time.Second
|
||||
defaultRenewDeadline = 10 * time.Second
|
||||
defaultRetryPeriod = 2 * time.Second
|
||||
defaultGracefulShutdownPeriod = 30 * time.Second
|
||||
|
||||
defaultReadinessEndpoint = "/readyz"
|
||||
defaultLivenessEndpoint = "/healthz"
|
||||
defaultReadinessEndpoint = "/readyz/"
|
||||
defaultLivenessEndpoint = "/healthz/"
|
||||
defaultMetricsEndpoint = "/metrics"
|
||||
)
|
||||
|
||||
var log = logf.RuntimeLog.WithName("manager")
|
||||
@@ -95,6 +100,9 @@ type controllerManager struct {
|
||||
// metricsListener is used to serve prometheus metrics
|
||||
metricsListener net.Listener
|
||||
|
||||
// metricsExtraHandlers contains extra handlers to register on http server that serves metrics.
|
||||
metricsExtraHandlers map[string]http.Handler
|
||||
|
||||
// healthProbeListener is used to serve liveness probe
|
||||
healthProbeListener net.Listener
|
||||
|
||||
@@ -114,11 +122,7 @@ type controllerManager struct {
|
||||
started bool
|
||||
startedLeader bool
|
||||
healthzStarted bool
|
||||
|
||||
// NB(directxman12): we don't just use an error channel here to avoid the situation where the
|
||||
// error channel is too small and we end up blocking some goroutines waiting to report their errors.
|
||||
// errSignal lets us track when we should stop because an error occurred
|
||||
errSignal *errSignaler
|
||||
errChan chan error
|
||||
|
||||
// internalStop is the stop channel *actually* used by everything involved
|
||||
// with the manager as a stop channel, so that we can pass a stop channel
|
||||
@@ -130,6 +134,23 @@ type controllerManager struct {
|
||||
// It and `internalStop` should point to the same channel.
|
||||
internalStopper chan<- struct{}
|
||||
|
||||
// Logger is the logger that should be used by this manager.
|
||||
// If none is set, it defaults to log.Log global logger.
|
||||
logger logr.Logger
|
||||
|
||||
// leaderElectionCancel is used to cancel the leader election. It is distinct from internalStopper,
|
||||
// because for safety reasons we need to os.Exit() when we lose the leader election, meaning that
|
||||
// it must be deferred until after gracefulShutdown is done.
|
||||
leaderElectionCancel context.CancelFunc
|
||||
|
||||
// stop procedure engaged. In other words, we should not add anything else to the manager
|
||||
stopProcedureEngaged bool
|
||||
|
||||
// elected is closed when this manager becomes the leader of a group of
|
||||
// managers, either because it won a leader election or because no leader
|
||||
// election was configured.
|
||||
elected chan struct{}
|
||||
|
||||
startCache func(stop <-chan struct{}) error
|
||||
|
||||
// port is the port that the webhook server serves at.
|
||||
@@ -146,63 +167,38 @@ type controllerManager struct {
|
||||
// leaseDuration is the duration that non-leader candidates will
|
||||
// wait to force acquire leadership.
|
||||
leaseDuration time.Duration
|
||||
// renewDeadline is the duration that the acting master will retry
|
||||
// renewDeadline is the duration that the acting controlplane will retry
|
||||
// refreshing leadership before giving up.
|
||||
renewDeadline time.Duration
|
||||
// retryPeriod is the duration the LeaderElector clients should wait
|
||||
// between tries of actions.
|
||||
retryPeriod time.Duration
|
||||
}
|
||||
|
||||
type errSignaler struct {
|
||||
// errSignal indicates that an error occurred, when closed. It shouldn't
|
||||
// be written to.
|
||||
errSignal chan struct{}
|
||||
// waitForRunnable is holding the number of runnables currently running so that
|
||||
// we can wait for them to exit before quitting the manager
|
||||
waitForRunnable sync.WaitGroup
|
||||
|
||||
// err is the received error
|
||||
err error
|
||||
// gracefulShutdownTimeout is the duration given to runnable to stop
|
||||
// before the manager actually returns on stop.
|
||||
gracefulShutdownTimeout time.Duration
|
||||
|
||||
mu sync.Mutex
|
||||
}
|
||||
// onStoppedLeading is callled when the leader election lease is lost.
|
||||
// It can be overridden for tests.
|
||||
onStoppedLeading func()
|
||||
|
||||
func (r *errSignaler) SignalError(err error) {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
|
||||
if err == nil {
|
||||
// non-error, ignore
|
||||
log.Error(nil, "SignalError called without an (with a nil) error, which should never happen, ignoring")
|
||||
return
|
||||
}
|
||||
|
||||
if r.err != nil {
|
||||
// we already have an error, don't try again
|
||||
return
|
||||
}
|
||||
|
||||
// save the error and report it
|
||||
r.err = err
|
||||
close(r.errSignal)
|
||||
}
|
||||
|
||||
func (r *errSignaler) Error() error {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
|
||||
return r.err
|
||||
}
|
||||
|
||||
func (r *errSignaler) GotError() chan struct{} {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
|
||||
return r.errSignal
|
||||
// shutdownCtx is the context that can be used during shutdown. It will be cancelled
|
||||
// after the gracefulShutdownTimeout ended. It must not be accessed before internalStop
|
||||
// is closed because it will be nil.
|
||||
shutdownCtx context.Context
|
||||
}
|
||||
|
||||
// Add sets dependencies on i, and adds it to the list of Runnables to start.
|
||||
func (cm *controllerManager) Add(r Runnable) error {
|
||||
cm.mu.Lock()
|
||||
defer cm.mu.Unlock()
|
||||
if cm.stopProcedureEngaged {
|
||||
return errors.New("can't accept new runnable as stop procedure is already engaged")
|
||||
}
|
||||
|
||||
// Set dependencies on the object
|
||||
if err := cm.SetFields(r); err != nil {
|
||||
@@ -222,11 +218,7 @@ func (cm *controllerManager) Add(r Runnable) error {
|
||||
|
||||
if shouldStart {
|
||||
// If already started, start the controller
|
||||
go func() {
|
||||
if err := r.Start(cm.internalStop); err != nil {
|
||||
cm.errSignal.SignalError(err)
|
||||
}
|
||||
}()
|
||||
cm.startRunnable(r)
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -257,6 +249,28 @@ func (cm *controllerManager) SetFields(i interface{}) error {
|
||||
if _, err := inject.MapperInto(cm.mapper, i); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := inject.LoggerInto(log, i); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// AddMetricsExtraHandler adds extra handler served on path to the http server that serves metrics.
|
||||
func (cm *controllerManager) AddMetricsExtraHandler(path string, handler http.Handler) error {
|
||||
if path == defaultMetricsEndpoint {
|
||||
return fmt.Errorf("overriding builtin %s endpoint is not allowed", defaultMetricsEndpoint)
|
||||
}
|
||||
|
||||
cm.mu.Lock()
|
||||
defer cm.mu.Unlock()
|
||||
|
||||
_, found := cm.metricsExtraHandlers[path]
|
||||
if found {
|
||||
return fmt.Errorf("can't register extra handler by duplicate path %q on metrics http server", path)
|
||||
}
|
||||
|
||||
cm.metricsExtraHandlers[path] = handler
|
||||
log.V(2).Info("Registering metrics http server extra handler", "path", path)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -265,6 +279,10 @@ func (cm *controllerManager) AddHealthzCheck(name string, check healthz.Checker)
|
||||
cm.mu.Lock()
|
||||
defer cm.mu.Unlock()
|
||||
|
||||
if cm.stopProcedureEngaged {
|
||||
return errors.New("can't accept new healthCheck as stop procedure is already engaged")
|
||||
}
|
||||
|
||||
if cm.healthzStarted {
|
||||
return fmt.Errorf("unable to add new checker because healthz endpoint has already been created")
|
||||
}
|
||||
@@ -282,6 +300,10 @@ func (cm *controllerManager) AddReadyzCheck(name string, check healthz.Checker)
|
||||
cm.mu.Lock()
|
||||
defer cm.mu.Unlock()
|
||||
|
||||
if cm.stopProcedureEngaged {
|
||||
return errors.New("can't accept new ready check as stop procedure is already engaged")
|
||||
}
|
||||
|
||||
if cm.healthzStarted {
|
||||
return fmt.Errorf("unable to add new checker because readyz endpoint has already been created")
|
||||
}
|
||||
@@ -327,48 +349,77 @@ func (cm *controllerManager) GetAPIReader() client.Reader {
|
||||
}
|
||||
|
||||
func (cm *controllerManager) GetWebhookServer() *webhook.Server {
|
||||
if cm.webhookServer == nil {
|
||||
server, wasNew := func() (*webhook.Server, bool) {
|
||||
cm.mu.Lock()
|
||||
defer cm.mu.Unlock()
|
||||
|
||||
if cm.webhookServer != nil {
|
||||
return cm.webhookServer, false
|
||||
}
|
||||
|
||||
cm.webhookServer = &webhook.Server{
|
||||
Port: cm.port,
|
||||
Host: cm.host,
|
||||
CertDir: cm.certDir,
|
||||
}
|
||||
if err := cm.Add(cm.webhookServer); err != nil {
|
||||
panic("unable to add webhookServer to the controller manager")
|
||||
return cm.webhookServer, true
|
||||
}()
|
||||
|
||||
// only add the server if *we ourselves* just registered it.
|
||||
// Add has its own lock, so just do this separately -- there shouldn't
|
||||
// be a "race" in this lock gap because the condition is the population
|
||||
// of cm.webhookServer, not anything to do with Add.
|
||||
if wasNew {
|
||||
if err := cm.Add(server); err != nil {
|
||||
panic("unable to add webhook server to the controller manager")
|
||||
}
|
||||
}
|
||||
return cm.webhookServer
|
||||
return server
|
||||
}
|
||||
|
||||
func (cm *controllerManager) GetLogger() logr.Logger {
|
||||
return cm.logger
|
||||
}
|
||||
|
||||
func (cm *controllerManager) serveMetrics(stop <-chan struct{}) {
|
||||
var metricsPath = "/metrics"
|
||||
handler := promhttp.HandlerFor(metrics.Registry, promhttp.HandlerOpts{
|
||||
ErrorHandling: promhttp.HTTPErrorOnError,
|
||||
})
|
||||
// TODO(JoelSpeed): Use existing Kubernetes machinery for serving metrics
|
||||
mux := http.NewServeMux()
|
||||
mux.Handle(metricsPath, handler)
|
||||
mux.Handle(defaultMetricsEndpoint, handler)
|
||||
|
||||
func() {
|
||||
cm.mu.Lock()
|
||||
defer cm.mu.Unlock()
|
||||
|
||||
for path, extraHandler := range cm.metricsExtraHandlers {
|
||||
mux.Handle(path, extraHandler)
|
||||
}
|
||||
}()
|
||||
|
||||
server := http.Server{
|
||||
Handler: mux,
|
||||
}
|
||||
// Run the server
|
||||
go func() {
|
||||
log.Info("starting metrics server", "path", metricsPath)
|
||||
cm.startRunnable(RunnableFunc(func(stop <-chan struct{}) error {
|
||||
log.Info("starting metrics server", "path", defaultMetricsEndpoint)
|
||||
if err := server.Serve(cm.metricsListener); err != nil && err != http.ErrServerClosed {
|
||||
cm.errSignal.SignalError(err)
|
||||
return err
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}))
|
||||
|
||||
// Shutdown the server when stop is closed
|
||||
select {
|
||||
case <-stop:
|
||||
if err := server.Shutdown(context.Background()); err != nil {
|
||||
cm.errSignal.SignalError(err)
|
||||
}
|
||||
<-stop
|
||||
if err := server.Shutdown(cm.shutdownCtx); err != nil {
|
||||
cm.errChan <- err
|
||||
}
|
||||
}
|
||||
|
||||
func (cm *controllerManager) serveHealthProbes(stop <-chan struct{}) {
|
||||
// TODO(hypnoglow): refactor locking to use anonymous func in the similar way
|
||||
// it's done in serveMetrics.
|
||||
cm.mu.Lock()
|
||||
mux := http.NewServeMux()
|
||||
|
||||
@@ -383,29 +434,48 @@ func (cm *controllerManager) serveHealthProbes(stop <-chan struct{}) {
|
||||
Handler: mux,
|
||||
}
|
||||
// Run server
|
||||
go func() {
|
||||
cm.startRunnable(RunnableFunc(func(stop <-chan struct{}) error {
|
||||
if err := server.Serve(cm.healthProbeListener); err != nil && err != http.ErrServerClosed {
|
||||
cm.errSignal.SignalError(err)
|
||||
return err
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}))
|
||||
cm.healthzStarted = true
|
||||
cm.mu.Unlock()
|
||||
|
||||
// Shutdown the server when stop is closed
|
||||
select {
|
||||
case <-stop:
|
||||
if err := server.Shutdown(context.Background()); err != nil {
|
||||
cm.errSignal.SignalError(err)
|
||||
}
|
||||
<-stop
|
||||
if err := server.Shutdown(cm.shutdownCtx); err != nil {
|
||||
cm.errChan <- err
|
||||
}
|
||||
}
|
||||
|
||||
func (cm *controllerManager) Start(stop <-chan struct{}) error {
|
||||
// join the passed-in stop channel as an upstream feeding into cm.internalStopper
|
||||
defer close(cm.internalStopper)
|
||||
func (cm *controllerManager) Start(stop <-chan struct{}) (err error) {
|
||||
// This chan indicates that stop is complete, in other words all runnables have returned or timeout on stop request
|
||||
stopComplete := make(chan struct{})
|
||||
defer close(stopComplete)
|
||||
// This must be deferred after closing stopComplete, otherwise we deadlock
|
||||
defer func() {
|
||||
// https://hips.hearstapps.com/hmg-prod.s3.amazonaws.com/images/gettyimages-459889618-1533579787.jpg
|
||||
stopErr := cm.engageStopProcedure(stopComplete)
|
||||
if stopErr != nil {
|
||||
if err != nil {
|
||||
// Utilerrors.Aggregate allows to use errors.Is for all contained errors
|
||||
// whereas fmt.Errorf allows wrapping at most one error which means the
|
||||
// other one can not be found anymore.
|
||||
err = utilerrors.NewAggregate([]error{err, stopErr})
|
||||
} else {
|
||||
err = stopErr
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// initialize this here so that we reset the signal channel state on every start
|
||||
cm.errSignal = &errSignaler{errSignal: make(chan struct{})}
|
||||
// Everything that might write into this channel must be started in a new goroutine,
|
||||
// because otherwise we might block this routine trying to write into the full channel
|
||||
// and will not be able to enter the deferred cm.engageStopProcedure() which drains
|
||||
// it.
|
||||
cm.errChan = make(chan error)
|
||||
|
||||
// Metrics should be served whether the controller is leader or not.
|
||||
// (If we don't serve metrics for non-leaders, prometheus will still scrape
|
||||
@@ -421,25 +491,88 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error {
|
||||
|
||||
go cm.startNonLeaderElectionRunnables()
|
||||
|
||||
if cm.resourceLock != nil {
|
||||
err := cm.startLeaderElection()
|
||||
if err != nil {
|
||||
return err
|
||||
go func() {
|
||||
if cm.resourceLock != nil {
|
||||
err := cm.startLeaderElection()
|
||||
if err != nil {
|
||||
cm.errChan <- err
|
||||
}
|
||||
} else {
|
||||
// Treat not having leader election enabled the same as being elected.
|
||||
close(cm.elected)
|
||||
go cm.startLeaderElectionRunnables()
|
||||
}
|
||||
} else {
|
||||
go cm.startLeaderElectionRunnables()
|
||||
}
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-stop:
|
||||
// We are done
|
||||
return nil
|
||||
case <-cm.errSignal.GotError():
|
||||
// Error starting a controller
|
||||
return cm.errSignal.Error()
|
||||
case err := <-cm.errChan:
|
||||
// Error starting or running a runnable
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// engageStopProcedure signals all runnables to stop, reads potential errors
|
||||
// from the errChan and waits for them to end. It must not be called more than once.
|
||||
func (cm *controllerManager) engageStopProcedure(stopComplete chan struct{}) error {
|
||||
var cancel context.CancelFunc
|
||||
if cm.gracefulShutdownTimeout > 0 {
|
||||
cm.shutdownCtx, cancel = context.WithTimeout(context.Background(), cm.gracefulShutdownTimeout)
|
||||
} else {
|
||||
cm.shutdownCtx, cancel = context.WithCancel(context.Background())
|
||||
}
|
||||
defer cancel()
|
||||
close(cm.internalStopper)
|
||||
// Start draining the errors before acquiring the lock to make sure we don't deadlock
|
||||
// if something that has the lock is blocked on trying to write into the unbuffered
|
||||
// channel after something else already wrote into it.
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case err, ok := <-cm.errChan:
|
||||
if ok {
|
||||
log.Error(err, "error received after stop sequence was engaged")
|
||||
}
|
||||
case <-stopComplete:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
if cm.gracefulShutdownTimeout == 0 {
|
||||
return nil
|
||||
}
|
||||
cm.mu.Lock()
|
||||
defer cm.mu.Unlock()
|
||||
cm.stopProcedureEngaged = true
|
||||
return cm.waitForRunnableToEnd(cm.shutdownCtx, cancel)
|
||||
}
|
||||
|
||||
// waitForRunnableToEnd blocks until all runnables ended or the
|
||||
// tearDownTimeout was reached. In the latter case, an error is returned.
|
||||
func (cm *controllerManager) waitForRunnableToEnd(ctx context.Context, cancel context.CancelFunc) error {
|
||||
defer cancel()
|
||||
|
||||
// Cancel leader election only after we waited. It will os.Exit() the app for safety.
|
||||
defer func() {
|
||||
if cm.leaderElectionCancel != nil {
|
||||
cm.leaderElectionCancel()
|
||||
}
|
||||
}()
|
||||
|
||||
go func() {
|
||||
cm.waitForRunnable.Wait()
|
||||
cancel()
|
||||
}()
|
||||
|
||||
<-ctx.Done()
|
||||
if err := ctx.Err(); err != nil && err != context.Canceled {
|
||||
return fmt.Errorf("failed waiting for all runnables to end within grace period of %s: %w", cm.gracefulShutdownTimeout, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cm *controllerManager) startNonLeaderElectionRunnables() {
|
||||
cm.mu.Lock()
|
||||
defer cm.mu.Unlock()
|
||||
@@ -450,15 +583,7 @@ func (cm *controllerManager) startNonLeaderElectionRunnables() {
|
||||
for _, c := range cm.nonLeaderElectionRunnables {
|
||||
// Controllers block, but we want to return an error if any have an error starting.
|
||||
// Write any Start errors to a channel so we can return them
|
||||
ctrl := c
|
||||
go func() {
|
||||
if err := ctrl.Start(cm.internalStop); err != nil {
|
||||
cm.errSignal.SignalError(err)
|
||||
}
|
||||
// we use %T here because we don't have a good stand-in for "name",
|
||||
// and the full runnable might not serialize (mutexes, etc)
|
||||
log.V(1).Info("non-leader-election runnable finished", "runnable type", fmt.Sprintf("%T", ctrl))
|
||||
}()
|
||||
cm.startRunnable(c)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -472,15 +597,7 @@ func (cm *controllerManager) startLeaderElectionRunnables() {
|
||||
for _, c := range cm.leaderElectionRunnables {
|
||||
// Controllers block, but we want to return an error if any have an error starting.
|
||||
// Write any Start errors to a channel so we can return them
|
||||
ctrl := c
|
||||
go func() {
|
||||
if err := ctrl.Start(cm.internalStop); err != nil {
|
||||
cm.errSignal.SignalError(err)
|
||||
}
|
||||
// we use %T here because we don't have a good stand-in for "name",
|
||||
// and the full runnable might not serialize (mutexes, etc)
|
||||
log.V(1).Info("leader-election runnable finished", "runnable type", fmt.Sprintf("%T", ctrl))
|
||||
}()
|
||||
cm.startRunnable(c)
|
||||
}
|
||||
|
||||
cm.startedLeader = true
|
||||
@@ -495,19 +612,37 @@ func (cm *controllerManager) waitForCache() {
|
||||
if cm.startCache == nil {
|
||||
cm.startCache = cm.cache.Start
|
||||
}
|
||||
go func() {
|
||||
if err := cm.startCache(cm.internalStop); err != nil {
|
||||
cm.errSignal.SignalError(err)
|
||||
}
|
||||
}()
|
||||
cm.startRunnable(RunnableFunc(func(stop <-chan struct{}) error {
|
||||
return cm.startCache(stop)
|
||||
}))
|
||||
|
||||
// Wait for the caches to sync.
|
||||
// TODO(community): Check the return value and write a test
|
||||
cm.cache.WaitForCacheSync(cm.internalStop)
|
||||
// TODO: This should be the return value of cm.cache.WaitForCacheSync but we abuse
|
||||
// cm.started as check if we already started the cache so it must always become true.
|
||||
// Making sure that the cache doesn't get started twice is needed to not get a "close
|
||||
// of closed channel" panic
|
||||
cm.started = true
|
||||
}
|
||||
|
||||
func (cm *controllerManager) startLeaderElection() (err error) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
cm.mu.Lock()
|
||||
cm.leaderElectionCancel = cancel
|
||||
cm.mu.Unlock()
|
||||
|
||||
if cm.onStoppedLeading == nil {
|
||||
cm.onStoppedLeading = func() {
|
||||
// Make sure graceful shutdown is skipped if we lost the leader lock without
|
||||
// intending to.
|
||||
cm.gracefulShutdownTimeout = time.Duration(0)
|
||||
// Most implementations of leader election log.Fatal() here.
|
||||
// Since Start is wrapped in log.Fatal when called, we can just return
|
||||
// an error here which will cause the program to exit.
|
||||
cm.errChan <- errors.New("leader election lost")
|
||||
}
|
||||
}
|
||||
l, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
|
||||
Lock: cm.resourceLock,
|
||||
LeaseDuration: cm.leaseDuration,
|
||||
@@ -515,30 +650,31 @@ func (cm *controllerManager) startLeaderElection() (err error) {
|
||||
RetryPeriod: cm.retryPeriod,
|
||||
Callbacks: leaderelection.LeaderCallbacks{
|
||||
OnStartedLeading: func(_ context.Context) {
|
||||
close(cm.elected)
|
||||
cm.startLeaderElectionRunnables()
|
||||
},
|
||||
OnStoppedLeading: func() {
|
||||
// Most implementations of leader election log.Fatal() here.
|
||||
// Since Start is wrapped in log.Fatal when called, we can just return
|
||||
// an error here which will cause the program to exit.
|
||||
cm.errSignal.SignalError(fmt.Errorf("leader election lost"))
|
||||
},
|
||||
OnStoppedLeading: cm.onStoppedLeading,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
go func() {
|
||||
select {
|
||||
case <-cm.internalStop:
|
||||
cancel()
|
||||
case <-ctx.Done():
|
||||
}
|
||||
}()
|
||||
|
||||
// Start the leader elector process
|
||||
go l.Run(ctx)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cm *controllerManager) Elected() <-chan struct{} {
|
||||
return cm.elected
|
||||
}
|
||||
|
||||
func (cm *controllerManager) startRunnable(r Runnable) {
|
||||
cm.waitForRunnable.Add(1)
|
||||
go func() {
|
||||
defer cm.waitForRunnable.Done()
|
||||
if err := r.Start(cm.internalStop); err != nil {
|
||||
cm.errChan <- err
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user