
Add support for KIP-110 ZSTD compression #2053

Merged Oct 22, 2018 (2 commits)
Conversation

@vavrusa (Contributor) commented Oct 12, 2018

This was introduced recently in https://cwiki.apache.org/confluence/display/KAFKA/KIP-110%3A+Add+Codec+for+ZStandard+Compression

And merged in apache/kafka#2267

I'm going to need some help figuring out how to test this. I think I added the prerequisites correctly: Fetch API v10 and Produce API v7.

@vavrusa force-pushed the master branch 3 times, most recently from 305e898 to 7efbdfe on October 13, 2018 01:45
@edenhill (Contributor) left a comment

Wow, that was quick, the AK PR was merged two days ago!
This looks great, just needs a couple of minor cosmetic changes.

src/rdkafka_msgset_writer.c (resolved)
src/rdzstd.c Outdated
case ZSTD_CONTENTSIZE_UNKNOWN:
/* Decompressed size cannot be determined, make a guess */
out_bufsize = inlen * 2;
case ZSTD_CONTENTSIZE_ERROR:
Contributor:

should be a break; here, right?

src/rdzstd.c Outdated
/* Check if the destination size is too small */
ZSTD_ErrorCode err = ZSTD_getErrorCode(ret);
if (err == ZSTD_error_dstSize_tooSmall) {
out_bufsize *= 2;
Contributor:

I can't find the source now, and I don't really remember why, but it is supposedly better to multiply by 1.75 than 2 when increasing buffers.

https://github.com/edenhill/librdkafka/blob/master/src/rdkafka_lz4.c#L268

Contributor:

Add this: rd_atomic64_add(&rkb->rkb_c.zbuf_grow, 1);

Contributor Author:

Updated

src/rdzstd.c Outdated
ZSTD_ErrorCode err = ZSTD_getErrorCode(ret);
if (err == ZSTD_error_dstSize_tooSmall) {
out_bufsize *= 2;
char *newp = rd_realloc(decompressed, out_bufsize);
Contributor:

Variable definitions need to go at the top of the scope to be compatible with older, non-C99 compilers (MSVC).

Contributor Author:

I updated it, but it doesn't compile with C89 (no inline, etc.). Is this just for MSVC compatibility?

Contributor:

Yeah, at least that's the weird old compiler we come across most frequently, and it is stuck somewhere between C89 and C99 :)

src/rdzstd.c Outdated
}

/* Check if the destination size is too small */
ZSTD_ErrorCode err = ZSTD_getErrorCode(ret);
Contributor:

Variable definitions need to go at the top of the scope.

Also, err shadows the top-level rd_kafka_resp_err_t err; better to call this one zerr.

src/rdzstd.c Outdated
}
}

rd_assert(rd_slice_remains(slice) == 0);
Contributor:

Better to error out here instead of crashing since the input data is from an external source.

src/rdzstd.h Outdated
@@ -0,0 +1,52 @@
/*
* Copyright (c) 2018, Marek Vavrusa <[email protected]
Contributor:

Ditto: rename to rdkafka_zstd.h, add the copyright header, and add the trailing >.

CONFIGURATION.md Outdated
@@ -140,3 +140,4 @@ offset.store.method | C | file, broker | broker
consume.callback.max.messages | C | 0 .. 1000000 | 0 | Maximum number of messages to dispatch in one `rd_kafka_consume_callback*()` call (0 = unlimited) <br>*Type: integer*

### C/P legend: C = Consumer, P = Producer, * = both

Contributor:

This file is automatically generated but seems to be missing the zstd entries; make sure you build the source with make at least once so the auto-generation kicks in.

LICENSE.zstd Outdated
@@ -0,0 +1,30 @@
BSD License
Contributor:

Since we're not including the zstd source we shouldn't include this license.

LICENSES.txt Outdated
@@ -342,3 +342,37 @@ For the files wingetopt.c wingetopt.h downloaded from https:/alex85k
*/


LICENSE.zstd
Contributor:

Ditto.

@edenhill (Contributor):

This will go in v1.0.0, which is currently the idempotence branch.
It would be great if you could retarget and rebase this on idempotence.


vavrusa commented Oct 13, 2018

Yeah, we've been using zstd in Kafka for a while at Cloudflare, but it only currently "works" with Sarama, so I can't move without it.

