
Add support for KIP-110 ZSTD compression #2053

Merged: 2 commits, Oct 22, 2018
14 changes: 14 additions & 0 deletions CMakeLists.txt
@@ -24,6 +24,20 @@ endif()
option(WITH_ZLIB "With ZLIB" ${with_zlib_default})
# }

# ZSTD {
if (WITH_BUNDLED_ZSTD)
set(with_zstd_default ON)
else ()
find_library (ZSTD zstd)
if(ZSTD)
set(with_zstd_default ON)
else()
set(with_zstd_default OFF)
endif()
endif()
option(WITH_ZSTD "With ZSTD" ${with_zstd_default})
# }

# LibDL {
try_compile(
WITH_LIBDL
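The detection logic above can be exercised from the command line; a build sketch (the `--disable-zstd` flag is an assumption inferred from the mklove `disable` action in configure.librdkafka, not confirmed by this diff):

```shell
# CMake build: WITH_ZSTD defaults to ON when find_library locates
# libzstd, but it can also be forced explicitly:
mkdir build && cd build
cmake -DWITH_ZSTD=ON ..
cmake --build .

# mklove-based build: zstd is probed via mkl_lib_check with -lzstd;
# presumably it can be opted out of, given the "disable" action:
# ./configure --disable-zstd
```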
6 changes: 3 additions & 3 deletions CONFIGURATION.md
@@ -3,7 +3,7 @@

Property | C/P | Range | Default | Description
-----------------------------------------|-----|-----------------|--------------:|--------------------------
builtin.features | * | | gzip, snappy, ssl, sasl, regex, lz4, sasl_gssapi, sasl_plain, sasl_scram, plugins | Indicates the builtin features for this build of librdkafka. An application can either query this value or attempt to set it with its list of required features to check for library support. <br>*Type: CSV flags*
builtin.features | * | | gzip, snappy, ssl, sasl, regex, lz4, sasl_gssapi, sasl_plain, sasl_scram, plugins, zstd | Indicates the builtin features for this build of librdkafka. An application can either query this value or attempt to set it with its list of required features to check for library support. <br>*Type: CSV flags*
client.id | * | | rdkafka | Client identifier. <br>*Type: string*
metadata.broker.list | * | | | Initial list of brokers as a CSV list of broker host or host:port. The application may also use `rd_kafka_brokers_add()` to add brokers during runtime. <br>*Type: string*
bootstrap.servers | * | | | Alias for `metadata.broker.list`
@@ -110,7 +110,7 @@ message.send.max.retries | P | 0 .. 10000000 | 2
retries | P | | | Alias for `message.send.max.retries`
retry.backoff.ms | P | 1 .. 300000 | 100 | The backoff time in milliseconds before retrying a protocol request. <br>*Type: integer*
queue.buffering.backpressure.threshold | P | 1 .. 1000000 | 1 | The threshold of outstanding not yet transmitted broker requests needed to backpressure the producer's message accumulator. If the number of not yet transmitted requests equals or exceeds this number, produce request creation that would have otherwise been triggered (for example, in accordance with linger.ms) will be delayed. A lower number yields larger and more effective batches. A higher value can improve latency when using compression on slow machines. <br>*Type: integer*
compression.codec | P | none, gzip, snappy, lz4 | none | compression codec to use for compressing message sets. This is the default value for all topics, may be overridden by the topic configuration property `compression.codec`. <br>*Type: enum value*
compression.codec | P | none, gzip, snappy, lz4, zstd | none | compression codec to use for compressing message sets. This is the default value for all topics, may be overridden by the topic configuration property `compression.codec`. <br>*Type: enum value*
compression.type | P | | | Alias for `compression.codec`
batch.num.messages | P | 1 .. 1000000 | 10000 | Maximum number of messages batched in one MessageSet. The total MessageSet size is also limited by message.max.bytes. <br>*Type: integer*
delivery.report.only.error | P | true, false | false | Only provide delivery reports for failed messages. <br>*Type: boolean*
@@ -132,7 +132,7 @@ partitioner | P | | consistent_ra
partitioner_cb | P | | | Custom partitioner callback (set with rd_kafka_topic_conf_set_partitioner_cb()) <br>*Type: pointer*
msg_order_cmp | P | | | Message queue ordering comparator (set with rd_kafka_topic_conf_set_msg_order_cmp()). Also see `queuing.strategy`. <br>*Type: pointer*
opaque | * | | | Application opaque (set with rd_kafka_topic_conf_set_opaque()) <br>*Type: pointer*
compression.codec | P | none, gzip, snappy, lz4, inherit | inherit | Compression codec to use for compressing message sets. inherit = inherit global compression.codec configuration. <br>*Type: enum value*
compression.codec | P | none, gzip, snappy, lz4, zstd, inherit | inherit | Compression codec to use for compressing message sets. inherit = inherit global compression.codec configuration. <br>*Type: enum value*
compression.type | P | | | Alias for `compression.codec`
compression.level | P | -1 .. 12 | -1 | Compression level parameter for algorithm selected by configuration property `compression.codec`. Higher values will result in better compression at the cost of more CPU usage. Usable range is algorithm-dependent: [0-9] for gzip; [0-12] for lz4; only 0 for snappy; -1 = codec-dependent default compression level. <br>*Type: integer*
auto.commit.enable | C | true, false | true | [**LEGACY PROPERTY:** This property is used by the simple legacy consumer only. When using the high-level KafkaConsumer, the global `enable.auto.commit` property must be used instead]. If true, periodically commit offset of the last message handed to the application. This committed offset will be used when the process restarts to pick up where it left off. If false, the application will have to call `rd_kafka_offset_store()` to store an offset (optional). **NOTE:** There is currently no zookeeper integration, offsets will be written to broker or local file according to offset.store.method. <br>*Type: boolean*
2 changes: 1 addition & 1 deletion README.md
@@ -28,7 +28,7 @@ See the [wiki](https://github.com/edenhill/librdkafka/wiki) for a FAQ.
* High-level producer
* High-level balanced KafkaConsumer (requires broker >= 0.9)
* Simple (legacy) consumer
* Compression: snappy, gzip, lz4
* Compression: snappy, gzip, lz4, zstd
* [SSL](https://github.com/edenhill/librdkafka/wiki/Using-SSL-with-librdkafka) support
* [SASL](https://github.com/edenhill/librdkafka/wiki/Using-SASL-with-librdkafka) (GSSAPI/Kerberos/SSPI, PLAIN, SCRAM) support
* Broker version support: >=0.8 (see [Broker version compatibility](https://github.com/edenhill/librdkafka/wiki/Broker-version-compatibility))
2 changes: 2 additions & 0 deletions configure.librdkafka
@@ -87,6 +87,8 @@ void foo (void) {
mkl_lib_check "zlib" "WITH_ZLIB" disable CC "-lz" \
"#include <zlib.h>"
mkl_lib_check "libcrypto" "" disable CC "-lcrypto"
mkl_lib_check "zstd" "WITH_ZSTD" disable CC "-lzstd" \
"#include <zstd.h>"

if mkl_lib_check "libm" "" disable CC "-lm" \
"#include <math.h>"; then
4 changes: 4 additions & 0 deletions packaging/cmake/Config.cmake.in
@@ -6,6 +6,10 @@ if(@WITH_ZLIB@)
find_dependency(ZLIB)
endif()

if(@WITH_ZSTD@)
find_dependency(ZSTD)
endif()

if(@WITH_SSL@)
if(@WITH_BUNDLED_SSL@)
# TODO: custom SSL library should be installed
1 change: 1 addition & 0 deletions packaging/cmake/config.h.in
@@ -27,6 +27,7 @@


#cmakedefine01 WITH_ZLIB
#cmakedefine01 WITH_ZSTD
#cmakedefine01 WITH_LIBDL
#cmakedefine01 WITH_PLUGINS
#define WITH_SNAPPY 1
15 changes: 15 additions & 0 deletions src/CMakeLists.txt
@@ -79,6 +79,10 @@ if(WITH_ZLIB)
list(APPEND sources rdgz.c)
endif()

if(WITH_ZSTD)
list(APPEND sources rdkafka_zstd.c)
endif()

if(NOT HAVE_REGEX)
list(APPEND sources regexp.c)
endif()
@@ -95,6 +99,9 @@ if(WITHOUT_WIN32_CONFIG)
if(WITH_SNAPPY)
list(APPEND rdkafka_compile_definitions WITH_SNAPPY)
endif(WITH_SNAPPY)
if(WITH_ZSTD)
list(APPEND rdkafka_compile_definitions WITH_ZSTD)
endif(WITH_ZSTD)
if(WITH_SASL_SCRAM)
list(APPEND rdkafka_compile_definitions WITH_SASL_SCRAM)
endif(WITH_SASL_SCRAM)
@@ -142,6 +149,14 @@ if(WITH_ZLIB)
target_link_libraries(rdkafka PUBLIC ZLIB::ZLIB)
endif()

if(WITH_ZSTD)
find_library (ZSTD_LIBRARY zstd)
find_path (ZSTD_INCLUDE_DIR NAMES zstd.h)
target_link_libraries(rdkafka PUBLIC ${ZSTD_LIBRARY})
target_include_directories(rdkafka PUBLIC ${ZSTD_INCLUDE_DIR})
message(STATUS "Found ZSTD: ${ZSTD_LIBRARY}")
endif()

if(WITH_SSL)
if(WITH_BUNDLED_SSL) # option from 'h2o' parent project
if(NOT TARGET bundled-ssl)
1 change: 1 addition & 0 deletions src/Makefile
@@ -13,6 +13,7 @@ SRCS_$(WITH_SASL_CYRUS) += rdkafka_sasl_cyrus.c
SRCS_$(WITH_SASL_SCRAM) += rdkafka_sasl_scram.c
SRCS_$(WITH_SNAPPY) += snappy.c
SRCS_$(WITH_ZLIB) += rdgz.c
SRCS_$(WITH_ZSTD) += rdkafka_zstd.c
SRCS_$(WITH_HDRHISTOGRAM) += rdhdrhistogram.c

SRCS_LZ4 = xxhash.c
18 changes: 17 additions & 1 deletion src/rdkafka.h
@@ -3001,11 +3001,27 @@ rd_kafka_offsets_store(rd_kafka_t *rk,
/**
* @brief Subscribe to topic set using balanced consumer groups.
*
* Wildcard (regex) topics are supported by the librdkafka assignor:
* Wildcard (regex) topics are supported:
* any topic name in the \p topics list that is prefixed with \c \"^\" will
* be regex-matched to the full list of topics in the cluster and matching
* topics will be added to the subscription list.
*
* The full topic list is retrieved every \c topic.metadata.refresh.interval.ms
 * to pick up new or deleted topics that match the subscription.
* If there is any change to the matched topics the consumer will
* immediately rejoin the group with the updated set of subscribed topics.
*
* Regex and full topic names can be mixed in \p topics.
*
* @remark Only the \c .topic field is used in the supplied \p topics list,
* all other fields are ignored.
*
* @remark subscribe() is an asynchronous method which returns immediately:
* background threads will (re)join the group, wait for group rebalance,
* issue any registered rebalance_cb, assign() the assigned partitions,
* and then start fetching messages. This cycle may take up to
* \c session.timeout.ms * 2 or more to complete.
*
* @returns RD_KAFKA_RESP_ERR_NO_ERROR on success or
* RD_KAFKA_RESP_ERR__INVALID_ARG if list is empty, contains invalid
* topics or regexes.
11 changes: 10 additions & 1 deletion src/rdkafka_conf.c
@@ -238,6 +238,9 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
#if WITH_PLUGINS
{ 0x200, "plugins" },
#endif
#if WITH_ZSTD
{ 0x400, "zstd" },
#endif
{ 0, NULL }
}
},
@@ -923,7 +926,10 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
#if WITH_SNAPPY
{ RD_KAFKA_COMPRESSION_SNAPPY, "snappy" },
#endif
{ RD_KAFKA_COMPRESSION_LZ4, "lz4" },
{ RD_KAFKA_COMPRESSION_LZ4, "lz4" },
#if WITH_ZSTD
{ RD_KAFKA_COMPRESSION_ZSTD, "zstd" },
#endif
{ 0 }
} },
{ _RK_GLOBAL|_RK_PRODUCER, "compression.type", _RK_C_ALIAS,
@@ -1035,6 +1041,9 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
{ RD_KAFKA_COMPRESSION_SNAPPY, "snappy" },
#endif
{ RD_KAFKA_COMPRESSION_LZ4, "lz4" },
#if WITH_ZSTD
{ RD_KAFKA_COMPRESSION_ZSTD, "zstd" },
#endif
{ RD_KAFKA_COMPRESSION_INHERIT, "inherit" },
{ 0 }
} },
4 changes: 3 additions & 1 deletion src/rdkafka_conf.h
@@ -45,7 +45,8 @@ typedef enum {
RD_KAFKA_COMPRESSION_NONE,
RD_KAFKA_COMPRESSION_GZIP = RD_KAFKA_MSG_ATTR_GZIP,
RD_KAFKA_COMPRESSION_SNAPPY = RD_KAFKA_MSG_ATTR_SNAPPY,
RD_KAFKA_COMPRESSION_LZ4 = RD_KAFKA_MSG_ATTR_LZ4,
RD_KAFKA_COMPRESSION_LZ4 = RD_KAFKA_MSG_ATTR_LZ4,
RD_KAFKA_COMPRESSION_ZSTD = RD_KAFKA_MSG_ATTR_ZSTD,
RD_KAFKA_COMPRESSION_INHERIT /* Inherit setting from global conf */
} rd_kafka_compression_t;

@@ -58,6 +59,7 @@ typedef enum {
RD_KAFKA_COMPLEVEL_GZIP_MAX = 9,
RD_KAFKA_COMPLEVEL_LZ4_MAX = 12,
RD_KAFKA_COMPLEVEL_SNAPPY_MAX = 0,
RD_KAFKA_COMPLEVEL_ZSTD_MAX = 22,
RD_KAFKA_COMPLEVEL_MAX = 12
} rd_kafka_complevel_t;

10 changes: 10 additions & 0 deletions src/rdkafka_feature.c
@@ -44,6 +44,7 @@ static const char *rd_kafka_feature_names[] = {
"OffsetTime",
"MsgVer2",
"IdempotentProducer",
"ZSTD",
NULL
};

@@ -181,6 +182,15 @@ static const struct rd_kafka_feature_map {
{ -1 },
}
},
{
/* @brief >=2.1.0-IV2: Support ZStandard Compression Codec (KIP-110) */
.feature = RD_KAFKA_FEATURE_ZSTD,
.depends = {
{ RD_KAFKAP_Produce, 7, 7 },
{ RD_KAFKAP_Fetch, 10, 10 },
{ -1 },
},
},
{ .feature = 0 }, /* sentinel */
};

2 changes: 2 additions & 0 deletions src/rdkafka_feature.h
@@ -69,6 +69,8 @@
/* >= 0.11.0.0: Idempotent Producer support */
#define RD_KAFKA_FEATURE_IDEMPOTENT_PRODUCER 0x400

/* >= 2.1.0-IV2: ZSTD compression */
#define RD_KAFKA_FEATURE_ZSTD 0x800

int rd_kafka_get_legacy_ApiVersions (const char *broker_version,
struct rd_kafka_ApiVersion **apisp,
3 changes: 2 additions & 1 deletion src/rdkafka_msg.h
@@ -48,7 +48,8 @@
#define RD_KAFKA_MSG_ATTR_GZIP (1 << 0)
#define RD_KAFKA_MSG_ATTR_SNAPPY (1 << 1)
#define RD_KAFKA_MSG_ATTR_LZ4 (3)
#define RD_KAFKA_MSG_ATTR_COMPRESSION_MASK 0x3
#define RD_KAFKA_MSG_ATTR_ZSTD (4)
#define RD_KAFKA_MSG_ATTR_COMPRESSION_MASK 0x7
#define RD_KAFKA_MSG_ATTR_CREATE_TIME (0 << 3)
#define RD_KAFKA_MSG_ATTR_LOG_APPEND_TIME (1 << 3)

17 changes: 16 additions & 1 deletion src/rdkafka_msgset_reader.c
@@ -71,7 +71,9 @@
#if WITH_SNAPPY
#include "snappy.h"
#endif

#if WITH_ZSTD
#include "rdkafka_zstd.h"
#endif


struct msgset_v2_hdr {
@@ -346,6 +348,19 @@ rd_kafka_msgset_reader_decompress (rd_kafka_msgset_reader_t *msetr,
}
break;

#if WITH_ZSTD
case RD_KAFKA_COMPRESSION_ZSTD:
{
err = rd_kafka_zstd_decompress(msetr->msetr_rkb,
(char *)compressed,
compressed_size,
&iov.iov_base, &iov.iov_len);
if (err)
goto err;
}
break;
#endif

default:
rd_rkb_dbg(msetr->msetr_rkb, MSG, "CODEC",
"%s [%"PRId32"]: Message at offset %"PRId64
30 changes: 29 additions & 1 deletion src/rdkafka_msgset_writer.c
@@ -35,6 +35,10 @@
#include "rdkafka_header.h"
#include "rdkafka_lz4.h"

#if WITH_ZSTD
#include "rdkafka_zstd.h"
#endif

#include "snappy.h"
#include "rdvarint.h"
#include "crc32c.h"
@@ -1035,7 +1039,22 @@ rd_kafka_msgset_writer_compress_lz4 (rd_kafka_msgset_writer_t *msetw,
return (err ? -1 : 0);
}


#if WITH_ZSTD
/**
* @brief Compress messageset using ZSTD
*/
static int
rd_kafka_msgset_writer_compress_zstd (rd_kafka_msgset_writer_t *msetw,
rd_slice_t *slice, struct iovec *ciov) {
rd_kafka_resp_err_t err;
int comp_level =
msetw->msetw_rktp->rktp_rkt->rkt_conf.compression_level;
err = rd_kafka_zstd_compress(msetw->msetw_rkb,
comp_level,
slice, &ciov->iov_base, &ciov->iov_len);
return (err ? -1 : 0);
}
#endif

/**
* @brief Compress the message set.
@@ -1085,6 +1104,15 @@ rd_kafka_msgset_writer_compress (rd_kafka_msgset_writer_t *msetw,
r = rd_kafka_msgset_writer_compress_lz4(msetw, &slice, &ciov);
break;

#if WITH_ZSTD
case RD_KAFKA_COMPRESSION_ZSTD:
/* Skip ZSTD compression if broker doesn't support it. */
if (!(msetw->msetw_rkb->rkb_features & RD_KAFKA_FEATURE_ZSTD))
return -1;

r = rd_kafka_msgset_writer_compress_zstd(msetw, &slice, &ciov);
break;
#endif

default:
rd_kafka_assert(NULL,
14 changes: 14 additions & 0 deletions src/rdkafka_topic.c
@@ -39,6 +39,11 @@
#include "rdtime.h"
#include "rdregex.h"

#if WITH_ZSTD
#include <zstd.h>
#endif


const char *rd_kafka_topic_state_names[] = {
"unknown",
"exists",
@@ -351,6 +356,15 @@ shptr_rd_kafka_itopic_t *rd_kafka_topic_new0 (rd_kafka_t *rk,
rkt->rkt_conf.compression_level =
RD_KAFKA_COMPLEVEL_LZ4_MAX;
break;
#if WITH_ZSTD
case RD_KAFKA_COMPRESSION_ZSTD:
if (rkt->rkt_conf.compression_level == RD_KAFKA_COMPLEVEL_DEFAULT)
rkt->rkt_conf.compression_level = 3;
else if (rkt->rkt_conf.compression_level > RD_KAFKA_COMPLEVEL_ZSTD_MAX)
rkt->rkt_conf.compression_level =
RD_KAFKA_COMPLEVEL_ZSTD_MAX;
break;
#endif
case RD_KAFKA_COMPRESSION_SNAPPY:
default:
/* Compression level has no effect in this case */