Files
kubesphere/vendor/github.com/projectcalico/libcalico-go/lib/backend/etcdv3/watcher.go
2019-08-17 15:34:02 +08:00

213 lines
6.5 KiB
Go

// Copyright (c) 2016-2019 Tigera, Inc. All rights reserved.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package etcdv3
import (
"context"
goerrors "errors"
"strconv"
"sync/atomic"
"github.com/coreos/etcd/clientv3"
log "github.com/sirupsen/logrus"
"github.com/projectcalico/libcalico-go/lib/backend/api"
"github.com/projectcalico/libcalico-go/lib/backend/model"
"github.com/projectcalico/libcalico-go/lib/errors"
)
const (
resultsBufSize = 100
)
// Watch entries in the datastore matching the resources specified by the ListInterface.
func (c *etcdV3Client) Watch(cxt context.Context, l model.ListInterface, revision string) (api.WatchInterface, error) {
var rev int64
if len(revision) != 0 {
var err error
rev, err = strconv.ParseInt(revision, 10, 64)
if err != nil {
return nil, err
}
}
wc := &watcher{
client: c,
list: l,
initialRev: rev,
resultChan: make(chan api.WatchEvent, resultsBufSize),
}
wc.ctx, wc.cancel = context.WithCancel(cxt)
go wc.watchLoop()
return wc, nil
}
// watcher implements watch.Interface.
type watcher struct {
client *etcdV3Client
initialRev int64
ctx context.Context
cancel context.CancelFunc
resultChan chan api.WatchEvent
list model.ListInterface
terminated uint32
}
// Stop stops the watcher and releases associated resources.
// This calls through to the context cancel function.
func (wc *watcher) Stop() {
wc.cancel()
}
// ResultChan returns a channel used to receive WatchEvents.
func (wc *watcher) ResultChan() <-chan api.WatchEvent {
return wc.resultChan
}
// HasTerminated returns true when the watcher has completed termination processing.
func (wc *watcher) HasTerminated() bool {
return atomic.LoadUint32(&wc.terminated) != 0
}
// watchLoop starts a watch on the required path prefix and sends a stream of
// event updates for internal processing.
func (wc *watcher) watchLoop() {
// When this loop exits, make sure we terminate the watcher resources.
defer wc.terminateWatcher()
log.Debug("Starting watcher.watchLoop")
if wc.initialRev == 0 {
// No initial revision supplied, so perform a list of current configuration
// which will also get the current revision we will start our watch from.
if err := wc.listCurrent(); err != nil {
log.Errorf("failed to list current with latest state: %v", err)
wc.sendError(err, true)
return
}
}
// If we are not watching a specific resource then this is a prefix watch.
logCxt := log.WithField("list", wc.list)
key, opts := calculateListKeyAndOptions(logCxt, wc.list)
opts = append(opts, clientv3.WithRev(wc.initialRev+1), clientv3.WithPrevKV())
logCxt = logCxt.WithFields(log.Fields{
"etcdv3-etcdKey": key,
"rev": wc.initialRev,
})
logCxt.Debug("Starting etcdv3 watch")
wch := wc.client.etcdClient.Watch(wc.ctx, key, opts...)
for wres := range wch {
if wres.Err() != nil {
// A watch channel error is a terminating event, so exit the loop.
err := wres.Err()
log.WithError(err).Error("Watch channel error")
wc.sendError(err, true)
return
}
for _, e := range wres.Events {
// Convert the etcdv3 event to the equivalent Watcher event. An error
// parsing the event is returned as an error, but don't exit the watcher as
// restarting the watcher is unlikely to fix the conversion error.
if ae, err := convertWatchEvent(e, wc.list); ae != nil {
wc.sendEvent(ae)
} else if err != nil {
wc.sendError(err, false)
}
}
}
// If we exit the loop, it means the watcher has closed for some reason.
// Bubble this up as a watch termination error.
log.Warn("etcdv3 watch channel closed")
wc.sendError(goerrors.New("etcdv3 watch channel closed"), true)
}
// listCurrent retrieves the existing entries and sends an event for each listed
func (wc *watcher) listCurrent() error {
log.Info("Performing initial list with no revision")
list, err := wc.client.List(wc.ctx, wc.list, "")
if err != nil {
return err
}
wc.initialRev, err = strconv.ParseInt(list.Revision, 10, 64)
if err != nil {
log.WithError(err).Error("List returned revision that could not be parsed")
return err
}
// We are sending an initial sync of entries to the watcher to provide current
// state. To the perspective of the watcher, these are added entries, so set the
// event type to WatchAdded.
log.WithField("NumEntries", len(list.KVPairs)).Debug("Sending create events for each existing entry")
for _, kv := range list.KVPairs {
wc.sendEvent(&api.WatchEvent{
Type: api.WatchAdded,
New: kv,
})
}
return nil
}
// terminateWatcher terminates the resources associated with the watcher.
func (wc *watcher) terminateWatcher() {
log.Debug("Terminating etcdv3 watcher")
// Cancel the context - which will cancel the etcd Watch, this may have already been
// cancelled through an explicit Stop, but it is fine to cancel multiple times.
wc.cancel()
// Close the results channel.
close(wc.resultChan)
// Increment the terminated counter using a goroutine safe operation.
atomic.AddUint32(&wc.terminated, 1)
}
// sendError packages up the error as an event and sends it in the results channel.
func (wc *watcher) sendError(err error, terminating bool) {
// The response from etcd commands may include a context.Canceled error if the context
// was cancelled before completion. Since with our Watcher we don't include that as
// an error type skip over the Canceled error, the error processing in the main
// watch thread will terminate the watcher.
if err == context.Canceled {
return
}
// If this is a terminating error, wrap the error up in an errors.ErrorWatchTerminated
// error type.
if terminating {
err = errors.ErrorWatchTerminated{Err: err}
}
// Wrap the error up in a WatchEvent and use sendEvent to send it.
errEvent := &api.WatchEvent{
Type: api.WatchError,
Error: err,
}
wc.sendEvent(errEvent)
}
// sendEvent sends an event in the results channel.
func (wc *watcher) sendEvent(e *api.WatchEvent) {
if len(wc.resultChan) == resultsBufSize {
log.Warningf("Watch events backing up: %d events", resultsBufSize)
}
select {
case wc.resultChan <- *e:
case <-wc.ctx.Done():
}
}