From 3b3a2e92504f70f37fc157079f69f2a01848431a Mon Sep 17 00:00:00 2001
From: Antoine Tollenaere
Date: Thu, 18 Jul 2024 09:59:43 +0200
Subject: [PATCH 1/3] ringhash: fix bug where ring hash can be stuck in transient failure

Fixes #7363, which has a thorough problem description. The test cases added
reproduce the error without the fix, but not reliably, since the ring may be
constructed in a way where we don't end up getting stuck. Running the test
multiple times reliably triggers the problem.

The solution is to keep a separate list of available addresses, and when there
are no picks and we trigger connection attempts, try them one at a time.

RELEASE NOTES: ringhash: fix a bug that could prevent the balancer from recovering from transient failure.
---
 internal/testutils/blocking_context_dialer.go |  10 +
 .../testutils/blocking_context_dialer_test.go |  13 +
 .../ringhash/e2e/ringhash_balancer_test.go    | 446 ++++++++++++++++++
 xds/internal/balancer/ringhash/picker.go      |  25 -
 xds/internal/balancer/ringhash/ringhash.go    |  41 +-
 5 files changed, 493 insertions(+), 42 deletions(-)

diff --git a/internal/testutils/blocking_context_dialer.go b/internal/testutils/blocking_context_dialer.go
index 32dfabf3609b..f99a1a87dd3d 100644
--- a/internal/testutils/blocking_context_dialer.go
+++ b/internal/testutils/blocking_context_dialer.go
@@ -127,3 +127,13 @@ func (h *Hold) Fail(err error) {
 	h.blockCh <- err
 	close(h.blockCh)
 }
+
+// IsStarted returns true if this hold has received a connection attempt.
+func (h *Hold) IsStarted() bool {
+	select {
+	case <-h.waitCh:
+		return true
+	default:
+		return false
+	}
+}
diff --git a/internal/testutils/blocking_context_dialer_test.go b/internal/testutils/blocking_context_dialer_test.go
index d2b5954148d3..eb5d418d4baf 100644
--- a/internal/testutils/blocking_context_dialer_test.go
+++ b/internal/testutils/blocking_context_dialer_test.go
@@ -59,6 +59,10 @@ func (s) TestBlockingDialer_HoldWaitResume(t *testing.T) {
 	d := NewBlockingDialer()
 	h := d.Hold(lis.Addr().String())
 
+	if h.IsStarted() {
+		t.Fatalf("hold.IsStarted() = true, want false")
+	}
+
 	done := make(chan struct{})
 	ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
 	defer cancel()
@@ -69,6 +73,10 @@ func (s) TestBlockingDialer_HoldWaitResume(t *testing.T) {
 			t.Errorf("BlockingDialer.DialContext() got error: %v, want success", err)
 			return
 		}
+
+		if !h.IsStarted() {
+			t.Errorf("hold.IsStarted() = false, want true")
+		}
 		conn.Close()
 	}()
 
@@ -76,6 +84,11 @@ func (s) TestBlockingDialer_HoldWaitResume(t *testing.T) {
 	if !h.Wait(ctx) {
 		t.Fatalf("Timeout while waiting for a connection attempt to %q", h.addr)
 	}
+
+	if !h.IsStarted() {
+		t.Errorf("hold.IsStarted() = false, want true")
+	}
+
 	select {
 	case <-done:
 		t.Fatalf("Expected dialer to be blocked.")
diff --git a/xds/internal/balancer/ringhash/e2e/ringhash_balancer_test.go b/xds/internal/balancer/ringhash/e2e/ringhash_balancer_test.go
index df6b84fbfb69..47762acfc877 100644
--- a/xds/internal/balancer/ringhash/e2e/ringhash_balancer_test.go
+++ b/xds/internal/balancer/ringhash/e2e/ringhash_balancer_test.go
@@ -20,6 +20,7 @@ package ringhash_test
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"math"
 	"math/rand"
@@ -1566,3 +1567,448 @@ func (s) TestRingHash_ReattemptWhenGoingFromTransientFailureToIdle(t *testing.T)
 		t.Errorf("conn.GetState(): got %v, want %v", got, want)
 	}
 }
+
+// Tests that when all backends are down and then up, we may initially pick a
+// backend in TRANSIENT_FAILURE but will then jump to a READY backend.
+func (s) TestRingHash_TransientFailureSkipToAvailableReady(t *testing.T) { + emptyCallF := func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { + return &testpb.Empty{}, nil + } + lis, err := testutils.LocalTCPListener() + if err != nil { + t.Fatalf("Failed to create listener: %v", err) + } + restartableListener1 := testutils.NewRestartableListener(lis) + restartableServer1 := stubserver.StartTestService(t, &stubserver.StubServer{ + Listener: restartableListener1, + EmptyCallF: emptyCallF, + }) + defer restartableServer1.Stop() + + lis, err = testutils.LocalTCPListener() + if err != nil { + t.Fatalf("Failed to create listener: %v", err) + } + restartableListener2 := testutils.NewRestartableListener(lis) + restartableServer2 := stubserver.StartTestService(t, &stubserver.StubServer{ + Listener: restartableListener2, + EmptyCallF: emptyCallF, + }) + defer restartableServer2.Stop() + + nonExistantBackends := makeNonExistentBackends(t, 2) + + const clusterName = "cluster" + backends := []string{restartableServer1.Address, restartableServer2.Address} + backends = append(backends, nonExistantBackends...) + endpoints := endpointResource(t, clusterName, backends) + cluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{ + ClusterName: clusterName, + ServiceName: clusterName, + Policy: e2e.LoadBalancingPolicyRingHash, + }) + route := headerHashRoute("new_route", virtualHostName, clusterName, "address_hash") + listener := e2e.DefaultClientListener(virtualHostName, route.Name) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + xdsServer, nodeID, xdsResolver := setupManagementServerAndResolver(t) + if err := xdsServer.Update(ctx, xdsUpdateOpts(nodeID, endpoints, cluster, route, listener)); err != nil { + t.Fatalf("Failed to update xDS resources: %v", err) + } + opts := []grpc.DialOption{ + grpc.WithConnectParams(grpc.ConnectParams{ + // Disable backoff to speed up the test. + MinConnectTimeout: 100 * time.Millisecond, + }), + grpc.WithResolvers(xdsResolver), + grpc.WithTransportCredentials(insecure.NewCredentials()), + } + conn, err := grpc.NewClient("xds:///test.server", opts...) + if err != nil { + t.Fatalf("Failed to create client: %s", err) + } + defer conn.Close() + client := testgrpc.NewTestServiceClient(conn) + + if got, want := conn.GetState(), connectivity.Idle; got != want { + t.Errorf("conn.GetState(): got %v, want %v", got, want) + } + + // Test starts with backends not listening. + restartableListener1.Stop() + restartableListener2.Stop() + + // Send a request with a hash that should go to restartableServer1. + // Because it is not accepting connections, and no other backend is + // listening, the RPC fails. + ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs("address_hash", restartableServer1.Address+"_0")) + _, err = client.EmptyCall(ctx, &testpb.Empty{}) + if err == nil { + t.Errorf("rpc EmptyCall() succeeded, want error") + } + + if got, want := conn.GetState(), connectivity.TransientFailure; got != want { + t.Errorf("conn.GetState(): got %v, want %v", got, want) + } + + // Bring up first backend. The channel should become Ready without any + // picks, because in TF, we are always trying to connect to at least one + // backend at all times. + restartableListener1.Restart() + testutils.AwaitState(ctx, t, conn, connectivity.Ready) + + // Bring down backend 1 and bring up backend 2. + // Note the RPC contains a header value that will always be hashed to + // backend 1. 
So by purposely bringing down backend 1 and bringing up + // another backend, this will ensure Picker's first choice of backend 1 + // fails and it will go through the remaining subchannels to find one in + // READY. Since the entries in the ring are pretty distributed and we have + // unused ports to fill the ring, it is almost guaranteed that the Picker + // will go through some non-READY entries and skip them as per design. + t.Logf("bringing down backend 1") + restartableListener1.Stop() + + testutils.AwaitState(ctx, t, conn, connectivity.TransientFailure) + _, err = client.EmptyCall(ctx, &testpb.Empty{}) + if err == nil { + t.Errorf("rpc EmptyCall() succeeded, want error") + } + + t.Logf("bringing up backend 2") + restartableListener2.Restart() + testutils.AwaitState(ctx, t, conn, connectivity.Ready) + + peerAddr := "" + for peerAddr != restartableServer2.Address { + p := peer.Peer{} + _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&p)) + if errors.Is(err, context.DeadlineExceeded) { + t.Fatalf("Timed out waiting for rpc EmptyCall() to be routed to the expected backend") + } + peerAddr = p.Addr.String() + } +} + +// Tests that when all backends are down, we keep reattempting. +func (s) TestRingHash_ReattemptWhenAllEndpointsUnreachable(t *testing.T) { + lis, err := testutils.LocalTCPListener() + if err != nil { + t.Fatalf("Failed to create listener: %v", err) + } + restartableListener := testutils.NewRestartableListener(lis) + restartableServer := stubserver.StartTestService(t, &stubserver.StubServer{ + Listener: restartableListener, + EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { + return &testpb.Empty{}, nil + }, + }) + defer restartableServer.Stop() + + const clusterName = "cluster" + endpoints := endpointResource(t, clusterName, []string{restartableServer.Address}) + cluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{ + ClusterName: clusterName, + ServiceName: clusterName, + Policy: e2e.LoadBalancingPolicyRingHash, + }) + route := headerHashRoute("new_route", virtualHostName, clusterName, "address_hash") + listener := e2e.DefaultClientListener(virtualHostName, route.Name) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + xdsServer, nodeID, xdsResolver := setupManagementServerAndResolver(t) + if err := xdsServer.Update(ctx, xdsUpdateOpts(nodeID, endpoints, cluster, route, listener)); err != nil { + t.Fatalf("Failed to update xDS resources: %v", err) + } + + dopts := []grpc.DialOption{ + grpc.WithResolvers(xdsResolver), + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithConnectParams(fastConnectParams), + } + conn, err := grpc.NewClient("xds:///test.server", dopts...) + if err != nil { + t.Fatalf("Failed to create client: %s", err) + } + defer conn.Close() + client := testgrpc.NewTestServiceClient(conn) + + if got, want := conn.GetState(), connectivity.Idle; got != want { + t.Errorf("conn.GetState(): got %v, want %v", got, want) + } + + t.Log("Stopping the backend server") + restartableListener.Stop() + + _, err = client.EmptyCall(ctx, &testpb.Empty{}) + if err == nil || status.Code(err) != codes.Unavailable { + t.Errorf("rpc EmptyCall() succeeded, want Unavailable error") + } + + // Wait for channel to fail. + testutils.AwaitState(ctx, t, conn, connectivity.TransientFailure) + + t.Log("Restarting the backend server") + restartableListener.Restart() + + // Wait for channel to become connected without any pending RPC. 
+ testutils.AwaitState(ctx, t, conn, connectivity.Ready) +} + +// Tests that when a backend goes down, we will move on to the next subchannel +// (with a lower priority). When the backend comes back up, traffic will move +// back. +func (s) TestRingHash_SwitchToLowerPriorityAndThenBack(t *testing.T) { + lis, err := testutils.LocalTCPListener() + if err != nil { + t.Fatalf("Failed to create listener: %v", err) + } + restartableListener := testutils.NewRestartableListener(lis) + restartableServer := stubserver.StartTestService(t, &stubserver.StubServer{ + Listener: restartableListener, + EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { + return &testpb.Empty{}, nil + }, + }) + defer restartableServer.Stop() + + otherBackend := startTestServiceBackends(t, 1)[0] + + // We must set the host name socket address in EDS, as the ring hash policy + // uses it to construct the ring. + host, _, err := net.SplitHostPort(otherBackend) + if err != nil { + t.Fatalf("Failed to split host and port from stubserver: %v", err) + } + + const clusterName = "cluster" + endpoints := e2e.EndpointResourceWithOptions(e2e.EndpointOptions{ + ClusterName: clusterName, + Host: host, + Localities: []e2e.LocalityOptions{{ + Backends: backendOptions(t, []string{restartableServer.Address}), + Weight: 1, + }, { + Backends: backendOptions(t, []string{otherBackend}), + Weight: 1, + Priority: 1, + }}}) + cluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{ + ClusterName: clusterName, + ServiceName: clusterName, + Policy: e2e.LoadBalancingPolicyRingHash, + }) + route := headerHashRoute("new_route", virtualHostName, clusterName, "address_hash") + listener := e2e.DefaultClientListener(virtualHostName, route.Name) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + xdsServer, nodeID, xdsResolver := setupManagementServerAndResolver(t) + if err := xdsServer.Update(ctx, xdsUpdateOpts(nodeID, endpoints, cluster, route, listener)); err != nil { + t.Fatalf("Failed to update xDS resources: %v", err) + } + + dopts := []grpc.DialOption{ + grpc.WithResolvers(xdsResolver), + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithConnectParams(fastConnectParams), + } + conn, err := grpc.NewClient("xds:///test.server", dopts...) + if err != nil { + t.Fatalf("Failed to create client: %s", err) + } + defer conn.Close() + client := testgrpc.NewTestServiceClient(conn) + + // Note each type of RPC contains a header value that will always be hashed + // to the value that was used to place the non-existent endpoint on the ring. + ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs("address_hash", restartableServer.Address+"_0")) + var got string + for got = range checkRPCSendOK(ctx, t, client, 1) { + } + if want := restartableServer.Address; got != want { + t.Errorf("Got RPC routed to addr %v, want %v", got, want) + } + + // Trigger failure with the existing backend, which should cause the + // balancer to go in transient failure and the priority balancer to move + // to the lower priority. + restartableListener.Stop() + + for { + p := peer.Peer{} + _, err = client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)) + + // Ignore errors: we may need to attempt to send an RPC to detect the + // failure (the next write on connection fails). 
+ if err == nil { + if got, want := p.Addr.String(), otherBackend; got != want { + t.Errorf("Got RPC routed to addr %v, want %v", got, want) + } + break + } + } + + // Now we start the backend with the address hash that is used in the + // metadata, so eventually RPCs should be routed to it, since it is in a + // locality with higher priority. + peerAddr := "" + restartableListener.Restart() + for peerAddr != restartableServer.Address { + p := peer.Peer{} + _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&p)) + if errors.Is(err, context.DeadlineExceeded) { + t.Fatalf("Timed out waiting for rpc EmptyCall() to be routed to the expected backend") + } + peerAddr = p.Addr.String() + } +} + +// Tests that when we trigger internal connection attempts without picks, we do +// so for only one subchannel at a time. +func (s) TestRingHash_ContinuesConnectingWithoutPicksOneSubchannelAtATime(t *testing.T) { + backends := startTestServiceBackends(t, 1) + nonExistantBackends := makeNonExistentBackends(t, 3) + + const clusterName = "cluster" + + endpoints := endpointResource(t, clusterName, append(nonExistantBackends, backends...)) + cluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{ + ClusterName: clusterName, + ServiceName: clusterName, + Policy: e2e.LoadBalancingPolicyRingHash, + }) + route := headerHashRoute("new_route", virtualHostName, clusterName, "address_hash") + listener := e2e.DefaultClientListener(virtualHostName, route.Name) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + xdsServer, nodeID, xdsResolver := setupManagementServerAndResolver(t) + if err := xdsServer.Update(ctx, xdsUpdateOpts(nodeID, endpoints, cluster, route, listener)); err != nil { + t.Fatalf("Failed to update xDS resources: %v", err) + } + + dialer := testutils.NewBlockingDialer() + dialOpts := []grpc.DialOption{ + grpc.WithResolvers(xdsResolver), + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithContextDialer(dialer.DialContext), + grpc.WithConnectParams(fastConnectParams), + } + conn, err := grpc.NewClient("xds:///test.server", dialOpts...) + if err != nil { + t.Fatalf("Failed to create client: %s", err) + } + defer conn.Close() + client := testgrpc.NewTestServiceClient(conn) + + holdNonExistant0 := dialer.Hold(nonExistantBackends[0]) + holdNonExistant1 := dialer.Hold(nonExistantBackends[1]) + holdNonExistant2 := dialer.Hold(nonExistantBackends[2]) + holdGood := dialer.Hold(backends[0]) + + rpcCtx, rpcCancel := context.WithCancel(ctx) + go func() { + rpcCtx = metadata.NewOutgoingContext(rpcCtx, metadata.Pairs("address_hash", nonExistantBackends[0]+"_0")) // XXX + _, err := client.EmptyCall(rpcCtx, &testpb.Empty{}) + if status.Code(err) != codes.Canceled { + t.Errorf("Expected RPC to be canceled, got error: %v", err) + } + }() + + // Wait for the RPC to trigger a connection attempt to the first address, + // then cancel the RPC. No other connection attempts should be started yet. + if !holdNonExistant0.Wait(ctx) { + t.Fatalf("Timeout waiting for connection attempt to backend 0") + } + rpcCancel() + + // Allow the connection attempt to the first address to resume and wait for + // the attempt for the second address. No other connection attempts should + // be started yet. 
+ if holdNonExistant1.IsStarted() { + t.Errorf("Got connection attempt to backend 1, expected no connection attempt.") + } + if holdNonExistant2.IsStarted() { + t.Errorf("Got connection attempt to backend 2, expected no connection attempt.") + } + if holdGood.IsStarted() { + t.Errorf("Got connection attempt to good backend, expected no connection attempt.") + } + + // Allow the connection attempt to the first address to resume and wait for + // the attempt for the second address. No other connection attempts should + // be started yet. + holdNonExistant0Again := dialer.Hold(nonExistantBackends[0]) + holdNonExistant0.Resume() + if !holdNonExistant1.Wait(ctx) { + t.Fatalf("Timeout waiting for connection attempt to backend 1") + } + if holdNonExistant0Again.IsStarted() { + t.Errorf("Got connection attempt to backend 0 again, expected no connection attempt.") + } + if holdNonExistant2.IsStarted() { + t.Errorf("Got connection attempt to backend 2, expected no connection attempt.") + } + if holdGood.IsStarted() { + t.Errorf("Got connection attempt to good backend, expected no connection attempt.") + } + + // Allow the connection attempt to the second address to resume and wait for + // the attempt for the third address. No other connection attempts should + // be started yet. + holdNonExistant1Again := dialer.Hold(nonExistantBackends[1]) + holdNonExistant1.Resume() + if !holdNonExistant2.Wait(ctx) { + t.Fatalf("Timeout waiting for connection attempt to backend 2") + } + if holdNonExistant0Again.IsStarted() { + t.Errorf("Got connection attempt to backend 0 again, expected no connection attempt.") + } + if holdNonExistant1Again.IsStarted() { + t.Errorf("Got connection attempt to backend 1 again, expected no connection attempt.") + } + if holdGood.IsStarted() { + t.Errorf("Got connection attempt to good backend, expected no connection attempt.") + } + + // Allow the connection attempt to the third address to resume and wait + // for the attempt for the final address. No other connection attempts + // should be started yet. + holdNonExistant2Again := dialer.Hold(nonExistantBackends[2]) + holdNonExistant2.Resume() + if !holdGood.Wait(ctx) { + t.Fatalf("Timeout waiting for connection attempt to good backend") + } + if holdNonExistant0Again.IsStarted() { + t.Errorf("Got connection attempt to backend 0 again, expected no connection attempt.") + } + if holdNonExistant1Again.IsStarted() { + t.Errorf("Got connection attempt to backend 1 again, expected no connection attempt.") + } + if holdNonExistant2Again.IsStarted() { + t.Errorf("Got connection attempt to backend 2 again, expected no connection attempt.") + } + + // Allow the final attempt to resume. + holdGood.Resume() + + // Wait for channel to become connected without any pending RPC. 
+ testutils.AwaitState(ctx, t, conn, connectivity.Ready) + + // No other connection attempts should have been started + if holdNonExistant0Again.IsStarted() { + t.Errorf("Got connection attempt to backend 0 again, expected no connection attempt.") + } + if holdNonExistant1Again.IsStarted() { + t.Errorf("Got connection attempt to backend 1 again, expected no connection attempt.") + } + if holdNonExistant2Again.IsStarted() { + t.Errorf("Got connection attempt to backend 2 again, expected no connection attempt.") + } +} diff --git a/xds/internal/balancer/ringhash/picker.go b/xds/internal/balancer/ringhash/picker.go index b450716fa0f0..5ce72caded48 100644 --- a/xds/internal/balancer/ringhash/picker.go +++ b/xds/internal/balancer/ringhash/picker.go @@ -159,28 +159,3 @@ func nextSkippingDuplicates(ring *ring, entry *ringEntry) *ringEntry { // There's no qualifying next entry. return nil } - -// nextSkippingDuplicatesSubConn finds the next subconn in the ring, that's -// different from the given subconn. -func nextSkippingDuplicatesSubConn(ring *ring, sc *subConn) *subConn { - var entry *ringEntry - for _, it := range ring.items { - if it.sc == sc { - entry = it - break - } - } - if entry == nil { - // If the given subconn is not in the ring (e.g. it was deleted), return - // the first one. - if len(ring.items) > 0 { - return ring.items[0].sc - } - return nil - } - ee := nextSkippingDuplicates(ring, entry) - if ee == nil { - return nil - } - return ee.sc -} diff --git a/xds/internal/balancer/ringhash/ringhash.go b/xds/internal/balancer/ringhash/ringhash.go index e63c6f653904..c2802b24253a 100644 --- a/xds/internal/balancer/ringhash/ringhash.go +++ b/xds/internal/balancer/ringhash/ringhash.go @@ -46,10 +46,11 @@ type bb struct{} func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer { b := &ringhashBalancer{ - cc: cc, - subConns: resolver.NewAddressMap(), - scStates: make(map[balancer.SubConn]*subConn), - csEvltr: &connectivityStateEvaluator{}, + cc: cc, + subConns: resolver.NewAddressMap(), + scStates: make(map[balancer.SubConn]*subConn), + csEvltr: &connectivityStateEvaluator{}, + orderedSubConns: make([]*subConn, 0), } b.logger = prefixLogger(b) b.logger.Infof("Created") @@ -197,6 +198,14 @@ type ringhashBalancer struct { resolverErr error // the last error reported by the resolver; cleared on successful resolution connErr error // the last connection error; cleared upon leaving TransientFailure + + // orderedSubConns contains the list of subconns in the order that addresses + // appear from the resolver. Together with lastInternallyTriggeredSCIndex, + // this allows triggering connection attempts to all SubConns independently + // of the order they appear on the ring. Always in sync with ring and + // subConns. The index is reset when addresses change. + orderedSubConns []*subConn + lastInternallyTriggeredSCIndex int } // updateAddresses creates new SubConns and removes SubConns, based on the @@ -214,6 +223,10 @@ func (b *ringhashBalancer) updateAddresses(addrs []resolver.Address) bool { var addrsUpdated bool // addrsSet is the set converted from addrs, used for quick lookup. addrsSet := resolver.NewAddressMap() + + b.orderedSubConns = b.orderedSubConns[:0] // reuse the underlying array. 
+ b.lastInternallyTriggeredSCIndex = 0 + for _, addr := range addrs { addrsSet.Set(addr, true) newWeight := getWeightAttribute(addr) @@ -234,6 +247,7 @@ func (b *ringhashBalancer) updateAddresses(addrs []resolver.Address) bool { b.state = b.csEvltr.recordTransition(connectivity.Shutdown, connectivity.Idle) b.subConns.Set(addr, scs) b.scStates[sc] = scs + b.orderedSubConns = append(b.orderedSubConns, scs) addrsUpdated = true } else { // We have seen this address before and created a subConn for it. If the @@ -247,6 +261,7 @@ func (b *ringhashBalancer) updateAddresses(addrs []resolver.Address) bool { if oldWeight := scInfo.weight; oldWeight != newWeight { scInfo.weight = newWeight b.subConns.Set(addr, scInfo) + b.orderedSubConns = append(b.orderedSubConns, scInfo) // Return true to force recreation of the ring. addrsUpdated = true } @@ -399,19 +414,11 @@ func (b *ringhashBalancer) updateSubConnState(sc balancer.SubConn, state balance return } } - // Trigger a SubConn (this updated SubConn's next SubConn in the ring) - // to connect if nobody is attempting to connect. - sc := nextSkippingDuplicatesSubConn(b.ring, scs) - if sc != nil { - sc.queueConnect() - return - } - // This handles the edge case where we have a single subConn in the - // ring. nextSkippingDuplicatesSubCon() would have returned nil. We - // still need to ensure that some subConn is attempting to connect, in - // order to give the LB policy a chance to move out of - // TRANSIENT_FAILURE. Hence, we try connecting on the current subConn. - scs.queueConnect() + + // Trigger a SubConn (the next in the order addresses appear in the + // resolver) to connect if nobody is attempting to connect. + b.lastInternallyTriggeredSCIndex = (b.lastInternallyTriggeredSCIndex + 1) % len(b.orderedSubConns) + b.orderedSubConns[b.lastInternallyTriggeredSCIndex].queueConnect() } } From dd30bc845172b11b12eabfff066998dd7d8311e8 Mon Sep 17 00:00:00 2001 From: Antoine Tollenaere Date: Tue, 13 Aug 2024 09:21:41 +0200 Subject: [PATCH 2/3] feedback from arvind --- xds/internal/balancer/ringhash/ringhash.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/xds/internal/balancer/ringhash/ringhash.go b/xds/internal/balancer/ringhash/ringhash.go index c2802b24253a..8cc7fd53400b 100644 --- a/xds/internal/balancer/ringhash/ringhash.go +++ b/xds/internal/balancer/ringhash/ringhash.go @@ -225,7 +225,6 @@ func (b *ringhashBalancer) updateAddresses(addrs []resolver.Address) bool { addrsSet := resolver.NewAddressMap() b.orderedSubConns = b.orderedSubConns[:0] // reuse the underlying array. - b.lastInternallyTriggeredSCIndex = 0 for _, addr := range addrs { addrsSet.Set(addr, true) @@ -258,10 +257,10 @@ func (b *ringhashBalancer) updateAddresses(addrs []resolver.Address) bool { // since *only* the weight attribute has changed, and that does not affect // subConn uniqueness. scInfo := val.(*subConn) + b.orderedSubConns = append(b.orderedSubConns, scInfo) if oldWeight := scInfo.weight; oldWeight != newWeight { scInfo.weight = newWeight b.subConns.Set(addr, scInfo) - b.orderedSubConns = append(b.orderedSubConns, scInfo) // Return true to force recreation of the ring. addrsUpdated = true } @@ -279,6 +278,9 @@ func (b *ringhashBalancer) updateAddresses(addrs []resolver.Address) bool { // The entry will be deleted in updateSubConnState. 
} } + if addrsUpdated { + b.lastInternallyTriggeredSCIndex = 0 + } return addrsUpdated } From 67181a16dca13c0d83a344c0580e58b85109fc24 Mon Sep 17 00:00:00 2001 From: Antoine Tollenaere Date: Fri, 16 Aug 2024 09:14:48 +0200 Subject: [PATCH 3/3] comments from easwars --- .../ringhash/e2e/ringhash_balancer_test.go | 57 +++++++++---------- 1 file changed, 26 insertions(+), 31 deletions(-) diff --git a/xds/internal/balancer/ringhash/e2e/ringhash_balancer_test.go b/xds/internal/balancer/ringhash/e2e/ringhash_balancer_test.go index 47762acfc877..cf10d23043b4 100644 --- a/xds/internal/balancer/ringhash/e2e/ringhash_balancer_test.go +++ b/xds/internal/balancer/ringhash/e2e/ringhash_balancer_test.go @@ -1632,9 +1632,7 @@ func (s) TestRingHash_TransientFailureSkipToAvailableReady(t *testing.T) { defer conn.Close() client := testgrpc.NewTestServiceClient(conn) - if got, want := conn.GetState(), connectivity.Idle; got != want { - t.Errorf("conn.GetState(): got %v, want %v", got, want) - } + testutils.AwaitState(ctx, t, conn, connectivity.Idle) // Test starts with backends not listening. restartableListener1.Stop() @@ -1644,14 +1642,11 @@ func (s) TestRingHash_TransientFailureSkipToAvailableReady(t *testing.T) { // Because it is not accepting connections, and no other backend is // listening, the RPC fails. ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs("address_hash", restartableServer1.Address+"_0")) - _, err = client.EmptyCall(ctx, &testpb.Empty{}) - if err == nil { - t.Errorf("rpc EmptyCall() succeeded, want error") + if _, err = client.EmptyCall(ctx, &testpb.Empty{}); err == nil { + t.Fatalf("rpc EmptyCall() succeeded, want error") } - if got, want := conn.GetState(), connectivity.TransientFailure; got != want { - t.Errorf("conn.GetState(): got %v, want %v", got, want) - } + testutils.AwaitState(ctx, t, conn, connectivity.TransientFailure) // Bring up first backend. 
The channel should become Ready without any // picks, because in TF, we are always trying to connect to at least one @@ -1671,23 +1666,21 @@ func (s) TestRingHash_TransientFailureSkipToAvailableReady(t *testing.T) { restartableListener1.Stop() testutils.AwaitState(ctx, t, conn, connectivity.TransientFailure) - _, err = client.EmptyCall(ctx, &testpb.Empty{}) - if err == nil { - t.Errorf("rpc EmptyCall() succeeded, want error") + if _, err = client.EmptyCall(ctx, &testpb.Empty{}); err == nil { + t.Fatalf("rpc EmptyCall() succeeded, want error") } t.Logf("bringing up backend 2") restartableListener2.Restart() testutils.AwaitState(ctx, t, conn, connectivity.Ready) - peerAddr := "" - for peerAddr != restartableServer2.Address { + wantPeerAddr := "" + for wantPeerAddr != restartableServer2.Address { p := peer.Peer{} - _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&p)) - if errors.Is(err, context.DeadlineExceeded) { + if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&p)); errors.Is(err, context.DeadlineExceeded) { t.Fatalf("Timed out waiting for rpc EmptyCall() to be routed to the expected backend") } - peerAddr = p.Addr.String() + wantPeerAddr = p.Addr.String() } } @@ -1736,16 +1729,13 @@ func (s) TestRingHash_ReattemptWhenAllEndpointsUnreachable(t *testing.T) { defer conn.Close() client := testgrpc.NewTestServiceClient(conn) - if got, want := conn.GetState(), connectivity.Idle; got != want { - t.Errorf("conn.GetState(): got %v, want %v", got, want) - } + testutils.AwaitState(ctx, t, conn, connectivity.Idle) t.Log("Stopping the backend server") restartableListener.Stop() - _, err = client.EmptyCall(ctx, &testpb.Empty{}) - if err == nil || status.Code(err) != codes.Unavailable { - t.Errorf("rpc EmptyCall() succeeded, want Unavailable error") + if _, err = client.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Unavailable { + t.Fatalf("rpc EmptyCall() succeeded, want Unavailable error") } // Wait for channel to fail. @@ -1831,7 +1821,7 @@ func (s) TestRingHash_SwitchToLowerPriorityAndThenBack(t *testing.T) { for got = range checkRPCSendOK(ctx, t, client, 1) { } if want := restartableServer.Address; got != want { - t.Errorf("Got RPC routed to addr %v, want %v", got, want) + t.Fatalf("Got RPC routed to addr %v, want %v", got, want) } // Trigger failure with the existing backend, which should cause the @@ -1847,7 +1837,7 @@ func (s) TestRingHash_SwitchToLowerPriorityAndThenBack(t *testing.T) { // failure (the next write on connection fails). 
 		if err == nil {
 			if got, want := p.Addr.String(), otherBackend; got != want {
-				t.Errorf("Got RPC routed to addr %v, want %v", got, want)
+				t.Fatalf("Got RPC routed to addr %v, want %v", got, want)
 			}
 			break
 		}
@@ -1913,12 +1903,15 @@ func (s) TestRingHash_ContinuesConnectingWithoutPicksOneSubchannelAtATime(t *tes
 	holdGood := dialer.Hold(backends[0])
 
 	rpcCtx, rpcCancel := context.WithCancel(ctx)
+	errCh := make(chan error, 1)
 	go func() {
-		rpcCtx = metadata.NewOutgoingContext(rpcCtx, metadata.Pairs("address_hash", nonExistantBackends[0]+"_0")) // XXX
+		rpcCtx = metadata.NewOutgoingContext(rpcCtx, metadata.Pairs("address_hash", nonExistantBackends[0]+"_0"))
 		_, err := client.EmptyCall(rpcCtx, &testpb.Empty{})
-		if status.Code(err) != codes.Canceled {
-			t.Errorf("Expected RPC to be canceled, got error: %v", err)
+		if status.Code(err) == codes.Canceled {
+			errCh <- nil
+			return
 		}
+		errCh <- err
 	}()
 
 	// Wait for the RPC to trigger a connection attempt to the first address,
@@ -1927,10 +1920,12 @@ func (s) TestRingHash_ContinuesConnectingWithoutPicksOneSubchannelAtATime(t *tes
 		t.Fatalf("Timeout waiting for connection attempt to backend 0")
 	}
 	rpcCancel()
+	if err := <-errCh; err != nil {
+		t.Fatalf("Expected RPC to be canceled, got: %v", err)
+	}
 
-	// Allow the connection attempt to the first address to resume and wait for
-	// the attempt for the second address. No other connection attempts should
-	// be started yet.
+	// Since the connection attempt to the first address is still blocked, no
+	// other connection attempts should be started yet.
 	if holdNonExistant1.IsStarted() {
 		t.Errorf("Got connection attempt to backend 1, expected no connection attempt.")
 	}
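
Note (illustrative sketch, not part of the patch series): the core mechanism the fix adds to ringhash.go is to cycle internally triggered connection attempts through the subconns in resolver order, one at a time, instead of walking the ring. A minimal standalone sketch of that idea, assuming simplified stand-in types for the balancer's real subConn and state, follows.

package main

import "fmt"

// subConn is a hypothetical stand-in for the balancer's subchannel wrapper.
type subConn struct{ addr string }

// queueConnect stands in for asking the subchannel to start connecting.
func (s *subConn) queueConnect() { fmt.Println("connect ->", s.addr) }

// balancerState mirrors the two fields the patch adds to ringhashBalancer.
type balancerState struct {
	orderedSubConns                []*subConn // subconns in resolver order, rebuilt on address updates
	lastInternallyTriggeredSCIndex int        // reset when the address list changes
}

// triggerNextConnect is what the balancer effectively does when a subconn
// reports TRANSIENT_FAILURE and no RPC picks are driving new attempts:
// advance the index and ask exactly one subconn to connect.
func (b *balancerState) triggerNextConnect() {
	if len(b.orderedSubConns) == 0 {
		return
	}
	b.lastInternallyTriggeredSCIndex = (b.lastInternallyTriggeredSCIndex + 1) % len(b.orderedSubConns)
	b.orderedSubConns[b.lastInternallyTriggeredSCIndex].queueConnect()
}

func main() {
	b := &balancerState{orderedSubConns: []*subConn{{addr: "a:1"}, {addr: "b:2"}, {addr: "c:3"}}}
	// Repeated failures without picks walk the index through every address,
	// so a reachable backend is eventually tried even if earlier ring
	// entries keep failing.
	for i := 0; i < 3; i++ {
		b.triggerNextConnect()
	}
}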