out_kinesis_streams: integrate with shared compression lib #9200

Draft · wants to merge 1 commit into base: master
19 changes: 19 additions & 0 deletions plugins/out_kinesis_streams/kinesis.c
@@ -94,6 +94,16 @@ static int cb_kinesis_init(struct flb_output_instance *ins,
ctx->time_key_format = DEFAULT_TIME_KEY_FORMAT;
}

tmp = flb_output_get_property("compression", ins);
if (tmp) {
ret = flb_aws_compression_get_type(tmp);
if (ret == -1) {
flb_plg_error(ctx->ins, "unknown compression: %s", tmp);
goto error;
}
ctx->compression = ret;
}

tmp = flb_output_get_property("log_key", ins);
if (tmp) {
ctx->log_key = tmp;
@@ -473,6 +483,15 @@ static struct flb_config_map config_map[] = {
"networking issues."
},

{
FLB_CONFIG_MAP_STR, "compression", NULL,
0, FLB_FALSE, 0,
"Compression type for log records. Each log record is individually compressed "
"and sent to Kinesis. 'gzip' and 'arrow' are the supported values. "
"'arrow' is only an available if Apache Arrow was enabled at compile time. "
"Defaults to no compression."
},

{
FLB_CONFIG_MAP_STR, "profile", NULL,
0, FLB_TRUE, offsetof(struct flb_kinesis, profile),
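For context on the init change above: the new "compression" property is resolved through the shared AWS compression lib and defaults to no compression when it is unset. The sketch below restates that flow in isolation; it only uses names that already appear in this diff (flb_output_get_property, flb_aws_compression_get_type, FLB_AWS_COMPRESS_NONE), and the helper name and headers are illustrative assumptions, not part of the patch.

#include <fluent-bit/flb_output.h>
#include <fluent-bit/aws/flb_aws_compress.h>

/* Illustrative helper: resolve the "compression" property the same way
 * cb_kinesis_init() does above. Returns FLB_AWS_COMPRESS_NONE when the
 * property is unset, the resolved type for a known keyword (e.g. "gzip"),
 * or -1 when the keyword is not recognized by the shared lib.
 */
static int kinesis_resolve_compression(struct flb_output_instance *ins)
{
    const char *tmp;
    int type;

    tmp = flb_output_get_property("compression", ins);
    if (tmp == NULL) {
        return FLB_AWS_COMPRESS_NONE;          /* default: no compression */
    }

    type = flb_aws_compression_get_type(tmp);  /* maps keyword -> type id */
    if (type == -1) {
        /* unknown keyword; the real init path logs an error and aborts */
        return -1;
    }
    return type;
}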
1 change: 1 addition & 0 deletions plugins/out_kinesis_streams/kinesis.h
@@ -89,6 +89,7 @@ struct flb_kinesis {
const char *role_arn;
const char *log_key;
const char *external_id;
int compression;
int retry_requests;
char *sts_endpoint;
int custom_endpoint;
61 changes: 43 additions & 18 deletions plugins/out_kinesis_streams/kinesis_api.c
@@ -36,6 +36,7 @@
#include <fluent-bit/flb_http_client.h>
#include <fluent-bit/flb_utils.h>
#include <fluent-bit/flb_base64.h>
#include <fluent-bit/aws/flb_aws_compress.h>

#include <monkey/mk_core.h>
#include <msgpack.h>
@@ -226,6 +227,7 @@ static int process_event(struct flb_kinesis *ctx, struct flush *buf,
struct tm *tmp;
size_t len;
size_t tmp_size;
void *compressed_tmp_buf;
char *out_buf;

tmp_buf_ptr = buf->tmp_buf + buf->tmp_buf_offset;
@@ -339,29 +341,52 @@
memcpy(tmp_buf_ptr + written, "\n", 1);
written++;

if (ctx->compression == FLB_AWS_COMPRESS_NONE) {
    /*
     * check if event_buf is initialized and big enough
     * Base64 encoding will increase size by ~4/3
     */
    size = (written * 1.5) + 4;
    if (buf->event_buf == NULL || buf->event_buf_size < size) {
        flb_free(buf->event_buf);
        buf->event_buf = flb_malloc(size);
        buf->event_buf_size = size;
        if (buf->event_buf == NULL) {
            flb_errno();
            return -1;
        }
    }

    tmp_buf_ptr = buf->tmp_buf + buf->tmp_buf_offset;
    ret = flb_base64_encode((unsigned char *) buf->event_buf, size, &b64_len,
                            (unsigned char *) tmp_buf_ptr, written);
    if (ret != 0) {
        flb_errno();
        return -1;
    }
    written = b64_len;
}
else {
    /*
     * compress event, truncating input if needed
     * replace event buffer with compressed buffer
     */
    ret = flb_aws_compression_b64_truncate_compress(ctx->compression,
                                                    MAX_B64_EVENT_SIZE,
                                                    tmp_buf_ptr,
                                                    written, &compressed_tmp_buf,
                                                    &size); /* evaluate size */

    if (ret == -1) {
        flb_plg_error(ctx->ins, "Unable to compress record, discarding, "
                      "%s", ctx->stream_name);
        return 2;
    }
    flb_free(buf->event_buf);
    buf->event_buf = compressed_tmp_buf;
    compressed_tmp_buf = NULL;
    written = size;
}

tmp_buf_ptr = buf->tmp_buf + buf->tmp_buf_offset;
if ((buf->tmp_buf_size - buf->tmp_buf_offset) < written) {
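A note on the two paths in process_event() above: with compression disabled, the plugin base64-encodes the record itself into event_buf; with compression enabled, flb_aws_compression_b64_truncate_compress returns a newly allocated buffer (sized against MAX_B64_EVENT_SIZE, truncating the input if needed), and the plugin frees the old event_buf and adopts the new one. The sketch below restates that contract; the argument order is taken from the call site above, while the helper name and the ownership comments are assumptions drawn from how this hunk uses the result.

#include <stddef.h>
#include <fluent-bit/aws/flb_aws_compress.h>
#include "kinesis_api.h"   /* MAX_B64_EVENT_SIZE */

/* Illustrative only: compress one record the way the else-branch above does.
 * On success, *out points to a heap buffer the caller owns (the plugin stores
 * it in buf->event_buf and frees it later); on failure the record is dropped.
 */
static int kinesis_compress_record(int compression_type,
                                   void *in, size_t in_len,
                                   void **out, size_t *out_len)
{
    int ret;

    ret = flb_aws_compression_b64_truncate_compress(compression_type,
                                                    MAX_B64_EVENT_SIZE,
                                                    in, in_len,
                                                    out, out_len);
    if (ret == -1) {
        return -1;   /* caller discards the record, mirroring the return 2 above */
    }
    return 0;
}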
1 change: 1 addition & 0 deletions plugins/out_kinesis_streams/kinesis_api.h
@@ -23,6 +23,7 @@
#define PUT_RECORDS_PAYLOAD_SIZE 5242880
#define MAX_EVENTS_PER_PUT 500
#define MAX_EVENT_SIZE 1048556 /* 1048576 - 20 bytes for partition key */
#define MAX_B64_EVENT_SIZE 1365336 /* ceil(1024000 / 3) * 4 */

/* number of characters needed to 'start' a PutRecords payload */
#define PUT_RECORDS_HEADER_LEN 30
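The new MAX_B64_EVENT_SIZE cap follows the standard base64 expansion of 3 input bytes to 4 output characters. A minimal standalone check of the arithmetic in the comment above, assuming the 1024000-byte raw cap stated there:

#include <stdio.h>

int main(void)
{
    const long raw_cap = 1024000;                 /* raw record cap, per the comment */
    const long b64_cap = ((raw_cap + 2) / 3) * 4; /* integer form of ceil(raw_cap / 3) * 4 */

    printf("%ld\n", b64_cap);                     /* prints 1365336 = MAX_B64_EVENT_SIZE */
    return 0;
}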