Skip to content

Commit

Permalink
xds: Cleanup CDS balancer code and tests. (#3916)
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars authored Oct 6, 2020
1 parent 9a3c02f commit c073660
Show file tree
Hide file tree
Showing 2 changed files with 229 additions and 147 deletions.
50 changes: 23 additions & 27 deletions xds/internal/balancer/cdsbalancer/cdsbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,11 @@ type cdsBB struct{}
// Build creates a new CDS balancer with the ClientConn.
func (cdsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
b := &cdsBalancer{
cc: cc,
bOpts: opts,
updateCh: buffer.NewUnbounded(),
closed: grpcsync.NewEvent(),
cc: cc,
bOpts: opts,
updateCh: buffer.NewUnbounded(),
closed: grpcsync.NewEvent(),
cancelWatch: func() {}, // No-op at this point.
}
b.logger = prefixLogger((b))
b.logger.Infof("Created")
Expand Down Expand Up @@ -158,22 +159,15 @@ type cdsBalancer struct {
// run is a long-running goroutine which handles all updates from gRPC. All
// methods which are invoked directly by gRPC or xdsClient simply push an
// update onto a channel which is read and acted upon right here.
//
// 1. Good clientConn updates lead to registration of a CDS watch. Updates with
// error lead to cancellation of existing watch and propagation of the same
// error to the edsBalancer.
// 2. SubConn updates are passthrough and are simply handed over to the
// underlying edsBalancer.
// 3. Watch API updates lead to clientConn updates being invoked on the
// underlying edsBalancer.
// 4. Close results in cancellation of the CDS watch and closing of the
// underlying edsBalancer and is the only way to exit this goroutine.
func (b *cdsBalancer) run() {
for {
select {
case u := <-b.updateCh.Get():
b.updateCh.Load()
switch update := u.(type) {
// Good clientConn updates lead to registration of a CDS watch.
// Updates with error lead to cancellation of existing watch and
// propagation of the same error to the edsBalancer.
case *ccUpdate:
// We first handle errors, if any, and then proceed with handling
// the update, only if the status quo has changed.
Expand All @@ -187,9 +181,7 @@ func (b *cdsBalancer) run() {
// Since the cdsBalancer doesn't own the xdsClient object, we
// don't have to bother about closing the old client here, but
// we still need to cancel the watch on the old client.
if b.cancelWatch != nil {
b.cancelWatch()
}
b.cancelWatch()
b.client = update.client
}
if update.clusterName != "" {
Expand All @@ -201,12 +193,18 @@ func (b *cdsBalancer) run() {
}
b.clusterToWatch = update.clusterName
}

// SubConn updates are passthrough and are simply handed over to the
// underlying edsBalancer.
case *scUpdate:
if b.edsLB == nil {
b.logger.Errorf("xds: received scUpdate {%+v} with no edsBalancer", update)
break
}
b.edsLB.UpdateSubConnState(update.subConn, update.state)

// Watch API updates lead to clientConn updates being invoked on the
// underlying edsBalancer.
case *watchUpdate:
if err := update.err; err != nil {
b.logger.Warningf("Watch error from xds-client %p: %v", b.client, err)
Expand Down Expand Up @@ -243,11 +241,13 @@ func (b *cdsBalancer) run() {
b.logger.Errorf("xds: edsBalancer.UpdateClientConnState(%+v) returned error: %v", ccState, err)
}
}

// Close results in cancellation of the CDS watch and closing of the
// underlying edsBalancer and is the only way to exit this goroutine.
case <-b.closed.Done():
if b.cancelWatch != nil {
b.cancelWatch()
b.cancelWatch = nil
}
b.cancelWatch()
b.cancelWatch = func() {}

if b.edsLB != nil {
b.edsLB.Close()
b.edsLB = nil
Expand Down Expand Up @@ -282,11 +282,8 @@ func (b *cdsBalancer) handleErrorFromUpdate(err error, fromParent bool) {
//
// This is not necessary today, because xds client never sends connection
// errors.

if fromParent && xdsclient.ErrType(err) == xdsclient.ErrorTypeResourceNotFound {
if b.cancelWatch != nil {
b.cancelWatch()
}
b.cancelWatch()
}
if b.edsLB != nil {
b.edsLB.ResolverError(err)
Expand Down Expand Up @@ -319,7 +316,7 @@ func (b *cdsBalancer) UpdateClientConnState(state balancer.ClientConnState) erro
return errBalancerClosed
}

b.logger.Infof("Receive update from resolver, balancer config: %+v", state.BalancerConfig)
b.logger.Infof("Received update from resolver, balancer config: %+v", state.BalancerConfig)
// The errors checked here should ideally never happen because the
// ServiceConfig in this case is prepared by the xdsResolver and is not
// something that is received on the wire.
Expand Down Expand Up @@ -352,7 +349,6 @@ func (b *cdsBalancer) ResolverError(err error) {
b.logger.Warningf("xds: received resolver error {%v} after cdsBalancer was closed", err)
return
}

b.updateCh.Put(&ccUpdate{err: err})
}

Expand Down
Loading

0 comments on commit c073660

Please sign in to comment.