reader: fix setting max_in_flight to 0 #229

Closed


andyxning
Member

@andyxning andyxning commented Feb 25, 2019

Currently, setting max_in_flight to 0 via set_max_in_flight to disable consuming does not work as expected.

When we set max_in_flight to 0, the value of

new_rdy = max(self.total_rdy - conn.rdy + value, 0)

in _send_rdy can be a positive number. In that case the if clause

if new_rdy > self.max_in_flight:

is always true, since self.max_in_flight is now 0.

As a result, _send_rdy returns early and the new RDY 0 is never sent.
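
The failure can be reproduced with a small model of the bookkeeping. This is a minimal sketch, not pynsq code: `FakeConn`, `FakeReader`, and `try_disable` are hypothetical stand-ins that keep only the two guard lines quoted above, and `_send_rdy` here returns a boolean purely so the outcome is easy to inspect.

```python
class FakeConn:
    """Hypothetical stand-in for a connection; only tracks its RDY count."""

    def __init__(self, rdy):
        self.rdy = rdy

    def send_rdy(self, value):
        self.rdy = value
        return True


class FakeReader:
    """Hypothetical model of the reader's RDY bookkeeping (not the real class)."""

    def __init__(self, conns, max_in_flight):
        self.conns = conns
        self.max_in_flight = max_in_flight
        self.total_rdy = sum(c.rdy for c in conns)

    def _send_rdy(self, conn, value):
        # Same arithmetic and guard as the two lines quoted above.
        new_rdy = max(self.total_rdy - conn.rdy + value, 0)
        if new_rdy > self.max_in_flight:
            return False  # the update is silently dropped
        if conn.send_rdy(value):
            self.total_rdy = new_rdy
        return True


def try_disable(rdy_counts):
    """Set max_in_flight to 0 and report which RDY 0 updates went through."""
    conns = [FakeConn(r) for r in rdy_counts]
    reader = FakeReader(conns, max_in_flight=sum(rdy_counts))
    reader.max_in_flight = 0
    return [reader._send_rdy(c, 0) for c in conns]
```

In this model, `try_disable([5])` lets the single update through, while `try_disable([2, 3])` drops both, because each connection still sees the other's RDY count in `total_rdy`.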

@andyxning
Member Author

/cc @mreiferson @ploxiln

@ploxiln changed the title from "fix set max_in_flight to 0 not work" to "reader: fix setting max_in_flight to 0" on Feb 25, 2019
@ploxiln added the bug label on Feb 25, 2019
@ploxiln
Member

ploxiln commented Feb 25, 2019

Thanks for submitting this. This is a tricky case ... I'm not sure exactly how setting max_in_flight lower in general works. This might be a good patch in the short term as-is.

But I have some more thoughts :) This is what I think are some interesting sections of code:

    def _send_rdy(self, conn, value):
        ...
        if value and (self.disabled() or self.max_in_flight == 0):
            logger.info('[%s:%s] disabled, delaying RDY state change', conn.id, self.name)
            # retry later ... elided
            return
        if value > conn.max_rdy_count:
            value = conn.max_rdy_count

        new_rdy = max(self.total_rdy - conn.rdy + value, 0)
        if new_rdy > self.max_in_flight:
            return

        if conn.send_rdy(value):
            self.total_rdy = new_rdy


    def set_max_in_flight(self, max_in_flight):
        self.max_in_flight = max_in_flight
        if max_in_flight == 0:
            # set RDY 0 to all connections
            for conn in itervalues(self.conns):
                if conn.rdy > 0:
                    logger.debug('[%s:%s] rdy: %d -> 0', conn.id, self.name, conn.rdy)
                    self._send_rdy(conn, 0)
            self.total_rdy = 0
        else:
            self.need_rdy_redistributed = True
            self._redistribute_rdy_state()

Note that, if there is just one connection, this does work as intended:

        new_rdy = max(self.total_rdy - conn.rdy + value, 0)
        if new_rdy > self.max_in_flight:
            return

With a single connection, self.total_rdy should equal conn.rdy and value is zero, so new_rdy is zero. self.max_in_flight is also zero, so new_rdy is not greater than it, the early return is skipped, and RDY 0 is sent.

But, if there are multiple connections ... then self.total_rdy is most likely greater than conn.rdy, so new_rdy ends up positive, and this aborts. This bit of code is sort of protecting against conn.rdy going up too much, but not expecting max_in_flight to go down. Maybe the thing to check is if value < conn.rdy? (If so, it should always be allowed, right?)
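
One way to read the `value < conn.rdy` idea, as a sketch only: `guard_allows` is a hypothetical helper (not part of pynsq) that expresses the guard as a pure function and always lets a decrease through, falling back to the existing comparison otherwise.

```python
def guard_allows(total_rdy, conn_rdy, value, max_in_flight):
    """Hypothetical variant of the guard: lowering a connection's RDY always passes."""
    if value < conn_rdy:
        # A decrease can only reduce the total, so never block it.
        return True
    # Otherwise keep the existing check against max_in_flight.
    new_rdy = max(total_rdy - conn_rdy + value, 0)
    return new_rdy <= max_in_flight
```

Under this variant, `guard_allows(5, 2, 0, 0)` passes even though another connection still holds RDY 3, while an increase that would push the total past the limit, such as `guard_allows(5, 2, 4, 5)`, is still rejected.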

The normal non-zero case goes through _redistribute_rdy_state(), and _send_rdy() is called in a number of places, so ... this is all a bit complicated, and I am sure we have asked ourselves before if this could all be simplified :)

@andyxning
Member Author

> if there is just one connection, this does work as intended

Yes, it does.

> This bit of code is sort of protecting against conn.rdy going up too much, but not expecting max_in_flight to go down.

I am not quite sure about this logic. :) So I only added bypass logic for the special case where max_in_flight is set to 0.

> This bit of code is sort of protecting against conn.rdy going up too much, but not expecting max_in_flight to go down. Maybe the thing to check is if value < conn.rdy? (If so, it should always be allowed, right?)

I am not sure about this either. So, to avoid affecting other logic, I only handled the special case where max_in_flight is set to 0.

@andyxning
Member Author

> and I am sure we have asked ourselves before if this could all be simplified :)

I am also not quite sure about the deterministic transfer logic; it is quite complicated. If you are sure that we can always update the connection's RDY when value < conn.rdy, I will update this PR. Otherwise, I suggest we handle this case by case, i.e., fix only the special case of setting max_in_flight to 0.

@mreiferson
Member

I haven't dug into this yet, but I'm curious what @alpaker, who's touched some of this code most recently, thinks.

@alpaker
Contributor

alpaker commented Feb 25, 2019

I think this:

        if new_rdy > self.max_in_flight:
            return

can be removed entirely, since any protection it supplies is superfluous. If a call to _send_rdy() passes a value for the connection's rdy that would break the invariant total_rdy <= max_in_flight, that's a bug in the caller. Silently rejecting such requests here is the wrong thing to do. That should address both the issues mentioned so far (setting max_in_flight to 0 and reducing it to a positive value).
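
A minimal sketch of what removing the check implies, under stated assumptions: `Conn`, `send_rdy_unguarded`, and `lower_max_in_flight` are hypothetical names, and the even-share policy is only an illustration, not how the reader actually redistributes RDY. The sender applies every update, and the caller is the one responsible for the invariant total_rdy <= max_in_flight.

```python
class Conn:
    """Hypothetical connection stub that only records its RDY count."""

    def __init__(self, rdy):
        self.rdy = rdy


def send_rdy_unguarded(state, conn, value):
    """Apply the RDY update unconditionally and keep total_rdy in sync."""
    state["total_rdy"] = max(state["total_rdy"] - conn.rdy + value, 0)
    conn.rdy = value


def lower_max_in_flight(state, conns, new_max):
    """Hypothetical caller: shrink each connection to an even share of new_max."""
    state["max_in_flight"] = new_max
    if not conns:
        return
    share = new_max // len(conns)
    for conn in conns:
        if conn.rdy > share:
            send_rdy_unguarded(state, conn, share)
    # The invariant is the caller's job, so it is checked here, not in the sender.
    assert state["total_rdy"] <= state["max_in_flight"]
```

With two connections at RDY 4 each and a new limit of 4, the first update leaves the total at 6, which the old check would have rejected; only after the second update does the total settle at 4. The same path handles a new limit of 0.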

@alpaker
Contributor

alpaker commented Feb 25, 2019

And (I'm pretty sure) nothing in the reader currently relies on that part of _send_rdy()'s behavior.

@ploxiln
Member

ploxiln commented Feb 25, 2019

I'm sold, thanks.

@alpaker
Contributor

alpaker commented Feb 25, 2019

PR for this alternative fix: #230

@mreiferson
Member

thanks @alpaker and thanks for raising the issue @andyxning!
