diff --git a/lib/kafka/compressor.rb b/lib/kafka/compressor.rb
index 27699f8af..f397c8cc8 100644
--- a/lib/kafka/compressor.rb
+++ b/lib/kafka/compressor.rb
@@ -27,8 +27,9 @@ def initialize(codec_name: nil, threshold: 1, instrumenter:)
     end
 
     # @param message_set [Protocol::MessageSet]
+    # @param offset [Integer] used to simulate broker behaviour in tests
     # @return [Protocol::MessageSet]
-    def compress(message_set)
+    def compress(message_set, offset: -1)
       return message_set if @codec.nil? || message_set.size < @threshold
 
       compressed_data = compress_data(message_set)
@@ -36,6 +37,7 @@ def compress(message_set)
       wrapper_message = Protocol::Message.new(
         value: compressed_data,
         codec_id: @codec.codec_id,
+        offset: offset
       )
 
       Protocol::MessageSet.new(messages: [wrapper_message])
diff --git a/lib/kafka/protocol/message.rb b/lib/kafka/protocol/message.rb
index 9b3f459a5..e4a1a5dae 100644
--- a/lib/kafka/protocol/message.rb
+++ b/lib/kafka/protocol/message.rb
@@ -58,18 +58,7 @@ def decompress
         message_set_decoder = Decoder.from_string(data)
         message_set = MessageSet.decode(message_set_decoder)
 
-        # The contained messages need to have their offset corrected.
-        messages = message_set.messages.each_with_index.map do |message, i|
-          Message.new(
-            offset: offset + i,
-            value: message.value,
-            key: message.key,
-            create_time: message.create_time,
-            codec_id: message.codec_id
-          )
-        end
-
-        MessageSet.new(messages: messages)
+        correct_offsets(message_set)
       end
 
       def self.decode(decoder)
@@ -113,6 +102,36 @@ def self.decode(decoder)
       private
+      # Offsets may be relative with regards to wrapped message offset, but there are special cases.
+      #
+      # Cases when client will receive corrected offsets:
+      # - When fetch request is version 0, kafka will correct relative offset on broker side before replying fetch response
+      # - When messages is stored in 0.9 format on disk (broker configured to do so).
+      #
+      # All other cases, compressed inner messages should have relative offset, with below attributes:
+      # - The container message should have the 'real' offset
+      # - The container message's offset should be the 'real' offset of the last message in the compressed batch
+      def correct_offsets(message_set)
+        max_relative_offset = message_set.messages.last.offset
+
+        # The offsets are already correct, do nothing.
+        return message_set if max_relative_offset == offset
+
+        # The contained messages have relative offsets, and needs to be corrected.
+        base_offset = offset - max_relative_offset
+        messages = message_set.messages.map do |message|
+          Message.new(
+            offset: message.offset + base_offset,
+            value: message.value,
+            key: message.key,
+            create_time: message.create_time,
+            codec_id: message.codec_id
+          )
+        end
+
+        MessageSet.new(messages: messages)
+      end
+
       private
 
       def encode_with_crc
         buffer = StringIO.new
         encoder = Encoder.new(buffer)
diff --git a/spec/compressor_spec.rb b/spec/compressor_spec.rb
index bd11ea7d6..abe28176d 100644
--- a/spec/compressor_spec.rb
+++ b/spec/compressor_spec.rb
@@ -2,29 +2,6 @@
   describe ".compress" do
     let(:instrumenter) { Kafka::Instrumenter.new(client_id: "test") }
 
-    it "encodes and decodes compressed messages" do
-      compressor = Kafka::Compressor.new(codec_name: :snappy, threshold: 1, instrumenter: instrumenter)
-
-      message1 = Kafka::Protocol::Message.new(value: "hello1")
-      message2 = Kafka::Protocol::Message.new(value: "hello2")
-
-      message_set = Kafka::Protocol::MessageSet.new(messages: [message1, message2])
-      compressed_message_set = compressor.compress(message_set)
-
-      data = Kafka::Protocol::Encoder.encode_with(compressed_message_set)
-      decoder = Kafka::Protocol::Decoder.from_string(data)
-      decoded_message = Kafka::Protocol::Message.decode(decoder)
-      decoded_message_set = decoded_message.decompress
-      messages = decoded_message_set.messages
-
-      expect(messages.map(&:value)).to eq ["hello1", "hello2"]
-
-      # When decoding a compressed message, the offsets are calculated relative to that
-      # of the container message. The broker will set the offset in normal operation,
-      # but at the client-side we set it to -1.
-      expect(messages.map(&:offset)).to eq [-1, 0]
-    end
-
     it "only compresses the messages if there are at least the configured threshold" do
       compressor = Kafka::Compressor.new(codec_name: :snappy, threshold: 3, instrumenter: instrumenter)
 
diff --git a/spec/protocol/message_set_spec.rb b/spec/protocol/message_set_spec.rb
index 73f9dc61f..3a5ce59e0 100644
--- a/spec/protocol/message_set_spec.rb
+++ b/spec/protocol/message_set_spec.rb
@@ -59,4 +59,55 @@
       Kafka::Protocol::MessageSet.decode(decoder)
     }.to raise_exception(Kafka::MessageTooLargeToRead)
   end
+
+  describe '.decode' do
+    let(:instrumenter) { Kafka::Instrumenter.new(client_id: "test") }
+    let(:compressor) { Kafka::Compressor.new(codec_name: :snappy, threshold: 1, instrumenter: instrumenter) }
+
+    def encode(messages: [], wrapper_message_offset: -1)
+      message_set = Kafka::Protocol::MessageSet.new(messages: messages)
+      compressed_message_set = compressor.compress(message_set, offset: wrapper_message_offset)
+      Kafka::Protocol::Encoder.encode_with(compressed_message_set)
+    end
+
+    def decode(data)
+      decoder = Kafka::Protocol::Decoder.from_string(data)
+      Kafka::Protocol::MessageSet
+        .decode(decoder)
+        .messages
+    end
+
+    it "sets offsets correctly for compressed messages with relative offsets" do
+      message1 = Kafka::Protocol::Message.new(value: "hello1", offset: 0)
+      message2 = Kafka::Protocol::Message.new(value: "hello2", offset: 1)
+      message3 = Kafka::Protocol::Message.new(value: "hello3", offset: 2)
+
+      data = encode(messages: [message1, message2, message3], wrapper_message_offset: 1000)
+      messages = decode(data)
+
+      expect(messages.map(&:offset)).to eq [998, 999, 1000]
+    end
+
+    it "sets offsets correctly for compressed messages with relative offsets on a compacted topic" do
+      message1 = Kafka::Protocol::Message.new(value: "hello1", offset: 0)
+      message2 = Kafka::Protocol::Message.new(value: "hello2", offset: 2)
+      message3 = Kafka::Protocol::Message.new(value: "hello3", offset: 3)
+
+      data = encode(messages: [message1, message2, message3], wrapper_message_offset: 1000)
+      messages = decode(data)
+
+      expect(messages.map(&:offset)).to eq [997, 999, 1000]
+    end
+
+    it "keeps the predefined offsets for messages delivered in 0.9 format" do
+      message1 = Kafka::Protocol::Message.new(value: "hello1", offset: 997)
+      message2 = Kafka::Protocol::Message.new(value: "hello2", offset: 999)
+      message3 = Kafka::Protocol::Message.new(value: "hello3", offset: 1000)
+
+      data = encode(messages: [message1, message2, message3], wrapper_message_offset: 1000)
+      messages = decode(data)
+
+      expect(messages.map(&:offset)).to eq [997, 999, 1000]
+    end
+  end
 end