Skip to content

Commit

Permalink
Merge pull request collectd#4065 from carlospeon/flush-buffer
Browse files Browse the repository at this point in the history
evaluate flush condition with oldest value in the buffer
  • Loading branch information
octo authored Dec 5, 2023
2 parents f8a6659 + 82e6f73 commit 8a18be9
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 22 deletions.
37 changes: 19 additions & 18 deletions src/network.c
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ static pthread_t dispatch_thread_id;
static char *send_buffer;
static char *send_buffer_ptr;
static int send_buffer_fill;
static cdtime_t send_buffer_last_update;
static cdtime_t send_buffer_first_write;
static value_list_t send_buffer_vl = VALUE_LIST_INIT;
static pthread_mutex_t send_buffer_lock = PTHREAD_MUTEX_INITIALIZER;

Expand Down Expand Up @@ -2400,7 +2400,7 @@ static void network_init_buffer(void) {
memset(send_buffer, 0, network_config_packet_size);
send_buffer_ptr = send_buffer;
send_buffer_fill = 0;
send_buffer_last_update = 0;
send_buffer_first_write = 0;

memset(&send_buffer_vl, 0, sizeof(send_buffer_vl));
} /* int network_init_buffer */
Expand Down Expand Up @@ -2715,7 +2715,8 @@ static int network_write(const data_set_t *ds, const value_list_t *vl,
/* status == bytes added to the buffer */
send_buffer_fill += status;
send_buffer_ptr += status;
send_buffer_last_update = cdtime();
if (send_buffer_first_write == 0)
send_buffer_first_write = cdtime();

stats_values_sent++;
} else {
Expand Down Expand Up @@ -2902,11 +2903,11 @@ static int network_config_add_listen(const oconfig_item_t *ci) /* {{{ */
network_config_set_security_level(child, &se->data.server.security_level);
else
#endif /* HAVE_GCRYPT_H */
if (strcasecmp("Interface", child->key) == 0)
network_config_set_interface(child, &se->interface);
else {
WARNING("network plugin: Option `%s' is not allowed here.", child->key);
}
if (strcasecmp("Interface", child->key) == 0)
network_config_set_interface(child, &se->interface);
else {
WARNING("network plugin: Option `%s' is not allowed here.", child->key);
}
}

#if HAVE_GCRYPT_H
Expand Down Expand Up @@ -2982,15 +2983,15 @@ static int network_config_add_server(const oconfig_item_t *ci) /* {{{ */
network_config_set_security_level(child, &se->data.client.security_level);
else
#endif /* HAVE_GCRYPT_H */
if (strcasecmp("Interface", child->key) == 0)
network_config_set_interface(child, &se->interface);
else if (strcasecmp("BindAddress", child->key) == 0)
network_config_set_bind_address(child, &se->data.client.bind_addr);
else if (strcasecmp("ResolveInterval", child->key) == 0)
cf_util_get_cdtime(child, &se->data.client.resolve_interval);
else {
WARNING("network plugin: Option `%s' is not allowed here.", child->key);
}
if (strcasecmp("Interface", child->key) == 0)
network_config_set_interface(child, &se->interface);
else if (strcasecmp("BindAddress", child->key) == 0)
network_config_set_bind_address(child, &se->data.client.bind_addr);
else if (strcasecmp("ResolveInterval", child->key) == 0)
cf_util_get_cdtime(child, &se->data.client.resolve_interval);
else {
WARNING("network plugin: Option `%s' is not allowed here.", child->key);
}
}

#if HAVE_GCRYPT_H
Expand Down Expand Up @@ -3312,7 +3313,7 @@ static int network_flush(cdtime_t timeout,
if (send_buffer_fill > 0) {
if (timeout > 0) {
cdtime_t now = cdtime();
if ((send_buffer_last_update + timeout) > now) {
if ((send_buffer_first_write + timeout) > now) {
pthread_mutex_unlock(&send_buffer_lock);
return 0;
}
Expand Down
9 changes: 5 additions & 4 deletions src/write_influxdb_udp.c
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ static sockent_t *sending_sockets;
static char *send_buffer;
static char *send_buffer_ptr;
static int send_buffer_fill;
static cdtime_t send_buffer_last_update;
static cdtime_t send_buffer_first_write;
static pthread_mutex_t send_buffer_lock = PTHREAD_MUTEX_INITIALIZER;

static int set_ttl(const sockent_t *se, const struct addrinfo *ai) {
Expand Down Expand Up @@ -307,7 +307,7 @@ static void write_influxdb_udp_init_buffer(void) {
memset(send_buffer, 0, wifxudp_config_packet_size);
send_buffer_ptr = send_buffer;
send_buffer_fill = 0;
send_buffer_last_update = 0;
send_buffer_first_write = 0;
} /* write_influxdb_udp_init_buffer */

static void write_influxdb_udp_send_buffer(sockent_t *sending_socket,
Expand Down Expand Up @@ -373,7 +373,8 @@ write_influxdb_udp_write(const data_set_t *ds, const value_list_t *vl,

send_buffer_fill += status;
send_buffer_ptr += status;
send_buffer_last_update = cdtime();
if (send_buffer_first_write == 0)
send_buffer_first_write = cdtime();

if (wifxudp_config_packet_size - send_buffer_fill < 120)
/* No room for a new point of average size in buffer,
Expand Down Expand Up @@ -577,7 +578,7 @@ static int write_influxdb_udp_flush(cdtime_t timeout,
if (send_buffer_fill > 0) {
if (timeout > 0) {
cdtime_t now = cdtime();
if ((send_buffer_last_update + timeout) > now) {
if ((send_buffer_first_write + timeout) > now) {
pthread_mutex_unlock(&send_buffer_lock);
return 0;
}
Expand Down

0 comments on commit 8a18be9

Please sign in to comment.