From 82e6f73e624356ed2ae81937ffc5b15d01a58974 Mon Sep 17 00:00:00 2001 From: Carlos Peon Costa Date: Tue, 5 Dec 2023 12:39:16 +0100 Subject: [PATCH] evaluate flush condition with oldest value in the buffer --- src/network.c | 37 +++++++++++++++++++------------------ src/write_influxdb_udp.c | 9 +++++---- 2 files changed, 24 insertions(+), 22 deletions(-) diff --git a/src/network.c b/src/network.c index e30d1b0e4f..16d7cdd8fd 100644 --- a/src/network.c +++ b/src/network.c @@ -294,7 +294,7 @@ static pthread_t dispatch_thread_id; static char *send_buffer; static char *send_buffer_ptr; static int send_buffer_fill; -static cdtime_t send_buffer_last_update; +static cdtime_t send_buffer_first_write; static value_list_t send_buffer_vl = VALUE_LIST_INIT; static pthread_mutex_t send_buffer_lock = PTHREAD_MUTEX_INITIALIZER; @@ -2400,7 +2400,7 @@ static void network_init_buffer(void) { memset(send_buffer, 0, network_config_packet_size); send_buffer_ptr = send_buffer; send_buffer_fill = 0; - send_buffer_last_update = 0; + send_buffer_first_write = 0; memset(&send_buffer_vl, 0, sizeof(send_buffer_vl)); } /* int network_init_buffer */ @@ -2715,7 +2715,8 @@ static int network_write(const data_set_t *ds, const value_list_t *vl, /* status == bytes added to the buffer */ send_buffer_fill += status; send_buffer_ptr += status; - send_buffer_last_update = cdtime(); + if (send_buffer_first_write == 0) + send_buffer_first_write = cdtime(); stats_values_sent++; } else { @@ -2902,11 +2903,11 @@ static int network_config_add_listen(const oconfig_item_t *ci) /* {{{ */ network_config_set_security_level(child, &se->data.server.security_level); else #endif /* HAVE_GCRYPT_H */ - if (strcasecmp("Interface", child->key) == 0) - network_config_set_interface(child, &se->interface); - else { - WARNING("network plugin: Option `%s' is not allowed here.", child->key); - } + if (strcasecmp("Interface", child->key) == 0) + network_config_set_interface(child, &se->interface); + else { + WARNING("network plugin: Option `%s' is not allowed here.", child->key); + } } #if HAVE_GCRYPT_H @@ -2982,15 +2983,15 @@ static int network_config_add_server(const oconfig_item_t *ci) /* {{{ */ network_config_set_security_level(child, &se->data.client.security_level); else #endif /* HAVE_GCRYPT_H */ - if (strcasecmp("Interface", child->key) == 0) - network_config_set_interface(child, &se->interface); - else if (strcasecmp("BindAddress", child->key) == 0) - network_config_set_bind_address(child, &se->data.client.bind_addr); - else if (strcasecmp("ResolveInterval", child->key) == 0) - cf_util_get_cdtime(child, &se->data.client.resolve_interval); - else { - WARNING("network plugin: Option `%s' is not allowed here.", child->key); - } + if (strcasecmp("Interface", child->key) == 0) + network_config_set_interface(child, &se->interface); + else if (strcasecmp("BindAddress", child->key) == 0) + network_config_set_bind_address(child, &se->data.client.bind_addr); + else if (strcasecmp("ResolveInterval", child->key) == 0) + cf_util_get_cdtime(child, &se->data.client.resolve_interval); + else { + WARNING("network plugin: Option `%s' is not allowed here.", child->key); + } } #if HAVE_GCRYPT_H @@ -3312,7 +3313,7 @@ static int network_flush(cdtime_t timeout, if (send_buffer_fill > 0) { if (timeout > 0) { cdtime_t now = cdtime(); - if ((send_buffer_last_update + timeout) > now) { + if ((send_buffer_first_write + timeout) > now) { pthread_mutex_unlock(&send_buffer_lock); return 0; } diff --git a/src/write_influxdb_udp.c b/src/write_influxdb_udp.c index e26cf36f60..3b7008fbfb 100644 --- a/src/write_influxdb_udp.c +++ b/src/write_influxdb_udp.c @@ -88,7 +88,7 @@ static sockent_t *sending_sockets; static char *send_buffer; static char *send_buffer_ptr; static int send_buffer_fill; -static cdtime_t send_buffer_last_update; +static cdtime_t send_buffer_first_write; static pthread_mutex_t send_buffer_lock = PTHREAD_MUTEX_INITIALIZER; static int set_ttl(const sockent_t *se, const struct addrinfo *ai) { @@ -307,7 +307,7 @@ static void write_influxdb_udp_init_buffer(void) { memset(send_buffer, 0, wifxudp_config_packet_size); send_buffer_ptr = send_buffer; send_buffer_fill = 0; - send_buffer_last_update = 0; + send_buffer_first_write = 0; } /* write_influxdb_udp_init_buffer */ static void write_influxdb_udp_send_buffer(sockent_t *sending_socket, @@ -373,7 +373,8 @@ write_influxdb_udp_write(const data_set_t *ds, const value_list_t *vl, send_buffer_fill += status; send_buffer_ptr += status; - send_buffer_last_update = cdtime(); + if (send_buffer_first_write == 0) + send_buffer_first_write = cdtime(); if (wifxudp_config_packet_size - send_buffer_fill < 120) /* No room for a new point of average size in buffer, @@ -577,7 +578,7 @@ static int write_influxdb_udp_flush(cdtime_t timeout, if (send_buffer_fill > 0) { if (timeout > 0) { cdtime_t now = cdtime(); - if ((send_buffer_last_update + timeout) > now) { + if ((send_buffer_first_write + timeout) > now) { pthread_mutex_unlock(&send_buffer_lock); return 0; }