
Manual commit multiple partitions consumer #137

Merged
merged 6 commits into twmb:master on Mar 1, 2022

Conversation

JacobSMoller
Contributor

@JacobSMoller JacobSMoller commented Feb 24, 2022

Slightly modified version of https://github.com/twmb/franz-go/tree/master/examples/goroutine_per_partition_consuming

Using manual commits.

During testing of this I noticed that, when adding new consumers, I see errors from err := cl.CommitRecords(context.Background(), recs...)
The errors were mainly

ILLEGAL_GENERATION: Specified group generation id is not valid.

But also included

REBALANCE_IN_PROGRESS: The group is rebalancing, so a rejoin is needed.

There did not seem to be a clear pattern in the errors.

@brunsgaard
Contributor

brunsgaard commented Feb 24, 2022

@twmb I would still like to ask you about this one even though @JacobSMoller closed this PR.

From the logs I see

{"severity":"INFO","timestamp":"2022-02-24T13:26:37.267794+01:00","message":"balancing group as leader"}
{"severity":"INFO","timestamp":"2022-02-24T13:26:37.26782+01:00","message":"balance group member","id":"kgo-607afdd2-fd8c-4965-8e6f-c06c403dc690","interests":"interested topics: [franz-debug], previously owned: "}
{"severity":"INFO","timestamp":"2022-02-24T13:26:37.267837+01:00","message":"balance group member","id":"kgo-eb6756b4-76f9-47ff-b5fc-f692f8971059","interests":"interested topics: [franz-debug], previously owned: franz-debug[0 1 2 3 4]"}
{"severity":"INFO","timestamp":"2022-02-24T13:26:37.267917+01:00","message":"balanced","plan":"kgo-607afdd2-fd8c-4965-8e6f-c06c403dc690{franz-debug[5 6 7 8 9]}, kgo-eb6756b4-76f9-47ff-b5fc-f692f8971059{franz-debug[0 1 2 3 4]}"}
{"severity":"INFO","timestamp":"2022-02-24T13:26:37.267949+01:00","message":"syncing","group":"i1","protocol_type":"consumer","protocol":"cooperative-sticky"}
{"severity":"ERROR","timestamp":"2022-02-24T13:26:37.28076+01:00","message":"Error when committing offsets to kafka","error":"ILLEGAL_GENERATION: Specified group generation id is not valid.","Commited offset":90}
.{"severity":"INFO","timestamp":"2022-02-24T13:26:37.418444+01:00","message":"synced","group":"i1","assigned":"franz-debug[0 1 2 3 4]"}

Essentially we are using a goroutine per partition and we manually commit offsets. When a rebalance occurs and a worker is calling onRevoke for some partitions, the remaining goroutine consumers most often throw

{"severity":"ERROR","timestamp":"2022-02-24T13:26:37.28076+01:00","message":"Error when committing offsets to kafka","error":"ILLEGAL_GENERATION: Specified group generation id is not valid.","Commited offset":90}

It seems to me like franz-go is trying to commit offsets while the group is not synced. Is this expected? I have no idea what the expected behavior should be here. The problem seems to get worse when latency to the Kafka brokers is high. On low-latency clusters the problem disappears.

What are your two cents on this issue: is this expected, or should I look further into it and try to provide a better example reproducing the error?

@twmb
Owner

twmb commented Feb 24, 2022

@JacobSMoller was this accidentally closed? I'm always open to new examples.

@brunsgaard how often are you running into this error? If it's what I think it is, I expect this to be rare. If it isn't rare, then I need to look into this more. Currently, committing offsets does not block rebalancing, so what can happen is that you issue a commit immediately before a rebalance which grabs the group's current generation, and then a rebalance starts and finishes before the commit is issued, and then the commit when finally issued uses an old generation. A way to fix this internally would be to block rebalances during any commit.

If this is what you're running into, I'd favor blocking rebalances during commit, because a few other people have run into this as well.

edit: the REBALANCE_IN_PROGRESS error is a similar thing to the first; in this case the commit happened during the rebalance rather than after.

@twmb twmb reopened this Feb 24, 2022
@JacobSMoller
Contributor Author

JacobSMoller commented Feb 24, 2022

Hi, it wasn't accidental. At the time I opened this, I realized I didn't quite have the time to clean it up properly and write the description @brunsgaard has now added :).

We are seeing the ILLEGAL_GENERATION error quite consistently when running our code locally from Europe against a Kafka cluster in the US, which has quite high latency. If we run the code from a k8s cluster in a US region, we don't see it.

@JacobSMoller JacobSMoller changed the title ILLEGAL_GENERATION/REBALANCE_IN_PROGRESS error when manually committing. Manual commit multiple partitions consumer Feb 24, 2022
@brunsgaard
Contributor

brunsgaard commented Feb 25, 2022

Our workloads essentially read in 10000 elements per partition, put them in a batch, and forward them to a gRPC endpoint before committing. In our test cases we are reading from the start of the topic, so there is a lot of data and thus we commit very frequently, which would not be the case in production. (But it also happens with @JacobSMoller's example code, which does not commit that often, hmm.)

I don't understand the general impact and consequences of blocking rebalances during commit, but it sounds like a good idea.

What are the next steps, @twmb?

@JacobSMoller I suggest adding a done channel to the pconsumer so we can make sure current work is finished and committed before onRevoked/onLost returns (rough sketch below).
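For illustration, here is a minimal sketch of that pattern; the type and field names (pconsumer, recs, quit, done) are assumptions in the spirit of the goroutine-per-partition example, not the exact code in this PR.

```go
package consumer

import (
	"context"
	"fmt"
	"math/rand"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

type pconsumer struct {
	cl        *kgo.Client
	topic     string
	partition int32

	recs chan []*kgo.Record // batches handed off by the poll loop
	quit chan struct{}      // closed by onRevoked/onLost to stop this consumer
	done chan struct{}      // closed here once all pending work is committed
}

func (pc *pconsumer) consume() {
	defer close(pc.done) // signals onRevoked/onLost that our last commit has finished
	for {
		select {
		case <-pc.quit:
			return
		case recs := <-pc.recs:
			// Mimic work to happen before committing records.
			time.Sleep(time.Duration(rand.Intn(150)+100) * time.Millisecond)
			if err := pc.cl.CommitRecords(context.Background(), recs...); err != nil {
				fmt.Printf("commit error t %s p %d: %v\n", pc.topic, pc.partition, err)
			}
		}
	}
}
```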

@JacobSMoller
Contributor Author

JacobSMoller commented Feb 25, 2022

@twmb

Did a bit more testing to reproduce the ILLEGAL_GENERATION error, using the example code from this PR, with minor changes: adding a comment above <-pc.done on line 84, enabling SASL to connect to our cluster, and adding the kgo.WithLogger option to get you some logs.

Full DEBUG log here: https://gist.github.com/JacobSMoller/d3a2688ff3383a44f360e19066c97266

And INFO log version of the same run below

[2022-02-25 10:34:58.91] [INFO] metadata update triggered; why: client initialization
[2022-02-25 10:35:00.307] [INFO] beginning to manage the group lifecycle; group: g6
[2022-02-25 10:35:00.307] [INFO] joining group; group: g6
[2022-02-25 10:35:02.167] [INFO] join returned MemberIDRequired, rejoining with response's MemberID; group: g6, member_id: kgo-b6fc36e6-f058-44c0-b2fc-6f523ddaffff
[2022-02-25 10:35:05.317] [INFO] joined, balancing group; group: g6, member_id: kgo-b6fc36e6-f058-44c0-b2fc-6f523ddaffff, instance_id: <nil>, generation: 1, balance_protocol: cooperative-sticky, leader: true
[2022-02-25 10:35:05.317] [INFO] balancing group as leader
[2022-02-25 10:35:05.317] [INFO] balance group member; id: kgo-b6fc36e6-f058-44c0-b2fc-6f523ddaffff, interests: interested topics: [franz-debug], previously owned:
[2022-02-25 10:35:05.317] [INFO] balanced; plan: kgo-b6fc36e6-f058-44c0-b2fc-6f523ddaffff{franz-debug[0 1 2 3 4 5 6 7 8 9]}
[2022-02-25 10:35:05.317] [INFO] syncing; group: g6, protocol_type: consumer, protocol: cooperative-sticky
[2022-02-25 10:35:05.467] [INFO] synced; group: g6, assigned: franz-debug[0 1 2 3 4 5 6 7 8 9]
[2022-02-25 10:35:05.467] [INFO] new group session begun; group: g6, added: franz-debug[0 1 2 3 4 5 6 7 8 9], lost:
[2022-02-25 10:35:05.467] [INFO] beginning heartbeat loop; group: g6
Starting consume for  t franz-debug p 2
Starting consume for  t franz-debug p 7
Starting consume for  t franz-debug p 0
Starting consume for  t franz-debug p 4
Starting consume for  t franz-debug p 6
Starting consume for  t franz-debug p 1
Starting consume for  t franz-debug p 3
Starting consume for  t franz-debug p 9
Starting consume for  t franz-debug p 8
Starting consume for  t franz-debug p 5
[2022-02-25 10:35:06.24] [INFO] assigning partitions; why: newly fetched offsets for group g6, how: assigning everything new, keeping current assignment, input: franz-debug[3{-2.-1 0} 2{-2.-1 0} 1{-2.-1 0} 4{-2.-1 0} 8{-2.-1 0} 7{-2.-1 0} 9{-2.-1 0} 0{-2.-1 0} 6{-2.-1 0} 5{-2.-1 0}]
[2022-02-25 10:35:06.24] [INFO] metadata update triggered; why: loading offsets in new session from assign
Some sort of work done, about to commit t franz-debug p 3
Some sort of work done, about to commit t franz-debug p 3
[2022-02-25 10:35:11.614] [INFO] heartbeat errored; group: g6, err: REBALANCE_IN_PROGRESS: The group is rebalancing, so a rejoin is needed.
[2022-02-25 10:35:11.614] [INFO] cooperative consumer calling onRevoke at the end of a session even though no partitions were lost; group: g6
[2022-02-25 10:35:11.614] [INFO] heartbeat loop complete; group: g6, err: REBALANCE_IN_PROGRESS: The group is rebalancing, so a rejoin is needed.
[2022-02-25 10:35:11.614] [INFO] joining group; group: g6
[2022-02-25 10:35:11.763] [INFO] joined, balancing group; group: g6, member_id: kgo-b6fc36e6-f058-44c0-b2fc-6f523ddaffff, instance_id: <nil>, generation: 2, balance_protocol: cooperative-sticky, leader: true
[2022-02-25 10:35:11.763] [INFO] balancing group as leader
[2022-02-25 10:35:11.763] [INFO] balance group member; id: kgo-0aa48bd6-24a4-4b6d-81e5-c9e8bca482b7, interests: interested topics: [franz-debug], previously owned:
[2022-02-25 10:35:11.763] [INFO] balance group member; id: kgo-b6fc36e6-f058-44c0-b2fc-6f523ddaffff, interests: interested topics: [franz-debug], previously owned: franz-debug[0 1 2 3 4 5 6 7 8 9]
[2022-02-25 10:35:11.763] [INFO] balanced; plan: kgo-0aa48bd6-24a4-4b6d-81e5-c9e8bca482b7{}, kgo-b6fc36e6-f058-44c0-b2fc-6f523ddaffff{franz-debug[0 1 2 3 4]}
[2022-02-25 10:35:11.763] [INFO] syncing; group: g6, protocol_type: consumer, protocol: cooperative-sticky
Some sort of work done, about to commit t franz-debug p 3
[2022-02-25 10:35:11.914] [INFO] synced; group: g6, assigned: franz-debug[0 1 2 3 4]
[2022-02-25 10:35:11.914] [INFO] new group session begun; group: g6, added: , lost: franz-debug[8 9 5 7 6]
[2022-02-25 10:35:11.914] [INFO] beginning heartbeat loop; group: g6
[2022-02-25 10:35:11.914] [INFO] assigning partitions; why: revoking assignments from cooperative consuming, how: unassigning any currently assigned matching partition that is in the input, input: franz-debug[8{0.0 0} 9{0.0 0} 5{0.0 0} 7{0.0 0} 6{0.0 0}]
[2022-02-25 10:35:11.914] [INFO] cooperative consumer calling onRevoke; group: g6, lost: map[franz-debug:[8 9 5 7 6]], stage: 0
Waiting for work to finish t franz-debug p 8
Closing consume for t franz-debug p 8
Waiting for work to finish t franz-debug p 9
Closing consume for t franz-debug p 9
Waiting for work to finish t franz-debug p 5
Closing consume for t franz-debug p 5
Waiting for work to finish t franz-debug p 7
Closing consume for t franz-debug p 7
Waiting for work to finish t franz-debug p 6
Closing consume for t franz-debug p 6
[2022-02-25 10:35:11.915] [INFO] forced rejoin quitting heartbeat loop; why: cooperative rejoin after revoking what we lost
[2022-02-25 10:35:11.915] [INFO] heartbeat errored; group: g6, err: REBALANCE_IN_PROGRESS: The group is rebalancing, so a rejoin is needed.
[2022-02-25 10:35:11.915] [INFO] cooperative consumer calling onRevoke at the end of a session even though no partitions were lost; group: g6
[2022-02-25 10:35:11.915] [INFO] heartbeat loop complete; group: g6, err: REBALANCE_IN_PROGRESS: The group is rebalancing, so a rejoin is needed.
[2022-02-25 10:35:11.915] [INFO] joining group; group: g6
Some sort of work done, about to commit t franz-debug p 3
Some sort of work done, about to commit t franz-debug p 0
Some sort of work done, about to commit t franz-debug p 0
Some sort of work done, about to commit t franz-debug p 0
Some sort of work done, about to commit t franz-debug p 0
[2022-02-25 10:35:13.344] [INFO] joined, balancing group; group: g6, member_id: kgo-b6fc36e6-f058-44c0-b2fc-6f523ddaffff, instance_id: <nil>, generation: 3, balance_protocol: cooperative-sticky, leader: true
[2022-02-25 10:35:13.344] [INFO] balancing group as leader
[2022-02-25 10:35:13.344] [INFO] balance group member; id: kgo-0aa48bd6-24a4-4b6d-81e5-c9e8bca482b7, interests: interested topics: [franz-debug], previously owned:
[2022-02-25 10:35:13.344] [INFO] balance group member; id: kgo-b6fc36e6-f058-44c0-b2fc-6f523ddaffff, interests: interested topics: [franz-debug], previously owned: franz-debug[0 1 2 3 4]
[2022-02-25 10:35:13.344] [INFO] balanced; plan: kgo-0aa48bd6-24a4-4b6d-81e5-c9e8bca482b7{franz-debug[5 6 7 8 9]}, kgo-b6fc36e6-f058-44c0-b2fc-6f523ddaffff{franz-debug[0 1 2 3 4]}
[2022-02-25 10:35:13.344] [INFO] syncing; group: g6, protocol_type: consumer, protocol: cooperative-sticky
Error when committing offsets to kafka err: ILLEGAL_GENERATION: Specified group generation id is not valid. t: franz-debug p: 0 offset 40
[2022-02-25 10:35:13.493] [INFO] synced; group: g6, assigned: franz-debug[0 1 2 3 4]
[2022-02-25 10:35:13.493] [INFO] new group session begun; group: g6, added: , lost:
[2022-02-25 10:35:13.494] [INFO] beginning heartbeat loop; group: g6
Some sort of work done, about to commit t franz-debug p 0
Some sort of work done, about to commit t franz-debug p 0

Hope it helps in understanding the behaviour we are describing with the ILLEGAL_GENERATION error.

@twmb
Owner

twmb commented Feb 25, 2022

So from those logs, what I can see:

  • You swapped "Killing" for "Closing" 😄
  • You start one member
  • Shortly after (~10s), you start another member. This causes the first rebalance
  • They go through the cooperative sticky protocol, which causes another rebalance (because on the first, the first member keeps consuming until it knows which specific partitions to revoke)
  • Before the second rebalance, partitions 5 6 7 8 9 are revoked and committed properly
  • The second rebalance starts; the first member consumes a few records and sleeps
  • The first member tries to commit (I'm assuming the timing here), then the second rebalance finishes, then (again assuming timing) the first member receives the generation error.

Committing should probably block joining a group, or wait until after a group rejoin has finished. If committing is blocked on a rebalance, that's probably fine because onRevoked can be used to fence out committing for records that the client no longer owns.

Also I wonder if the per-partition consuming pattern can be made easier.

@brunsgaard
Contributor

brunsgaard commented Feb 25, 2022

Committing should probably block joining a group, or wait until after a group rejoin has finished

@twmb is this something you will take on?

Also I wonder if the per-partition consuming pattern can be made easier.

It could make sense, but I also like that franz-go is a bit more transparent and less magic compared to e.g. Sarama. That said, I think it makes sense to "package" a default SplitConsume and pconsumer as part of the library (maybe not exposed to the end user), giving the end user an interface/API where they just have to provide a callback function while goroutine management is handled by franz-go (see the sketch below). The pc.quit is something that I think you should hold on to, as it is a very nice and efficient way to "drain" uncommitted records during rebalance and shutdown. With your guidance, @JacobSMoller and I could be assigned time to help with this.
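Purely as a hypothetical illustration of the kind of interface being suggested here (none of these names exist in kgo), it might look roughly like this:

```go
package consumer

import (
	"context"

	"github.com/twmb/franz-go/pkg/kgo"
)

// Hypothetical API sketch only; kgo has no such helper today. The idea is that
// the library owns goroutine management and draining on rebalance, and the end
// user only supplies a per-partition callback.
type PartitionHandler func(ctx context.Context, topic string, partition int32, recs []*kgo.Record) error

// ConsumePartitions would poll, fan records out to one goroutine per owned
// partition, invoke handler for each batch, and wait for in-flight work to be
// committed before onRevoked/onLost return.
func ConsumePartitions(ctx context.Context, cl *kgo.Client, handler PartitionHandler) error {
	// ... goroutine management, quit/done channels, commit handling ...
	panic("sketch only")
}
```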

@twmb twmb mentioned this pull request Feb 27, 2022
@twmb
Owner

twmb commented Feb 27, 2022

After looking at it, I think it would be easier to have the client reissue the commit if it errors with ILLEGAL_GENERATION. It looks to be a good bit of wiring to block the join / sync / heartbeat loop on committing -- technically the only place to commit is during heartbeating.

Perhaps, if the commit fails with ILLEGAL_GENERATION, but the member still owns all partitions being committed to, the commit will be reissued.

twmb added a commit that referenced this pull request Feb 27, 2022
See #137. Cooperative consumers can consume during rebalancing. If they
commit at the start of a rebalance that ends after, then the commit will
fail with ILLEGAL_GENERATION (or sometimes, REBALANCE_IN_PROGRESS).

For cooperative specifically, if the commit fails but the consumer still
owns all partitions being committed, we now retry the commit once. This
should help alleviate commit errors that well written consumers are
currently running into.

We retry up to twice because the rebalancing when cooperative results in
two rebalances. A third failure is more unexpected.
@twmb
Owner

twmb commented Feb 27, 2022

If possible, can you try 9b8da34? I plan to tag this in v1.4 next week.

@JacobSMoller
Contributor Author

JacobSMoller commented Feb 28, 2022

@twmb I can still reproduce after doing a go get -u github.com/twmb/franz-go/pkg/kgo@9b8da34c26ac718eabe1bfc7ed93a7bca2f42e5b

Debug logs for a run using the new commit. https://gist.github.com/JacobSMoller/44cf79f61cbf305ddd7c0f0210319ba4

INFO logs

[2022-02-28 11:34:14.268] [INFO] metadata update triggered; why: client initialization
[2022-02-28 11:34:15.345] [INFO] beginning to manage the group lifecycle; group: h1
[2022-02-28 11:34:15.345] [INFO] joining group; group: h1
[2022-02-28 11:34:17.226] [INFO] join returned MemberIDRequired, rejoining with response's MemberID; group: h1, member_id: kgo-125a4681-b75b-47cf-9b11-3ca1e46699a4
[2022-02-28 11:34:20.377] [INFO] joined, balancing group; group: h1, member_id: kgo-125a4681-b75b-47cf-9b11-3ca1e46699a4, instance_id: <nil>, generation: 1, balance_protocol: cooperative-sticky, leader: true
[2022-02-28 11:34:20.377] [INFO] balancing group as leader
[2022-02-28 11:34:20.377] [INFO] balance group member; id: kgo-125a4681-b75b-47cf-9b11-3ca1e46699a4, interests: interested topics: [franz-debug], previously owned: 
[2022-02-28 11:34:20.377] [INFO] balanced; plan: kgo-125a4681-b75b-47cf-9b11-3ca1e46699a4{franz-debug[0 1 2 3 4 5 6 7 8 9]}
[2022-02-28 11:34:20.377] [INFO] syncing; group: h1, protocol_type: consumer, protocol: cooperative-sticky
[2022-02-28 11:34:20.529] [INFO] synced; group: h1, assigned: franz-debug[0 1 2 3 4 5 6 7 8 9]
[2022-02-28 11:34:20.529] [INFO] new group session begun; group: h1, added: franz-debug[0 1 2 3 4 5 6 7 8 9], lost: 
[2022-02-28 11:34:20.529] [INFO] beginning heartbeat loop; group: h1
Starting consume for  t franz-debug p 3
Starting consume for  t franz-debug p 8
Starting consume for  t franz-debug p 4
Starting consume for  t franz-debug p 6
Starting consume for  t franz-debug p 1
Starting consume for  t franz-debug p 9
Starting consume for  t franz-debug p 5
Starting consume for  t franz-debug p 2
Starting consume for  t franz-debug p 7
Starting consume for  t franz-debug p 0
[2022-02-28 11:34:21.321] [INFO] assigning partitions; why: newly fetched offsets for group h1, how: assigning everything new, keeping current assignment, input: franz-debug[1{-2.-1 0} 0{-2.-1 0} 7{-2.-1 0} 3{-2.-1 0} 2{-2.-1 0} 5{-2.-1 0} 4{-2.-1 0} 8{-2.-1 0} 9{-2.-1 0} 6{-2.-1 0}]
[2022-02-28 11:34:21.322] [INFO] metadata update triggered; why: loading offsets in new session from assign
Some sort of work done, about to commit t franz-debug p 2
Some sort of work done, about to commit t franz-debug p 2
Some sort of work done, about to commit t franz-debug p 2
[2022-02-28 11:34:35.675] [INFO] heartbeat errored; group: h1, err: REBALANCE_IN_PROGRESS: The group is rebalancing, so a rejoin is needed.
[2022-02-28 11:34:35.675] [INFO] immediate metadata update triggered; why: waitmeta after heartbeat error
[2022-02-28 11:34:35.675] [INFO] cooperative consumer calling onRevoke at the end of a session even though no partitions were lost; group: h1
[2022-02-28 11:34:35.826] [INFO] joining group; group: h1
[2022-02-28 11:34:35.976] [INFO] joined, balancing group; group: h1, member_id: kgo-125a4681-b75b-47cf-9b11-3ca1e46699a4, instance_id: <nil>, generation: 2, balance_protocol: cooperative-sticky, leader: true
[2022-02-28 11:34:35.976] [INFO] balancing group as leader
[2022-02-28 11:34:35.976] [INFO] balance group member; id: kgo-125a4681-b75b-47cf-9b11-3ca1e46699a4, interests: interested topics: [franz-debug], previously owned: franz-debug[0 1 2 3 4 5 6 7 8 9]
[2022-02-28 11:34:35.976] [INFO] balance group member; id: kgo-f1b28b15-e393-472a-8fde-c55409900572, interests: interested topics: [franz-debug], previously owned: 
[2022-02-28 11:34:35.976] [INFO] balanced; plan: kgo-125a4681-b75b-47cf-9b11-3ca1e46699a4{franz-debug[0 1 2 3 4]}, kgo-f1b28b15-e393-472a-8fde-c55409900572{}
[2022-02-28 11:34:35.976] [INFO] syncing; group: h1, protocol_type: consumer, protocol: cooperative-sticky
Error when committing offsets to kafka err: ILLEGAL_GENERATION: Specified group generation id is not valid. t: franz-debug p: 2 offset 210
[2022-02-28 11:34:36.129] [INFO] synced; group: h1, assigned: franz-debug[0 1 2 3 4]
[2022-02-28 11:34:36.129] [INFO] new group session begun; group: h1, added: , lost: franz-debug[5 6 7 8 9]
[2022-02-28 11:34:36.129] [INFO] beginning heartbeat loop; group: h1
[2022-02-28 11:34:36.129] [INFO] assigning partitions; why: revoking assignments from cooperative consuming, how: unassigning any currently assigned matching partition that is in the input, input: franz-debug[9{0.0 0} 6{0.0 0} 5{0.0 0} 7{0.0 0} 8{0.0 0}]
[2022-02-28 11:34:36.129] [INFO] cooperative consumer calling onRevoke; group: h1, lost: map[franz-debug:[8 9 6 5 7]], stage: 0
Waiting for work to finish t franz-debug p 8
Closing consume for t franz-debug p 8
Waiting for work to finish t franz-debug p 9
Closing consume for t franz-debug p 9
Waiting for work to finish t franz-debug p 6
Closing consume for t franz-debug p 6
Waiting for work to finish t franz-debug p 5
Closing consume for t franz-debug p 5
Waiting for work to finish t franz-debug p 7
Closing consume for t franz-debug p 7
[2022-02-28 11:34:36.13] [INFO] forced rejoin quitting heartbeat loop; why: cooperative rejoin after revoking what we lost from a rebalance
[2022-02-28 11:34:36.13] [INFO] heartbeat errored; group: h1, err: REBALANCE_IN_PROGRESS: The group is rebalancing, so a rejoin is needed.
[2022-02-28 11:34:36.13] [INFO] cooperative consumer calling onRevoke at the end of a session even though no partitions were lost; group: h1
[2022-02-28 11:34:36.13] [INFO] joining group; group: h1
Some sort of work done, about to commit t franz-debug p 2
Some sort of work done, about to commit t franz-debug p 2
Some sort of work done, about to commit t franz-debug p 2
[2022-02-28 11:34:37.726] [INFO] joined, balancing group; group: h1, member_id: kgo-125a4681-b75b-47cf-9b11-3ca1e46699a4, instance_id: <nil>, generation: 3, balance_protocol: cooperative-sticky, leader: true
[2022-02-28 11:34:37.726] [INFO] balancing group as leader
[2022-02-28 11:34:37.726] [INFO] balance group member; id: kgo-125a4681-b75b-47cf-9b11-3ca1e46699a4, interests: interested topics: [franz-debug], previously owned: franz-debug[0 1 2 3 4]
[2022-02-28 11:34:37.726] [INFO] balance group member; id: kgo-f1b28b15-e393-472a-8fde-c55409900572, interests: interested topics: [franz-debug], previously owned: 
[2022-02-28 11:34:37.727] [INFO] balanced; plan: kgo-125a4681-b75b-47cf-9b11-3ca1e46699a4{franz-debug[0 1 2 3 4]}, kgo-f1b28b15-e393-472a-8fde-c55409900572{franz-debug[5 6 7 8 9]}
[2022-02-28 11:34:37.727] [INFO] syncing; group: h1, protocol_type: consumer, protocol: cooperative-sticky
[2022-02-28 11:34:37.88] [INFO] synced; group: h1, assigned: franz-debug[0 1 2 3 4]
[2022-02-28 11:34:37.88] [INFO] new group session begun; group: h1, added: , lost: 
[2022-02-28 11:34:37.88] [INFO] beginning heartbeat loop; group: h1
Some sort of work done, about to commit t franz-debug p 2

@brunsgaard
Contributor

brunsgaard commented Feb 28, 2022

@twmb I think it would be more correct to block joining a group (even if it is more complicated implementation-wise), for three reasons.

With the re-issue commit after join approach:

  1. The error (ILLEGAL_GENERATION) is still returned to the caller, so it is hard to reason about whether the offset is committed or not. (At least this is what I read from Jacob's last reply/log.)
  2. The retry "might" run after new commits; I do not believe the order of the commits is preserved in this case.
  3. ILLEGAL_GENERATION errors are only handled for commit calls related to partitions staying with the same group member after a rebalance.

Any feedback on these concerns?

@twmb
Owner

twmb commented Feb 28, 2022

Those logs have shown I have one bit of logic wrong and I'd like to fix it. Every commit was happening 3x -- I stayed inside a conditional which reset the bool being used to track whether a recommit was necessary.

The error (ILLEGAL_GENERATION) is still returned to the caller, thus it is hard to reason about if the offset is committed or not. (At least this is what I read from Jacobs last reply/log)

This shouldn't be the case, unless you committed to a partition that was lost.

The retry "might" run after new commits, I do not believe order of the commits are preserved in this case.

CommitRecords uses CommitOffsetsSync under the hood, which globally blocks any other commits. So, commit ordering is preserved. As well, since this will only apply during cooperative rebalancing when the client keeps the partitions, there is no offset fetch that'll do a rewind.

ILLEGAL_GENERATION errors are only handled for commit calls related to partitions staying with the same group member after rebalance.

Yes, true.


The fundamental problem is that clients have to make a choice during implementation: should the client wait for the user to process records and commit before rebalancing, or should the client rebalance and punt all responsibility for when to commit to the user? The first choice poses the risk of the member being booted from the group if processing records takes so long that a rebalance kicks the member out (this is the Sarama choice); the second choice makes it a lot harder for end users to guard against rebalances under the hood, but ensures rebalancing will never be blocked on the application (this is the franz-go choice).

The no-wait-for-user approach also makes duplicates more likely around rebalances: you may have polled records, and then a rebalance happened, and now something else will consume those same records. If the rebalance waited for an end user, then you could PollRecords, take two minutes to process, and then be booted from the group. But it also means that you could PollRecords, always process quickly, and ensure you commit before allowing a rebalance to continue.

In the non-cooperative world, a rebalance that waits for applications is also problematic because rebalances are slow and there would be a large stop-the-world effect as all members of the group wait for the slowest to finish what it's doing and rejoin. In a cooperative world, waiting for an end user has less effect because consuming continues during the rebalance.


In thinking about it more though, if a user can always guarantee that processing records is quick (and the user usually can), franz-go may be taking an overly paranoid approach by rebalancing under the hood transparently. I'm not sure what it would take to add an option, BlockConcurrentRebalancing, or alternatively, default to blocking and add AllowConcurrentRebalancing, and then add another API on the client, func (cl *Client) AllowRebalance(), which would be required after a user polls.
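To make that concrete, a poll loop under such an option might look roughly like the sketch below. It assumes the option and method names that come up later in this thread and in v1.4 (kgo.BlockRebalanceOnPoll and cl.AllowRebalance); treat it as an illustration rather than the settled API at the time of this comment.

```go
package consumer

import (
	"context"

	"github.com/twmb/franz-go/pkg/kgo"
)

// Assumes the client was built with kgo.BlockRebalanceOnPoll(), so a rebalance
// cannot complete between polling and committing.
func pollLoop(ctx context.Context, cl *kgo.Client) error {
	for {
		fetches := cl.PollFetches(ctx) // group rebalancing is held off while we process
		if fetches.IsClientClosed() {
			return nil
		}
		if errs := fetches.Errors(); len(errs) > 0 {
			return errs[0].Err
		}

		recs := fetches.Records()
		// ... process recs quickly; a slow loop here delays the whole group ...

		if err := cl.CommitRecords(ctx, recs...); err != nil {
			return err
		}
		cl.AllowRebalance() // everything we own is committed; let a pending rebalance proceed
	}
}
```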

Another thought is that with concurrent rebalancing, perhaps I should embed the group generation as a hidden field in a record and use that as the generation when committing, rather than using the group's current generation. This would guard against accidental rewinds when an application loses a partition across a rebalance.

twmb added a commit that referenced this pull request Feb 28, 2022
@twmb
Owner

twmb commented Feb 28, 2022

If you could try bf0e5d7, that should have been the more proper fix (I think).

twmb added a commit that referenced this pull request Feb 28, 2022
twmb added a commit that referenced this pull request Mar 1, 2022
One strength of Sarama's consumer group is that it is easier to reason
about where record processing logic falls within the group management
lifecycle: if you are processing records, you can be sure that the group
will not rebalance underneath you. This is also a risk, though, if your
processing logic is so long that your group member is booted from the
group.

This client took the opposite approach of separating the group
management logic from the consuming logic. This essentially eliminated
the risk of long processing booting the member, but made it much more
difficult to reason about when to commit. There are plenty of
disclaimers about potential duplicates, and any non-transactional
consumer should expect duplicates at some point, but if a user wants to
opt for the simpler approach of consuming & processing within one group
generation, we should support that. In fact, if a user's processing loop
is very fast, we really should encourage it.

Helps #137.
@twmb
Owner

twmb commented Mar 1, 2022

Besides the prior commit, what do you think of this? 0c8c1a6

I'm not 100% on the naming yet, but I'm going to sleep on it; I haven't thought of better API names all day, so this may be the final API.

Also, I evaluated having that SplitConsume type in kgo, and while it could be useful, there are enough small details that I don't think the API should be in kgo:

  • how are partition functions spun up: is concurrency managed by kgo or by the user?
  • what happens when sending to these partition consumers blocks?
  • should this use PollFetches or PollRecords?
  • some of rebalancing is standard, but some details may be better left to the user

So, I'll probably add another example once all this is over (or perhaps we can redux this PR?) and call it good.

twmb added a commit that referenced this pull request Mar 1, 2022
@brunsgaard
Contributor

@JacobSMoller bf0e5d7 maps to 748509a

@JacobSMoller
Contributor Author

JacobSMoller commented Mar 1, 2022

@twmb using bf0e5d7 and 0c8c1a6 we can no longer reproduce the error.

What do you think about the example code using the lost/assigned lock around cl.AllowRebalance, to allow all lost partition consumers to finish their work and commit before allowing a rebalance?

Thanks a lot for your quick work on this. Let me know if you have some changes to the example.

PS. Do you have an ETA on v1.4?

}
})
})
s.mu.Lock()
Owner

If I'm reading this correctly, this lock is to ensure onLost is not running?

Owner

From the main comments, yes. Perhaps a comment here on that would be good.

Contributor

@brunsgaard brunsgaard Mar 1, 2022

onLost has <-pc.done, which ensures the records are committed before the goroutine returns, and thus before onLost returns. This way, work in progress is committed before a rebalance is allowed (sketched below). Does that make sense?
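A rough sketch of that flow, with struct and map names assumed for illustration rather than taken from this PR:

```go
package consumer

import (
	"context"
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
)

type pconsumer struct {
	quit chan struct{} // closed below to tell the worker to stop
	done chan struct{} // closed by the worker after its final commit
}

type splitConsume struct {
	consumers map[string]map[int32]*pconsumer // topic -> partition -> consumer
}

// Usable as both kgo.OnPartitionsRevoked and kgo.OnPartitionsLost: stop the
// goroutine for each lost partition and wait for its done channel, so the
// rebalance cannot continue past work that is still being committed.
func (s *splitConsume) lost(_ context.Context, _ *kgo.Client, lost map[string][]int32) {
	for topic, partitions := range lost {
		for _, partition := range partitions {
			pc := s.consumers[topic][partition]
			delete(s.consumers[topic], partition)
			close(pc.quit)
			fmt.Printf("Waiting for work to finish t %s p %d\n", topic, partition)
			<-pc.done
		}
	}
}
```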

Owner

I'm planning to merge this and then add a followup commit that adds a few more comments and potentially avoids EachTopic (which internally allocates a map). I'm pretty sure on the new API naming.

Contributor

We are unsure if that makes sense ourselves. Please advise :)

Owner

I'll merge this now and touch up a few things (file reorganization now that there are two similar examples, potentially avoid FetchTopics, add an example using AutoCommitMarks). Will ping here once done.

Owner

I see why this lock is necessary, and I think I may switch BlockRebalanceOnPoll to also block polling while user-provided OnAssigned / OnRevoked / OnLost are called.

Contributor

👍

Owner

Please check https://github.com/twmb/franz-go/blob/v1.4/examples/goroutine_per_partition_consuming/manual_commit/main.go to see what this example has become :)

I think it's a good bit simpler now that it avoids locking.

// Mimic work to happen before committing records
time.Sleep(time.Duration(rand.Intn(150)+100) * time.Millisecond)
fmt.Printf("Some sort of work done, about to commit t %s p %d\n", topic, partition)
err := cl.CommitRecords(context.Background(), recs...)
Owner

@twmb twmb Mar 1, 2022

I wonder if you would prefer MarkCommitRecords here, and AutoCommitMarks? Perhaps I need another, CommitMarks? CommitUncommittedOffsets does exactly this.

This would allow more batching for your commits, vs. each commit here being blocking.

Not sure, what do you think? This is fine too.
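For reference, a minimal sketch of the mark-based variant, assuming the client is configured with kgo.AutoCommitMarks(); the surrounding function names are illustrative only:

```go
package consumer

import (
	"context"

	"github.com/twmb/franz-go/pkg/kgo"
)

// With kgo.AutoCommitMarks(), MarkCommitRecords only marks offsets as ready to
// commit; the client batches the actual commits on its autocommit interval, so
// this is not a blocking round trip per batch like CommitRecords.
func processBatch(cl *kgo.Client, recs []*kgo.Record) {
	// ... per-batch work here ...
	cl.MarkCommitRecords(recs...)
}

// On revoke/lost or shutdown, flush anything marked but not yet committed
// before giving up the partitions.
func flushMarks(ctx context.Context, cl *kgo.Client) error {
	return cl.CommitUncommittedOffsets(ctx)
}
```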

Contributor

@brunsgaard brunsgaard Mar 1, 2022

In our "real" usecase we dont commit that often, we rollup 10000 messages over a period of minutes so the commit per partition is called infrequently. For this example i like we stick to the commit per []*kgo.records if your prefer to use the markcommit strategy for this example we will do that. We are aware committing too often is a undesirable pattern.

@brunsgaard
Contributor

brunsgaard commented Mar 1, 2022

@twmb I would also like to call out, like Jacob, the way you have worked with us on this issue/PR. It is over and beyond, thank you.

@twmb twmb merged commit 7bd8990 into twmb:master Mar 1, 2022
@twmb
Owner

twmb commented Mar 1, 2022

Thanks! As mentioned, I'll merge this now and touch up a few things (file reorganization now that there are two similar examples, potentially avoid FetchTopics, add an example using AutoCommitMarks). Will ping here once done.

twmb added a commit that referenced this pull request Mar 1, 2022
twmb added a commit that referenced this pull request Mar 1, 2022
twmb added a commit that referenced this pull request Mar 23, 2023
See comments. This is also a different and arguably better approach to
the old issue #137.

Closes #409.
Closes #137.
@twmb
Owner

twmb commented Mar 23, 2023

I'm changing the logic that I introduced for this patch; #410 is likely better.

@brunsgaard
Contributor

Thanks for the heads up on the change.

twmb added a commit that referenced this pull request Mar 27, 2023
See comments. This is also a different and arguably better approach to
the old issue #137.

The implementation has to be a bit insane because we want to obey the
context while _also_ trying to grab a lock, which cannot be canceled.

Closes #409.
Closes #137.
twmb added a commit that referenced this pull request Mar 27, 2023