
Fix compressed offset bug #506

Merged 8 commits on Dec 20, 2017
3 changes: 2 additions & 1 deletion lib/kafka/compressor.rb
@@ -28,14 +28,15 @@ def initialize(codec_name: nil, threshold: 1, instrumenter:)

      # @param message_set [Protocol::MessageSet]
      # @return [Protocol::MessageSet]
-     def compress(message_set)
+     def compress(message_set, offset: -1)
Contributor: Can you document that offset is just used to simulate the broker behavior?

        return message_set if @codec.nil? || message_set.size < @threshold

        compressed_data = compress_data(message_set)

        wrapper_message = Protocol::Message.new(
          value: compressed_data,
          codec_id: @codec.codec_id,
+         offset: offset
        )

        Protocol::MessageSet.new(messages: [wrapper_message])
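The new `offset:` keyword follows the broker's convention for the wrapper (container) message: a producer does not yet know the final offset, so it writes -1, while the broker stamps the wrapper with the absolute offset of the last message in the set. A minimal sketch of that convention in plain Ruby (`build_wrapper` is a hypothetical helper for illustration, not part of ruby-kafka):

```ruby
# Hypothetical illustration of the wrapper-offset convention:
# client-side compression uses -1; the broker replaces it with
# the absolute offset of the last contained message.
def build_wrapper(compressed_data, offset: -1)
  { value: compressed_data, offset: offset }
end

# Producer side: offset defaults to -1.
producer_wrapper = build_wrapper("...snappy bytes...")

# Broker side: the wrapper carries the last message's absolute offset.
broker_wrapper = build_wrapper("...snappy bytes...", offset: 1000)
```

The spec below uses exactly this knob (`wrapper_message_offset`) to simulate what a broker would hand back to a consumer.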
6 changes: 5 additions & 1 deletion lib/kafka/protocol/message.rb
@@ -58,10 +58,14 @@ def decompress
        message_set_decoder = Decoder.from_string(data)
        message_set = MessageSet.decode(message_set_decoder)

+       max_relative_offset = message_set.messages.last.offset
+       return message_set if max_relative_offset == offset
Contributor: This definitely needs a comment explaining why that's the case :)


+       # The contained messages need to have their offset corrected.
+       base_offset = offset - max_relative_offset
        messages = message_set.messages.each_with_index.map do |message, i|
zmstone (Dec 20, 2017): i is no longer needed.

          Message.new(
-           offset: offset + i,
+           offset: message.offset + base_offset,
            value: message.value,
            key: message.key,
            create_time: message.create_time,
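The decompress change above can be seen in isolation: when the wrapper offset equals the last contained offset, the message offsets are already absolute (the 0.9 wire format) and are left alone; otherwise they are relative, and shifting every offset by `wrapper_offset - max_relative_offset` recovers the absolute values. A standalone sketch of that arithmetic on plain arrays (hypothetical `correct_offsets` helper, not the library API):

```ruby
# Mirrors the decompress logic above on bare offset arrays.
def correct_offsets(offsets, wrapper_offset)
  max_relative = offsets.last
  # Offsets already absolute (e.g. 0.9-format messages): leave untouched.
  return offsets if max_relative == wrapper_offset

  base = wrapper_offset - max_relative
  offsets.map { |o| o + base }
end

correct_offsets([0, 1, 2], 1000)        # => [998, 999, 1000]
correct_offsets([997, 999, 1000], 1000) # => [997, 999, 1000]
```

Note the shift preserves gaps between relative offsets, which is what makes this work on compacted topics as well.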
23 changes: 0 additions & 23 deletions spec/compressor_spec.rb
@@ -2,29 +2,6 @@
  describe ".compress" do
    let(:instrumenter) { Kafka::Instrumenter.new(client_id: "test") }

-   it "encodes and decodes compressed messages" do
-     compressor = Kafka::Compressor.new(codec_name: :snappy, threshold: 1, instrumenter: instrumenter)
-
-     message1 = Kafka::Protocol::Message.new(value: "hello1")
-     message2 = Kafka::Protocol::Message.new(value: "hello2")
-
-     message_set = Kafka::Protocol::MessageSet.new(messages: [message1, message2])
-     compressed_message_set = compressor.compress(message_set)
-
-     data = Kafka::Protocol::Encoder.encode_with(compressed_message_set)
-     decoder = Kafka::Protocol::Decoder.from_string(data)
-     decoded_message = Kafka::Protocol::Message.decode(decoder)
-     decoded_message_set = decoded_message.decompress
-     messages = decoded_message_set.messages
-
-     expect(messages.map(&:value)).to eq ["hello1", "hello2"]
-
-     # When decoding a compressed message, the offsets are calculated relative to that
-     # of the container message. The broker will set the offset in normal operation,
-     # but at the client-side we set it to -1.
-     expect(messages.map(&:offset)).to eq [-1, 0]
-   end
Contributor: Why remove this?

Contributor (author): We moved the tests to message_set_spec, and we believed this case was already covered by the other three tests we provided. This test does have -1 as the wrapper message offset, and both messages are -1, -1 as well. Hence our code will accept this as "correct from the broker side" and return [-1, -1].

In the end we just thought the other test cases were more realistic. Should we add this back?

Contributor: OK – no, that's fine, just needed to follow along.


    it "only compresses the messages if there are at least the configured threshold" do
      compressor = Kafka::Compressor.new(codec_name: :snappy, threshold: 3, instrumenter: instrumenter)
55 changes: 55 additions & 0 deletions spec/protocol/message_set_spec.rb
@@ -59,4 +59,59 @@
      Kafka::Protocol::MessageSet.decode(decoder)
    }.to raise_exception(Kafka::MessageTooLargeToRead)
  end

  describe '.decode' do
    let(:instrumenter) { Kafka::Instrumenter.new(client_id: "test") }
    let(:compressor) { Kafka::Compressor.new(codec_name: :snappy, threshold: 1, instrumenter: instrumenter) }

    def encode(messages: [], wrapper_message_offset: -1)
      message_set = Kafka::Protocol::MessageSet.new(messages: messages)
      compressed_message_set = compressor.compress(message_set, offset: wrapper_message_offset)
      Kafka::Protocol::Encoder.encode_with(compressed_message_set)
    end

    def decode(data)
      decoder = Kafka::Protocol::Decoder.from_string(data)
      Kafka::Protocol::MessageSet
        .decode(decoder)
        .messages
    end

    it "sets offsets correctly for compressed messages with relative offsets" do
      compressor = Kafka::Compressor.new(codec_name: :snappy, threshold: 1, instrumenter: instrumenter)

      message1 = Kafka::Protocol::Message.new(value: "hello1", offset: 0)
      message2 = Kafka::Protocol::Message.new(value: "hello2", offset: 1)
      message3 = Kafka::Protocol::Message.new(value: "hello3", offset: 2)

      data = encode(messages: [message1, message2, message3], wrapper_message_offset: 1000)
      messages = decode(data)

      expect(messages.map(&:offset)).to eq [998, 999, 1000]
    end

    it "sets offsets correctly for compressed messages with relative offsets on a compacted topic" do
      message1 = Kafka::Protocol::Message.new(value: "hello1", offset: 0)
      message2 = Kafka::Protocol::Message.new(value: "hello2", offset: 2)
      message3 = Kafka::Protocol::Message.new(value: "hello3", offset: 3)

      data = encode(messages: [message1, message2, message3], wrapper_message_offset: 1000)
      messages = decode(data)

      expect(messages.map(&:offset)).to eq [997, 999, 1000]
    end

    it "keeps the predefined offsets for messages delivered in 0.9 format" do
      compressor = Kafka::Compressor.new(codec_name: :snappy, threshold: 1, instrumenter: instrumenter)

      message1 = Kafka::Protocol::Message.new(value: "hello1", offset: 997)
      message2 = Kafka::Protocol::Message.new(value: "hello2", offset: 999)
      message3 = Kafka::Protocol::Message.new(value: "hello3", offset: 1000)

      data = encode(messages: [message1, message2, message3], wrapper_message_offset: 1000)
      messages = decode(data)

      expect(messages.map(&:offset)).to eq [997, 999, 1000]
    end
  end
end
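The compacted-topic spec relies on the base shift preserving gaps between relative offsets (records compacted away leave holes in the sequence). A quick arithmetic check of that expectation in plain Ruby, with no kafka dependencies:

```ruby
# Relative offsets on a compacted topic can have gaps; shifting every
# offset by the same base keeps those gaps intact.
relative = [0, 2, 3]
wrapper_offset = 1000

base = wrapper_offset - relative.last    # 1000 - 3 = 997
absolute = relative.map { |o| o + base } # [997, 999, 1000]
```

This matches the `[997, 999, 1000]` expectation in the compacted-topic test above.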