diff --git a/Makefile.am b/Makefile.am index 3a1593ceff..6fc4c829ef 100644 --- a/Makefile.am +++ b/Makefile.am @@ -126,6 +126,7 @@ noinst_LTLIBRARIES = \ libheap.la \ libignorelist.la \ liblatency.la \ + libllist.la \ liblookup.la \ libmetadata.la \ libmount.la \ @@ -217,8 +218,6 @@ collectd_SOURCES = \ src/daemon/utils_cache.h \ src/daemon/utils_complain.c \ src/daemon/utils_complain.h \ - src/daemon/utils_llist.c \ - src/daemon/utils_llist.h \ src/daemon/utils_random.c \ src/daemon/utils_random.h \ src/daemon/utils_subst.c \ @@ -238,6 +237,7 @@ collectd_LDADD = \ libavltree.la \ libcommon.la \ libheap.la \ + libllist.la \ liboconfig.la \ -lm \ $(COMMON_LIBS) \ @@ -356,6 +356,10 @@ libignorelist_la_SOURCES = \ src/utils_ignorelist.c \ src/utils_ignorelist.h +libllist_la_SOURCES = \ + src/daemon/utils_llist.c \ + src/daemon/utils_llist.h + libmetadata_la_SOURCES = \ src/daemon/meta_data.c \ src/daemon/meta_data.h @@ -1838,11 +1842,39 @@ wireless_la_LDFLAGS = $(PLUGIN_LDFLAGS) endif if BUILD_PLUGIN_WRITE_GCM +libstackdriver_json_la_SOURCES = \ + src/utils_stackdriver_json.c \ + src/utils_stackdriver_json.h +libstackdriver_json_la_CPPFLAGS = $(AM_CPPFLAGS) $(BUILD_WITH_LIBYAJL_CPPFLAGS) +libstackdriver_json_la_LDFLAGS = $(AM_LDFLAGS) $(BUILD_WITH_LIBYAJL_LDFLAGS) +libstackdriver_json_la_LIBADD = $(BUILD_WITH_LIBYAJL_LIBS) libcommon.la libllist.la + +noinst_LTLIBRARIES += libstackdriver_json.la +check_PROGRAMS += test_stackdriver_json + +EXTRA_DIST += \ + src/collectd_time_series_response_test.json \ + src/time_series_summary_test.json + +AM_CPPFLAGS += \ + -DSRCDIR='"${srcdir}"' + +test_stackdriver_json_SOURCES = \ + src/utils_stackdriver_json_test.c \ + src/testing.h +test_stackdriver_json_LDADD = \ + libstackdriver_json.la \ + libmetadata.la \ + libplugin_mock.la + pkglib_LTLIBRARIES += write_gcm.la write_gcm_la_SOURCES = src/write_gcm.c write_gcm_la_LDFLAGS = -module -avoid-version $(BUILD_WITH_LIBYAJL_LDFLAGS) write_gcm_la_CFLAGS = $(AM_CFLAGS) $(BUILD_WITH_LIBYAJL_CPPFLAGS) $(BUILD_WITH_LIBCURL_CFLAGS) -write_gcm_la_LIBADD = $(BUILD_WITH_LIBYAJL_LIBS) $(BUILD_WITH_LIBCURL_LIBS) +write_gcm_la_LIBADD = \ + $(BUILD_WITH_LIBYAJL_LIBS) \ + $(BUILD_WITH_LIBCURL_LIBS) \ + libstackdriver_json.la if BUILD_WITH_LIBSSL write_gcm_la_CFLAGS += $(BUILD_WITH_LIBSSL_CFLAGS) write_gcm_la_LIBADD += $(BUILD_WITH_LIBSSL_LIBS) diff --git a/src/collectd_time_series_response_test.json b/src/collectd_time_series_response_test.json new file mode 100644 index 0000000000..d892c105f2 --- /dev/null +++ b/src/collectd_time_series_response_test.json @@ -0,0 +1,59 @@ +{ + "payload_errors" : [ + { + "error" : { + "message" : "Retry in 300 milliseconds.", + "status" : "RESOURCE_EXHAUSTED", + "code" : 429 + }, + "value_errors" : [ + { + "index" : 0 + } + ], + "index" : 0 + }, + { + "index" : 1, + "value_errors" : [ + { + "error" : { + "status" : "RESOURCE_EXHAUSTED", + "code" : 429, + "message" : "Retry in 300 milliseconds." + }, + "index" : 0 + }, + { + "error" : { + "message" : "Retry in 300 milliseconds.", + "status" : "RESOURCE_EXHAUSTED", + "code" : 429 + }, + "index" : 1 + } + ] + } + ], + "summary" : { + "errors" : [ + { + "point_count" : 2, + "status" : { + "message" : "Retry in 500 milliseconds.", + "status" : "RESOURCE_EXHAUSTED", + "code" : 429 + } + }, + { + "status" : { + "status" : "NOT_FOUND", + "code" : 404 + }, + "point_count" : 1 + } + ], + "total_point_count" : 4, + "success_point_count" : 1 + } +} diff --git a/src/time_series_summary_test.json b/src/time_series_summary_test.json new file mode 100644 index 0000000000..01a9d78b49 --- /dev/null +++ b/src/time_series_summary_test.json @@ -0,0 +1,40 @@ +{ + "error" : { + "message" : "Retry in 300 milliseconds.", + "code" : 429, + "details" : [ + { + "@type" : "type.googleapis.com/google.monitoring.v3.CreateTimeSeriesSummary", + "success_point_count" : 1, + "total_point_count" : 4, + "errors" : [ + { + "status" : { + "code" : 429, + "message" : "Retry in 500 milliseconds.", + "status" : "RESOURCE_EXHAUSTED" + }, + "point_count" : 2 + }, + { + "status" : { + "code" : 404, + "status" : "NOT_FOUND" + }, + "point_count" : 1 + } + ] + }, + { + "time_series" : {}, + "@type" : "type.googleapis.com/google.monitoring.v3.CreateTimeSeriesError", + "status" : { + "code" : 429, + "message" : "Retry in 300 milliseconds.", + "status" : "RESOURCE_EXHAUSTED" + } + } + ], + "status" : "RESOURCE_EXHAUSTED" + } +} diff --git a/src/utils_stackdriver_json.c b/src/utils_stackdriver_json.c new file mode 100644 index 0000000000..2e6a41c3cb --- /dev/null +++ b/src/utils_stackdriver_json.c @@ -0,0 +1,257 @@ +/** + * collectd - src/utils_format_json.c + * Copyright (C) 2019 Google Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + **/ + +#include "collectd.h" + +#include "utils_stackdriver_json.h" +#include "daemon/plugin.h" + +#include +#include +#if HAVE_YAJL_YAJL_VERSION_H +#include +#endif +#if YAJL_MAJOR > 1 +#define HAVE_YAJL_V2 1 +#endif + +#if HAVE_YAJL_V2 +typedef long long wg_yajl_integer_t; +typedef size_t wg_yajl_size_t; +#else +typedef long wg_yajl_integer_t; +typedef unsigned int wg_yajl_size_t; +#endif + +#define TYPE_SUMMARY "type.googleapis.com/google.monitoring.v3.CreateTimeSeriesSummary" + +static void handle_yajl_status(yajl_status status, yajl_handle handle, + char *buffer, size_t length) { + if (status == yajl_status_ok) { + return; + } + + unsigned char *message = yajl_get_error(handle, 1, (unsigned char *)buffer, length); + ERROR("%s", message); + yajl_free_error(handle, message); +} + +static int parse_json(yajl_callbacks *funcs, char *buffer, void *ctx) { + yajl_handle handle; + yajl_status result; + size_t buffer_length; + +#if HAVE_YAJL_V2 + handle = yajl_alloc(funcs, /* alloc = */ NULL, ctx); +#else + handle = yajl_alloc(funcs, /* config = */ NULL, /* alloc = */ NULL, ctx); +#endif + if (handle == NULL) return -1; + + buffer_length = strlen(buffer); + result = yajl_parse(handle, (unsigned char *)buffer, buffer_length); + handle_yajl_status(result, handle, buffer, buffer_length); + +#if HAVE_YAJL_V2 + result = yajl_complete_parse(handle); +#else + result = yajl_parse_complete(handle); +#endif + handle_yajl_status(result, handle, NULL, 0); + + yajl_free(handle); + return result == yajl_status_ok ? 0 : -1; +} + +typedef enum { + FIELD_UNSET = 0, + FIELD_TYPE, + FIELD_TOTAL_POINT_COUNT, + FIELD_SUCCESS_POINT_COUNT, + FIELD_ERROR_POINT_COUNT, + FIELD_CODE, +} field_t; + +typedef struct { + struct { + // Whether the parser is inside a summary map and at what depth the map was + // found. + _Bool in_summary; + int summary_depth; + // The depth of the current map element. + int depth; + // The most recent field encountered. + field_t current_field; + // Temporary storage for building error instances. + time_series_error_t temp_error; + } state; + + // Holds the output. + time_series_summary_t *response; +} parse_summary_t; + +static void log_context(const parse_summary_t *ctx) { + DEBUG("in_summary %d; summary_depth %d; depth %d; current_field %d", + ctx->state.in_summary, ctx->state.summary_depth, ctx->state.depth, + ctx->state.current_field); +} + +static int summary_start_map(void *c) { + parse_summary_t *ctx = (parse_summary_t *)c; + ctx->state.depth++; + return 1; +} + +static int summary_end_map(void *c) { + parse_summary_t *ctx = (parse_summary_t *)c; + if (ctx->state.depth == ctx->state.summary_depth) { + ctx->state.in_summary = 0; + } + ctx->state.depth--; + if (ctx->state.in_summary) { + // If the parser is inside a CreateTimeSeriesSummary message and there is a + // fully initialized error element, commit it to the response. + if (ctx->state.temp_error.point_count > 0 && ctx->state.temp_error.code > 0) { + const int max_key_length = 10; // Includes trailing NUL. + char *key; + time_series_error_t *value; + key = malloc(max_key_length); + snprintf(key, max_key_length, "%d", ctx->state.temp_error.code); + value = malloc(sizeof(time_series_error_t)); + memcpy(value, &ctx->state.temp_error, sizeof(*value)); + memset(&ctx->state.temp_error, 0, sizeof(ctx->state.temp_error)); + if (ctx->response->errors == NULL) { + ctx->response->errors = llist_create(); + } + llist_append(ctx->response->errors, llentry_create(key, value)); + } + } + return 1; +} + +static int summary_parse_map_key(void *c, const unsigned char *key, + wg_yajl_size_t length) { + parse_summary_t *ctx = (parse_summary_t *)c; + log_context(ctx); + DEBUG("map_key: %.*s", (int) length, (const char*) key); + ctx->state.current_field = FIELD_UNSET; + // Is this a CreateTimeSeriesSummary object within a CreateCollectdTimeSeriesResponse? + if (strncmp((const char *)key, "summary", length) == 0) { + ctx->state.in_summary = 1; + ctx->state.summary_depth = ctx->state.depth; + return 1; + } + // Is this a @type annotation within a CreateTimeSeries status payload? + if (strncmp((const char *)key, "@type", length) == 0) { + ctx->state.current_field = FIELD_TYPE; + return 1; + } + if (!ctx->state.in_summary) { + return 1; + } + // We are inside a summary object. This implementation assumes that the field + // names used within the message are unique. + if (strncmp((const char *)key, "total_point_count", length) == 0) { + ctx->state.current_field = FIELD_TOTAL_POINT_COUNT; + } else if (strncmp((const char *)key, "success_point_count", length) == 0) { + ctx->state.current_field = FIELD_SUCCESS_POINT_COUNT; + } else if (strncmp((const char *)key, "point_count", length) == 0) { + // Not a typo. The field name doesn't contain "error". + ctx->state.current_field = FIELD_ERROR_POINT_COUNT; + } else if (strncmp((const char *)key, "code", length) == 0) { + ctx->state.current_field = FIELD_CODE; + } + return 1; +} + +static int summary_parse_string(void *c, const unsigned char *val, + wg_yajl_size_t length) { + parse_summary_t *ctx = (parse_summary_t *)c; + log_context(ctx); + DEBUG("string: %.*s", (int) length, (const char*)val); + // Is this a CreateTimeSeries object within a CreateTimeSeries status payload? + if (ctx->state.current_field == FIELD_TYPE && + strncmp((const char *)val, TYPE_SUMMARY, length) == 0) { + ctx->state.in_summary = 1; + ctx->state.summary_depth = ctx->state.depth; + } + return 1; +} + +static int summary_parse_integer(void *c, wg_yajl_integer_t val) { + parse_summary_t *ctx = (parse_summary_t *)c; + log_context(ctx); + DEBUG("integer: %lld", val); + if (ctx->state.current_field == FIELD_TOTAL_POINT_COUNT) { + if (ctx->response->total_point_count > 0) { + DEBUG("total_point_count was already set. Bug?"); + } + ctx->response->total_point_count += val; + } else if (ctx->state.current_field == FIELD_SUCCESS_POINT_COUNT) { + if (ctx->response->success_point_count > 0) { + DEBUG("success_point_count was already set. Bug?"); + } + ctx->response->success_point_count += val; + } else if (ctx->state.current_field == FIELD_ERROR_POINT_COUNT) { + if (ctx->state.temp_error.point_count > 0) { + DEBUG("error point_count was already set. Bug?"); + } + ctx->state.temp_error.point_count += val; + } else if (ctx->state.current_field == FIELD_CODE) { + if (ctx->state.temp_error.code > 0) { + DEBUG("error code was already set. Bug?"); + } + ctx->state.temp_error.code = val; + } + return 1; +} + +int parse_time_series_summary(char *buffer, time_series_summary_t *response) { + yajl_callbacks funcs = { + .yajl_integer = summary_parse_integer, + .yajl_string = summary_parse_string, + .yajl_map_key = summary_parse_map_key, + .yajl_start_map = summary_start_map, + .yajl_end_map = summary_end_map, + }; + parse_summary_t ctx; + if (response == NULL) return -1; + memset(&ctx, 0, sizeof(ctx)); + ctx.response = response; + return parse_json(&funcs, buffer, &ctx); +} + +void free_time_series_summary(time_series_summary_t *response) { + if (response->errors != NULL) { + llentry_t *e_this; + llentry_t *e_next; + // Free all elements of `errors`. llist_destroy below will free the actual entries. + for (e_this = llist_head(response->errors); e_this != NULL; e_this = e_next) { + e_next = e_this->next; + free(e_this->key); + free(e_this->value); + } + llist_destroy(response->errors); + response->errors = NULL; + } +} diff --git a/src/utils_stackdriver_json.h b/src/utils_stackdriver_json.h new file mode 100644 index 0000000000..10556a106b --- /dev/null +++ b/src/utils_stackdriver_json.h @@ -0,0 +1,55 @@ +/** + * collectd - src/utils_format_json.h + * Copyright (C) 2019 Google Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + **/ + +#ifndef UTILS_STACKDRIVER_JSON_H +#define UTILS_STACKDRIVER_JSON_H 1 + +#include "collectd.h" +#include "utils_llist.h" + +typedef struct { + int point_count; + int code; +} time_series_error_t; + +typedef struct { + int total_point_count; + int success_point_count; + // Elements have type time_series_error_t. + llist_t *errors; +} time_series_summary_t; + +/* +Extract statistics from the backend API response. Supports both CreateTimeSeries +and CreateCollectdTimeSeries. For sample input see time_series_summary_test.json +and collectd_time_series_response_test.json. +*/ +int parse_time_series_summary(char *buffer, time_series_summary_t *response); + +/* +Release the resources allocated by the time_series_summary_t instance. It +doesn't release `response` itself. +*/ +void free_time_series_summary(time_series_summary_t *response); + +#endif /* UTILS_STACKDRIVER_JSON_H */ diff --git a/src/utils_stackdriver_json_test.c b/src/utils_stackdriver_json_test.c new file mode 100644 index 0000000000..cf2ec60af3 --- /dev/null +++ b/src/utils_stackdriver_json_test.c @@ -0,0 +1,81 @@ +/** + * collectd - src/utils_format_json_test.c + * Copyright (C) 2019 Google Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + **/ + +#include "config.h" + +#include "collectd.h" + +#include "daemon/common.h" +#include "daemon/utils_llist.h" +#include "testing.h" +#include "utils_stackdriver_json.h" + +static int run_summary_test(time_series_summary_t summary) { + llentry_t *entry; + time_series_error_t *error; + EXPECT_EQ_INT(4, summary.total_point_count); + EXPECT_EQ_INT(1, summary.success_point_count); + CHECK_NOT_NULL(summary.errors); + entry = llist_search(summary.errors, "404"); + CHECK_NOT_NULL(entry); + error = (time_series_error_t*) entry->value; + EXPECT_EQ_INT(1, error->point_count); + EXPECT_EQ_INT(404, error->code); + entry = llist_search(summary.errors, "429"); + CHECK_NOT_NULL(entry); + error = (time_series_error_t*) entry->value; + EXPECT_EQ_INT(2, error->point_count); + EXPECT_EQ_INT(429, error->code); + EXPECT_EQ_INT(2, llist_size(summary.errors)); + return 0; +} + +DEF_TEST(time_series_summary) { + char buf[10000]; + int ret; + OK(read_file_contents(SRCDIR "/src/time_series_summary_test.json", buf, + sizeof(buf)) >= 0); + time_series_summary_t summary = {0}; + CHECK_ZERO(parse_time_series_summary(buf, &summary)); + ret = run_summary_test(summary); + free_time_series_summary(&summary); + return ret; +} + +DEF_TEST(collectd_time_series_response) { + char buf[10000]; + int ret; + OK(read_file_contents(SRCDIR "/src/collectd_time_series_response_test.json", buf, + sizeof(buf)) >= 0); + time_series_summary_t summary = {0}; + CHECK_ZERO(parse_time_series_summary(buf, &summary)); + ret = run_summary_test(summary); + free_time_series_summary(&summary); + return ret; +} + +int main(void) { + RUN_TEST(time_series_summary); + RUN_TEST(collectd_time_series_response); + END_TEST; +}