@vavrusa vavrusa changed the base branch from master to idempotence October 13, 2018 22:54
@edenhill (Contributor):

.. so I can't move without it.

Move to what? :)

v1.0.0 is scheduled for release within a month.


vavrusa commented Oct 13, 2018

Move to what? :)

I wrote the Kafka table engine for ClickHouse using librdkafka, but can't use it with most topics without zstd. librdkafka is embedded as a submodule there, so I don't have to wait for the release tag before testing it.


vavrusa commented Oct 13, 2018

Rebased to idempotence branch.


vavrusa commented Oct 13, 2018

Thanks for reviewing! 🙌

@edenhill edenhill changed the base branch from idempotence to master October 22, 2018 11:21
@edenhill edenhill merged commit 5475b60 into confluentinc:master Oct 22, 2018

Donis- commented Jan 2, 2019

@vavrusa did you get it to work in your case?

We upgraded to Confluent Kafka 5.1.0 / 2.1.0 and set the broker default compression to ZSTD.
Java clients seem to be working fine, but I can't get kafkacat with librdkafka to work: consumption fails. Based on the logs, zstd was enabled at compile time and is supported by the broker:

%7|1546470506.414|APIVERSION|rdkafka#consumer-1| [thrd:sasl_plaintext://xxx/bootstrap]: sasl_plaintext://xxx/bootstrap: Feature ZSTD: Produce (7..7) supported by broker
%7|1546470506.415|APIVERSION|rdkafka#consumer-1| [thrd:sasl_plaintext://xxx/bootstrap]: sasl_plaintext://xxx/bootstrap: Feature ZSTD: Fetch (10..10) supported by broker
%7|1546470506.417|APIVERSION|rdkafka#consumer-1| [thrd:sasl_plaintext://xxx/bootstrap]: sasl_plaintext://xxx/bootstrap: Enabling feature ZSTD
%7|1546470506.417|FEATURE|rdkafka#consumer-1| [thrd:sasl_plaintext://xxx/bootstrap]: sasl_plaintext://xxx/bootstrap: Updated enabled protocol features to MsgVer1,ApiVersion,BrokerBalancedConsumer,ThrottleTime,Sasl,SaslHandshake,BrokerGroupCoordinator,LZ4,OffsetTime,MsgVer2,IdempotentProducer,ZSTD
...
%7|1546470508.106|FETCH|rdkafka#consumer-1| [thrd:sasl_plaintext://xxx/105]: sasl_plaintext://xxx/105: Fetch topic XXX [55] at offset 0 (v2)
%7|1546470508.108|FETCH|rdkafka#consumer-1| [thrd:sasl_plaintext://xxx/105]: sasl_plaintext://xxx/105: Fetch 1/1/16 toppar(s)
%7|1546470508.108|SEND|rdkafka#consumer-1| [thrd:sasl_plaintext://xxx/105]: sasl_plaintext://xxx/105: Sent FetchRequest (v4, 112 bytes @ 0, CorrId 3)
%7|1546470508.155|RECV|rdkafka#consumer-1| [thrd:sasl_plaintext://xxx/105]: sasl_plaintext://xxx/105: Received FetchResponse (v4, 92 bytes, CorrId 3, rtt 46.73ms)
%7|1546470508.156|FETCH|rdkafka#consumer-1| [thrd:sasl_plaintext://xxx/105]: sasl_plaintext://xxx/105: Topic XXX [55] MessageSet size 0, error "Err-76?", MaxOffset -1, Ver 2/2

76 is the error introduced with KIP-110: UNSUPPORTED_COMPRESSION_TYPE.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-110%3A+Add+Codec+for+ZStandard+Compression

Maybe we need to bump the v4 fetch version somehow, as per the KIP's "Zstd will only be allowed for the bumped fetch API"? Or am I missing something?

Version: tag 1.0.0-RC5 with dynamic zstd linking, win64 build.


vavrusa commented Jan 2, 2019

Yes, it's been working for some time. Maybe the negotiation isn't working properly in your case? I don't know enough about Kafka itself to help you, sorry!


Donis- commented Jan 3, 2019

Thanks for confirming!
Just to double-check: you're both producing and consuming via librdkafka?
@edenhill, maybe you have an idea of what could be going wrong in my case?


Donis- commented Jan 4, 2019

It seems Fetch API v10 support was missing; by adding fetch protocol support for v10 I got it to work. I can clean up and submit a PR.
