
reader: don't send RDY 1 on a new connection if that would violate max-in-flight #254

Merged (4 commits, Oct 24, 2021)

Conversation

@alpaker (Contributor) commented Sep 4, 2020

Fixes #252.

Here we add a condition to Reader._on_connection_ready() so that we send RDY 1 on a new connection only if doing so won't lead to a max-in-flight violation. Because _on_connection_ready() first ensures that every existing connection's RDY is at most the new per-connection max-in-flight, the condition only blocks the RDY 1 when max_in_flight is less than the connection count, and in that case we know that RDY redistribution will give RDY to this connection once it's safe.

(Closed the previous PR and opened a new one using a different branch for the sake of cleaner git history.)
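The guard can be sketched with a toy model. The names (total_rdy, _connection_max_in_flight, _send_rdy) mirror pynsq's Reader internals, but this is an illustration of the idea, not the actual patch:

```python
class _Conn:
    def __init__(self):
        self.rdy = 0

class _Reader:
    """Toy model of the Reader's RDY bookkeeping (assumed names, not pynsq itself)."""
    def __init__(self, max_in_flight):
        self.max_in_flight = max_in_flight
        self.conns = []

    @property
    def total_rdy(self):
        return sum(c.rdy for c in self.conns)

    def _connection_max_in_flight(self):
        return max(1, self.max_in_flight // max(1, len(self.conns)))

    def _send_rdy(self, conn, value):
        conn.rdy = value

    def on_connection_ready(self, conn):
        self.conns.append(conn)
        per_conn = self._connection_max_in_flight()
        # First clamp incumbents to the new, lower per-connection ceiling.
        for c in self.conns:
            if c.rdy > per_conn:
                self._send_rdy(c, per_conn)
        # The fix: prime the new connection only if RDY 1 fits under max_in_flight.
        if self.total_rdy + 1 <= self.max_in_flight:
            self._send_rdy(conn, 1)

reader = _Reader(max_in_flight=2)
for _ in range(3):  # more connections than max_in_flight
    reader.on_connection_ready(_Conn())
assert reader.total_rdy <= reader.max_in_flight
```

With max_in_flight=2 and three connections, the third connection is left at RDY 0 rather than pushing total RDY to 3; redistribution would later rotate RDY onto it.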

@alpaker (Contributor, Author) commented Sep 4, 2020

Because we earlier in _on_connection_ready() ensure that all connections have RDY less than or equal to the new per-connection max in flight, this condition will only prevent us from sending RDY 1 on a new connection when max_in_flight is less than the connection count

It might be worth elaborating on the above. Before sending RDY on the new connection, _on_connection_ready() ensures all existing connections respect the new, lower per-connection max-in-flight that results from adding a connection:

        conn_max_in_flight = self._connection_max_in_flight()
        for c in self.conns.values():
            if c.rdy > conn_max_in_flight:
                self._send_rdy(c, conn_max_in_flight)

After this adjustment, we know that

(n - 1) * conn_max_in_flight >= total_rdy

where n is the new connection count (the RDY on the n - 1 incumbent connections is now bounded by conn_max_in_flight and the new connection is still at RDY 0).

If max_in_flight >= n, then conn_max_in_flight = floor(max_in_flight / n), so

max_in_flight >= n * conn_max_in_flight

Since conn_max_in_flight >= 1 whenever max_in_flight >= n, the two displayed inequalities combine to give

max_in_flight >= (n - 1) * conn_max_in_flight + conn_max_in_flight >= total_rdy + 1

and hence

max_in_flight > total_rdy

It follows that if max_in_flight <= total_rdy, it must be the case that max_in_flight < n.
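As a sanity check on this argument, the claim that max_in_flight >= n forces total_rdy < max_in_flight can be verified exhaustively over small values, using the worst case total_rdy = (n - 1) * floor(max_in_flight / n) from the adjustment loop:

```python
# Exhaustive check for small values: whenever max_in_flight >= n, the
# worst-case post-adjustment total RDY is strictly below max_in_flight.
violations = []
for max_in_flight in range(1, 201):
    for n in range(1, max_in_flight + 1):  # the max_in_flight >= n case
        conn_max_in_flight = max_in_flight // n
        if (n - 1) * conn_max_in_flight >= max_in_flight:
            violations.append((max_in_flight, n))
assert violations == []
```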

@mreiferson (Member)

Thanks @alpaker

It looks like, as of the commit in which I originally added the comment block in this method, we were checking for max-in-flight violations in _send_rdy (de9e43e#diff-6f9e6aa2ad7e79ccc43351a9e24687f4R392).

Trying to find when/why we dropped that check...

@mreiferson (Member) commented Sep 5, 2020

The check was changed in

47d1693#diff-2a8bc85bf9c95f482da1eb0490a60251

and then ultimately removed in

d94d2e5#diff-2a8bc85bf9c95f482da1eb0490a60251

Naively enforcing that invariant prevents us from correctly updating each conn's rdy after reducing max_in_flight. Additionally, there's no need for this check in _send_rdy(), as all callers have logic that's supposed to determine an appropriate rdy value.

Looks like we just missed a call site...

@alpaker (Contributor, Author) commented Sep 5, 2020

So, my doing 😬. Sorry about that.

On a quick scan, these other call sites seem like they might also need checks:

Reader._maybe_update_rdy()
Reader._complete_backoff_block()
Reader._rdy_retry()

Should I work something up for those as well?

@mreiferson (Member)

I'm not sure, need to get all this logic back in my head 😄

If we need to put that check at every call site, why wouldn't we put it back in _send_rdy, as long as we also address the scenario where max in flight is reduced?

@alpaker (Contributor, Author) commented Sep 5, 2020

I can't think of a functional reason to prefer one approach. (I find it easier to think through various cases if _send_rdy() behaves more like a naive setter, fwiw.)

If we do restore the max-in-flight check to _send_rdy(), the fix @ploxiln suggested when the problem with reducing max in flight was first reported seems like the right thing. The short circuit test I removed from _send_rdy(), the one that interfered with max in flight reduction, was:

        if new_rdy > self.max_in_flight:
            return

where new_rdy is what the total RDY across all conns would be after updating this conn to the requested value. What we need is to be able to step down the RDY count of an individual conn even if that leaves total RDY greater than max in flight:

        if not (new_rdy <= self.max_in_flight or new_rdy < self.total_rdy):
            return
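Viewed as a standalone predicate (a hypothetical helper for illustration, using the same new_rdy/total_rdy semantics as above), the proposed check permits a step-down even when total RDY is over the limit:

```python
def rdy_allowed(new_total_rdy, current_total_rdy, max_in_flight):
    """Guard sketched above (hypothetical helper, not pynsq API): allow an
    update that keeps total RDY within max_in_flight, or one that lowers
    total RDY even while over the limit."""
    return new_total_rdy <= max_in_flight or new_total_rdy < current_total_rdy

assert rdy_allowed(4, 0, 4)      # within limit: allowed
assert not rdy_allowed(5, 3, 4)  # would exceed limit: blocked
assert rdy_allowed(7, 10, 4)     # stepping down while over limit: allowed
```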

@mreiferson (Member)

  • For _complete_backoff_block, my naive read is that when max_in_flight < num_conns this will indeed violate the max in flight invariant (because _connection_max_in_flight will return 1). It should probably just trigger RDY redistribution, which already contains all the necessary logic.

  • _maybe_update_rdy needs to check total RDY vs max in flight.

  • For _rdy_retry, seems like we just need to check/clear self._rdy_timeout in set_max_in_flight.

Would be great to prove this by adding tests that fail and then pass when we fix these up (for _rdy_retry I don't think anything is broken, but it will certainly take longer to respond to reader intent the way it's currently implemented).

@alpaker (Contributor, Author) commented Sep 6, 2020

I'm working up the test cases.

One question about what you had in mind here:

For _complete_backoff_block, my naive read is that when max_in_flight < num_conns this will indeed violate the max in flight invariant (because _connection_max_in_flight will return 1). It should probably just trigger RDY redistribution, which already contains all the necessary logic.

If we just make this change:

     def _complete_backoff_block(self):
         self.backoff_block_completed = True
-        rdy = self._connection_max_in_flight()
         logger.info('[%s] backoff complete, resuming normal operation (%d connections)',
                     self.name, len(self.conns))
-        for c in self.conns.values():
-            if self._can_send_rdy(c, rdy):
-                self._send_rdy(c, rdy)
+        self.need_rdy_redistributed = True
+        self._redistribute_rdy_state()

The behavior when max_in_flight >= num_conns will change (instead of going full throttle on all conns when exiting backoff we'll go to RDY 1 on all conns and then each will go full throttle after its next message). So to preserve intended behavior we'd need:

        if len(self.conns) > self.max_in_flight:
            self.need_rdy_redistributed = True
            self._redistribute_rdy_state()
        else:
            rdy = self._connection_max_in_flight()
            for c in self.conns.values():
                self._send_rdy(c, rdy)

Is that what you were thinking?

@mreiferson (Member)

Yes, the latter.

@alpaker (Contributor, Author) commented Sep 9, 2020

Am going on vacation for a couple of days but should have something by the end of the weekend.

@alpaker alpaker force-pushed the respect-max-in-flight-on-startup branch from 9d04289 to cec3349 Compare September 14, 2020 22:08
@alpaker (Contributor, Author) commented Sep 14, 2020

@mreiferson I've made the changes we discussed and added relevant unit tests in 6f2e662 and 0dcb417.

There's an edge case I encountered while bulking out the tests that appears unrelated to the series of changes I started in #179, but I included a fix because it interacts with the changes made here: When max_in_flight is dynamically reduced from a value above the conn count to one below it, total RDY can remain above the new max_in_flight. set_max_in_flight() invokes the RDY redistribution routine, but that doesn't necessarily set RDY 0 on any connections (and if no connections are idle we can remain at positive RDY on all connections). I could see putting the relevant code in set_max_in_flight() rather than _redistribute_rdy_state() as I have it here.
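A minimal sketch of the forcing step described above (hypothetical code, not the PR's actual implementation): when a lowered max_in_flight leaves total RDY over the limit, set connections to RDY 0 until the invariant holds again.

```python
def shed_excess_rdy(rdy_per_conn, max_in_flight):
    """Zero out per-connection RDY counts until their sum is within
    max_in_flight. Illustrative only: real code might prefer idle
    connections first; this simply walks the list in order."""
    rdy = list(rdy_per_conn)
    i = 0
    while sum(rdy) > max_in_flight and i < len(rdy):
        rdy[i] = 0
        i += 1
    return rdy

assert shed_excess_rdy([2, 2, 2], 3) == [0, 0, 2]
```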

An unrelated issue beyond RDY management that I noticed, although I'm not sure it'd be considered a bug: When probing a single connection in backoff, a FIN of any message is taken as a success condition, even if the message was received on another connection before entering backoff. That is, we can have a history like:

  1. conn1 and conn2 receive messages m1 and m2
  2. m1 is REQed and the reader enters backoff
  3. reader exits backoff wait and sets RDY 1 on conn1
  4. m2 is FINed and we return to full blast on all conns

@mreiferson (Member) left a comment

These tests are awesome, great work, thanks! 💯 😍

Left a few questions...

nsq/reader.py Outdated
        @@ -626,6 +632,10 @@ def set_max_in_flight(self, max_in_flight):
                    self._send_rdy(conn, 0)
                self.total_rdy = 0
            else:
                for conn in self.conns.values():
@mreiferson (Member)

Should we just reset rdy_timeout on all conns regardless of max_in_flight?

I realize that we only need it when max_in_flight > 0 because we reset rdy_timeout in _send_rdy, which is forcibly called on all connections when max_in_flight == 0, but that's mostly true for _redistribute_rdy_state too. To me, this is a readability thing for our future selves...

@alpaker (Contributor, Author)

Sure. Will make that change.

        @@ -660,6 +670,17 @@ def _redistribute_rdy_state(self):
            if self.need_rdy_redistributed:
                self.need_rdy_redistributed = False

                if self.total_rdy > self.max_in_flight:
@mreiferson (Member)

Should this block instead just set RDY 0 for all connections and let the rest of the logic below identify conns to receive non-zero ready?

@alpaker (Contributor, Author)

With the current redistribution logic I think that change could make it unsafe to call set_max_in_flight() with messages in flight. Right now redistribution avoids giving positive RDY to a connection with a message in flight, so something like the following could happen:

  1. Let max-in-flight be > len(conns).
  2. With a message in flight on conn c, reduce m-i-f but keep it >= len(conns).
  3. _redistribute_rdy_state() sets RDY 0 on all connections, including c.
  4. Because c has something in flight, this redistribution round leaves c at RDY 0.
  5. Subsequent redistribution rounds see m-i-f >= len(conns) and so don't redistribute RDY, leaving c starved.

We could make the simplification you suggest if we allow redistribution to give RDY to a conn with something in flight:

--- a/nsq/reader.py
+++ b/nsq/reader.py
@@ -720,7 +720,7 @@ class Reader(Client):
             # We also don't attempt to avoid the connections who previously might have had RDY 1
             # because it would be overly complicated and not actually worth it (ie. given enough
             # redistribution rounds it doesn't matter).
-            possible_conns = [c for c in conns if not (c.in_flight or c.rdy)]
+            possible_conns = [c for c in conns if not c.rdy]
             while possible_conns and available_rdy:
                 available_rdy -= 1
                 conn = possible_conns.pop(random.randrange(len(possible_conns)))

which (I think) is fine from a correctness standpoint and will at worst increase the variance in how long it takes to service all connections equally.
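A toy run of the relaxed selection loop (names mirror the diff; this is a standalone illustration, not pynsq code): a connection with a message in flight stays eligible for RDY, so it cannot be starved indefinitely.

```python
import random

class _Conn:
    def __init__(self, in_flight=0, rdy=0):
        self.in_flight = in_flight
        self.rdy = rdy

conns = [_Conn(in_flight=1), _Conn(), _Conn()]
available_rdy = 3
# Relaxed filter: in_flight is no longer grounds for exclusion.
possible_conns = [c for c in conns if not c.rdy]
while possible_conns and available_rdy:
    available_rdy -= 1
    conn = possible_conns.pop(random.randrange(len(possible_conns)))
    conn.rdy = 1

# With the old filter, the first conn would have been excluded and left at RDY 0.
assert all(c.rdy == 1 for c in conns)
```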

@mreiferson (Member)

An unrelated issue beyond RDY management that I noticed, although I'm not sure it'd be considered a bug: When probing a single connection in backoff, a FIN of any message is taken as a success condition, even if the message was received before entering backoff, on another connection.

Good catch, but this code is already complicated enough :)

@alpaker alpaker force-pushed the respect-max-in-flight-on-startup branch from cec3349 to 41c1b4a Compare October 19, 2020 21:13
@mreiferson (Member)

@alpaker want to squash that rebase commit and we can land this?

@danbf commented Sep 30, 2021

@alpaker @mreiferson any way we can get this merged? This hits us every now and again and makes us sad. This PR looks ready.


@alpaker (Contributor, Author) commented Oct 1, 2021 via email

@alpaker alpaker force-pushed the respect-max-in-flight-on-startup branch from c499512 to 16abeb7 Compare October 4, 2021 05:29
@alpaker (Contributor, Author) commented Oct 4, 2021

@mreiferson Squashed and GTG.

@danbf commented Oct 4, 2021

@alpaker thanks very much; as soon as this lands I'll roll it out.


@mreiferson mreiferson merged commit ab14efe into nsqio:master Oct 24, 2021
@danbf commented Oct 25, 2021


Just need a push to pypi.org and we are golden.

Successfully merging this pull request may close these issues.

reader: connection initialization doesn't respect max-in-flight constraint