Fix compressed offset bug #506
Conversation
This should be updated to reflect #505 (comment)
lib/kafka/protocol/message.rb
Outdated
# The contained messages need to have their offset corrected.
base_offset = offset - max_relative_offset
messages = message_set.messages.each_with_index.map do |message, i|
i is no longer needed
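A standalone illustration of the simplified mapping (plain Ruby with a stand-in Message struct, not the real Kafka::Protocol::Message): with the index unused, each_with_index.map becomes a plain map.

    # Stand-in for Kafka::Protocol::Message, for illustration only.
    Message = Struct.new(:offset, :value)

    base_offset = 10
    inner = [Message.new(0, "a"), Message.new(1, "b")]

    # The index from each_with_index was unused, so a plain map suffices.
    messages = inner.map do |message|
      Message.new(message.offset + base_offset, message.value)
    end

    messages.map(&:offset) # => [10, 11]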
lib/kafka/compressor.rb
Outdated
@@ -28,14 +28,15 @@ def initialize(codec_name: nil, threshold: 1, instrumenter:)

      # @param message_set [Protocol::MessageSet]
      # @return [Protocol::MessageSet]
-     def compress(message_set)
+     def compress(message_set, offset: -1)
Can you document that offset is just used to simulate the broker behavior?
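One way the requested documentation could read (a sketch; the wording actually merged may differ):

    # @param message_set [Protocol::MessageSet]
    # @param offset [Integer] used only to simulate the broker's behavior,
    #   e.g. when exercising decompression in tests; in normal client-side
    #   operation the broker assigns the real offset, so this defaults to -1.
    # @return [Protocol::MessageSet]
    def compress(message_set, offset: -1)
      # body unchanged from the diff above
    end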
lib/kafka/protocol/message.rb
Outdated
@@ -58,10 +58,14 @@ def decompress
       message_set_decoder = Decoder.from_string(data)
       message_set = MessageSet.decode(message_set_decoder)

+      max_relative_offset = message_set.messages.last.offset
+      return message_set if max_relative_offset == offset
This definitely needs a comment explaining why that's the case :)
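A sketch of how such a comment might read in context (suggested wording only, not necessarily what was merged):

    max_relative_offset = message_set.messages.last.offset

    # If the last inner message's offset already equals the wrapper
    # message's offset, the inner offsets are absolute rather than
    # relative: either the broker has rewritten them, or we compressed
    # the set ourselves client-side with the default offset of -1.
    # Either way, no correction is needed.
    return message_set if max_relative_offset == offset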
# of the container message. The broker will set the offset in normal operation,
# but at the client-side we set it to -1.
expect(messages.map(&:offset)).to eq [-1, 0]
end
Why remove this?
We moved the tests to message_set_spec, and we believed this case was already covered by the other three tests we provided. This test has -1 as the wrapper message offset, and both inner messages are -1, -1 as well. Hence our code will accept this as "correct from the broker side" and return [-1, -1].
In the end we just thought the other test cases were more realistic. Should we add this back?
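For reference, the case being discussed, in plain Ruby (illustrative only, not the actual spec code):

    inner_offsets  = [-1, -1] # offsets of the compressed inner messages
    wrapper_offset = -1       # offset of the container ("wrapper") message

    # The last inner offset equals the wrapper offset, so the short-circuit
    # treats the offsets as already correct and returns them unchanged.
    corrected =
      if inner_offsets.last == wrapper_offset
        inner_offsets
      else
        base_offset = wrapper_offset - inner_offsets.last
        inner_offsets.map { |rel| base_offset + rel }
      end

    corrected # => [-1, -1]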
OK – no, that's fine, just needed to follow along.
Thanks for the PR! In order to ensure this doesn't break in the future, I'd love to get a few comments in the code describing the behavior.
Looks good! Can you verify that this works with real workloads?
lib/kafka/protocol/message.rb
Outdated
# In all other cases, compressed inner messages should have relative offsets, with the attributes below:
# - The container message should have the 'real' offset
# - The container message's offset should be the 'real' offset of the last message in the compressed batch
# - The first inner message should always have offset = 0
Sorry, I was wrong about this – it doesn't always start with 0.
thanks, amended 👍🏼
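A worked example of the offset arithmetic described in that comment (illustrative values; as the exchange above notes, the first relative offset is usually but not always 0):

    relative_offsets = [0, 1, 2] # inner messages as stored in the compressed batch
    wrapper_offset   = 12        # broker-assigned offset of the container message

    # The wrapper carries the 'real' offset of the last inner message, so
    # the base offset of the batch is recovered by subtraction.
    max_relative_offset = relative_offsets.last        # => 2
    base_offset = wrapper_offset - max_relative_offset # => 10

    absolute_offsets = relative_offsets.map { |rel| base_offset + rel }
    absolute_offsets # => [10, 11, 12]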
@dasch We are going on Christmas vacation right now, so we won't be able to work on this for about 3 weeks – this unfortunately makes for a long feedback loop. Up to you. We are running 0.5.0 while we are on vacation, which is stable for us. Besides, even if we did run this version, our consumers would take the short-circuit of not fixing the offsets (since the last message offset == wrapped message offset), so it wouldn't be a fair test.
@klippx a change of this magnitude requires some real-life testing before I dare merge it – let's wait until you're back.
This PR is about the same magnitude as 42821e9
Totally agree with @zmstone
OK, seems like I have a volunteer who can test it. I'll merge and cut a pre-release.
Addresses #505
We are happy to change the test; since we were not sure how to set up the test scenario properly, we took the "raw" approach of simply using data from our system console.