Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HOLD] Parametrize for size of in-memory queue #689

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions conf/scope.yml
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,26 @@ libscope:
#
level: warning

# Set buffer threshold of the in-memory queue
# Type: integer
# Values: (greater than zero) bytes
# Default: 32768
# Override: $SCOPE_LOG_BUFFER_THRESHOLD
#
# Threshold of raw data after reaching data will be flushed to log
#
buffer_threshold: 32768

# Set flush period in-memory queue
# Type: integer
# Values: (greater than zero) miliseconds
# Default: 2000
# Override: $SCOPE_LOG_FLUSH_PERIOD
#
# Period after data will be flushed to log
#
flush_period: 2000

# Backend connection for logs
#
transport:
Expand Down
30 changes: 30 additions & 0 deletions src/cfg.c
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ struct _config_t

struct {
cfg_log_level_t level;
unsigned long bufthreshold;
unsigned long flushperiod;
} log;

struct {
Expand Down Expand Up @@ -208,6 +210,8 @@ cfgCreateDefault()
}

c->log.level = DEFAULT_LOG_LEVEL;
c->log.bufthreshold = DEFAULT_LOG_MAX_AGG_BYTES;
c->log.flushperiod = DEFAULT_LOG_FLUSH_PERIOD_IN_MS;

c->pay.enable = DEFAULT_PAYLOAD_ENABLE;
c->pay.dir = (DEFAULT_PAYLOAD_DIR) ? strdup(DEFAULT_PAYLOAD_DIR) : NULL;
Expand Down Expand Up @@ -561,6 +565,18 @@ cfgLogLevel(config_t* cfg)
return (cfg) ? cfg->log.level : DEFAULT_LOG_LEVEL;
}

unsigned long
cfgLogBufThreshold(config_t* cfg)
{
return (cfg) ? cfg->log.bufthreshold : DEFAULT_LOG_MAX_AGG_BYTES;
}

unsigned long
cfgLogFlushPeriod(config_t* cfg)
{
return (cfg) ? cfg->log.flushperiod : DEFAULT_LOG_FLUSH_PERIOD_IN_MS;
}

unsigned int
cfgPayEnable(config_t *cfg)
{
Expand Down Expand Up @@ -912,6 +928,20 @@ cfgLogLevelSet(config_t* cfg, cfg_log_level_t level)
cfg->log.level = level;
}

void
cfgLogBufThresholdSet(config_t* cfg, unsigned long threshold)
{
if (!cfg) return;
cfg->log.bufthreshold = threshold;
}

void
cfgLogFlushPeriodSet(config_t* cfg, unsigned long period)
{
if (!cfg) return;
cfg->log.flushperiod = period;
}

void
cfgPayEnableSet(config_t *cfg, unsigned int val)
{
Expand Down
4 changes: 4 additions & 0 deletions src/cfg.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ const char* cfgTransportTlsCACertPath(config_t *, which_transport_t);
custom_tag_t** cfgCustomTags(config_t*);
const char* cfgCustomTagValue(config_t*, const char*);
cfg_log_level_t cfgLogLevel(config_t*);
unsigned long cfgLogBufThreshold(config_t*);
unsigned long cfgLogFlushPeriod(config_t*);
unsigned int cfgPayEnable(config_t*);
const char * cfgPayDir(config_t*);
const char * cfgEvtFormatHeader(config_t *, int);
Expand Down Expand Up @@ -78,6 +80,8 @@ void cfgTransportTlsValidateServerSet(config_t *, which_transport
void cfgTransportTlsCACertPathSet(config_t *, which_transport_t, const char *);
void cfgCustomTagAdd(config_t*, const char*, const char*);
void cfgLogLevelSet(config_t*, cfg_log_level_t);
void cfgLogBufThresholdSet(config_t*, unsigned long);
void cfgLogFlushPeriodSet(config_t*, unsigned long);
void cfgPayEnableSet(config_t*, unsigned int);
void cfgPayDirSet(config_t*, const char *);
void cfgEvtFormatHeaderSet(config_t *, const char *);
Expand Down
74 changes: 57 additions & 17 deletions src/cfgutils.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@

#define LIBSCOPE_NODE "libscope"
#define LOG_NODE "log"
#define BUF_THRESHOLD_NODE "bufthreshold"
#define FLUSH_PERIOD_NODE "flushperiod"
#define LEVEL_NODE "level"
#define TRANSPORT_NODE "transport"
#define SUMMARYPERIOD_NODE "summaryperiod"
Expand Down Expand Up @@ -165,6 +167,8 @@ void cfgTransportTlsValidateServerSetFromStr(config_t *, which_transport_t, cons
void cfgTransportTlsCACertPathSetFromStr(config_t *, which_transport_t, const char *);
void cfgCustomTagAddFromStr(config_t*, const char*, const char*);
void cfgLogLevelSetFromStr(config_t*, const char*);
void cfgLogBufThresholdSetFromStr(config_t*, const char*);
void cfgLogFlushPeriodSetFromStr(config_t*, const char*);
void cfgPayEnableSetFromStr(config_t*, const char*);
void cfgPayDirSetFromStr(config_t*, const char*);
void cfgAuthTokenSetFromStr(config_t*, const char*);
Expand Down Expand Up @@ -455,6 +459,10 @@ processEnvStyleInput(config_t *cfg, const char *env_line)
cfgMtcVerbositySetFromStr(cfg, value);
} else if (!strcmp(env_name, "SCOPE_LOG_LEVEL")) {
cfgLogLevelSetFromStr(cfg, value);
} else if (!strcmp(env_name, "SCOPE_LOG_BUFFER_THRESHOLD")) {
cfgLogBufThresholdSetFromStr(cfg, value);
} else if (!strcmp(env_name, "SCOPE_LOG_FLUSH_PERIOD")) {
cfgLogFlushPeriodSetFromStr(cfg, value);
} else if (!strcmp(env_name, "SCOPE_METRIC_DEST")) {
cfgTransportSetFromStr(cfg, CFG_MTC, value);
} else if (!strcmp(env_name, "SCOPE_METRIC_TLS_ENABLE")) {
Expand Down Expand Up @@ -649,10 +657,8 @@ void
cfgMtcStatsDMaxLenSetFromStr(config_t* cfg, const char* value)
{
if (!cfg || !value) return;
errno = 0;
char* endptr = NULL;
unsigned long x = strtoul(value, &endptr, 10);
if (errno || *endptr) return;
unsigned long x;
if (!strToUnsignedLong(value, &x)) return;

cfgMtcStatsDMaxLenSet(cfg, x);
}
Expand All @@ -661,10 +667,8 @@ void
cfgMtcPeriodSetFromStr(config_t* cfg, const char* value)
{
if (!cfg || !value) return;
errno = 0;
char* endptr = NULL;
unsigned long x = strtoul(value, &endptr, 10);
if (errno || *endptr) return;
unsigned long x;
if (!strToUnsignedLong(value, &x)) return;

cfgMtcPeriodSet(cfg, x);
}
Expand Down Expand Up @@ -703,10 +707,8 @@ void
cfgEvtRateLimitSetFromStr(config_t* cfg, const char* value)
{
if (!cfg || !value) return;
errno = 0;
char* endptr = NULL;
unsigned long x = strtoul(value, &endptr, 10);
if (errno || *endptr) return;
unsigned long x;
if (!strToUnsignedLong(value, &x)) return;

cfgEvtRateLimitSet(cfg, x);
}
Expand Down Expand Up @@ -757,10 +759,8 @@ void
cfgMtcVerbositySetFromStr(config_t* cfg, const char* value)
{
if (!cfg || !value) return;
errno = 0;
char* endptr = NULL;
unsigned long x = strtoul(value, &endptr, 10);
if (errno || *endptr) return;
unsigned long x;
if (!strToUnsignedLong(value, &x)) return;

cfgMtcVerbositySet(cfg, x);
}
Expand Down Expand Up @@ -857,6 +857,24 @@ cfgLogLevelSetFromStr(config_t* cfg, const char* value)
cfgLogLevelSet(cfg, strToVal(logLevelMap, value));
}

void
cfgLogBufThresholdSetFromStr(config_t* cfg, const char* value)
{
if (!cfg || !value) return;
unsigned long x;
if (!strToUnsignedLong(value, &x)) return;
cfgLogBufThresholdSet(cfg, x);
}

void
cfgLogFlushPeriodSetFromStr(config_t* cfg, const char* value)
{
if (!cfg || !value) return;
unsigned long x;
if (!strToUnsignedLong(value, &x)) return;
cfgLogFlushPeriodSet(cfg, x);
}

void
cfgPayEnableSetFromStr(config_t* cfg, const char* value)
{
Expand Down Expand Up @@ -947,6 +965,22 @@ processLevel(config_t* config, yaml_document_t* doc, yaml_node_t* node)
if (value) free(value);
}

static void
processBufThreshold(config_t* config, yaml_document_t* doc, yaml_node_t* node)
{
char* value = stringVal(node);
cfgLogBufThresholdSetFromStr(config, value);
if (value) free(value);
}

static void
processFlushPeriod(config_t* config, yaml_document_t* doc, yaml_node_t* node)
{
char* value = stringVal(node);
cfgLogFlushPeriodSetFromStr(config, value);
if (value) free(value);
}

static void
processTransportType(config_t* config, yaml_document_t* doc, yaml_node_t* node)
{
Expand Down Expand Up @@ -1086,6 +1120,8 @@ processLogging(config_t* config, yaml_document_t* doc, yaml_node_t* node)

parse_table_t t[] = {
{YAML_SCALAR_NODE, LEVEL_NODE, processLevel},
{YAML_SCALAR_NODE, BUF_THRESHOLD_NODE, processBufThreshold},
{YAML_SCALAR_NODE, FLUSH_PERIOD_NODE, processFlushPeriod},
{YAML_MAPPING_NODE, TRANSPORT_NODE, processTransportLog},
{YAML_NO_NODE, NULL, NULL}
};
Expand Down Expand Up @@ -2121,6 +2157,10 @@ createLogJson(config_t* cfg)
cJSON* transport;

if (!(root = cJSON_CreateObject())) goto err;
if (!cJSON_AddNumberToObjLN(root, BUF_THRESHOLD_NODE,
cfgLogBufThreshold(cfg))) goto err;
if (!cJSON_AddNumberToObjLN(root, FLUSH_PERIOD_NODE,
cfgLogFlushPeriod(cfg))) goto err;
if (!cJSON_AddStringToObjLN(root, LEVEL_NODE,
valToStr(logLevelMap, cfgLogLevel(cfg)))) goto err;

Expand Down Expand Up @@ -2549,7 +2589,7 @@ initEvtFormat(config_t *cfg)
ctl_t *
initCtl(config_t *cfg)
{
ctl_t *ctl = ctlCreate();
ctl_t *ctl = ctlCreate(cfg);
if (!ctl) return ctl;

/*
Expand Down
8 changes: 3 additions & 5 deletions src/ctl.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
#include "state.h"

#define FS_ENTRIES 1024
#define DEFAULT_LOG_MAX_AGG_BYTES 32768
#define DEFAULT_LOG_FLUSH_PERIOD_IN_MS 2000

#define CHANNEL "_channel"
#define ID "id"
Expand Down Expand Up @@ -570,7 +568,7 @@ ctlCreateTxMsg(upload_t *upld)
}

ctl_t *
ctlCreate()
ctlCreate(config_t *cfg)
{
ctl_t *ctl = calloc(1, sizeof(ctl_t));
if (!ctl) {
Expand All @@ -583,8 +581,8 @@ ctlCreate()
DBG(NULL);
goto err;
}
ctl->log.max_agg_bytes = DEFAULT_LOG_MAX_AGG_BYTES;
ctl->log.flush_period_in_ms = DEFAULT_LOG_FLUSH_PERIOD_IN_MS;
ctl->log.max_agg_bytes = cfgLogBufThreshold(cfg);
ctl->log.flush_period_in_ms = cfgLogFlushPeriod(cfg);

ctl->events = cbufInit(DEFAULT_CBUF_SIZE);
if (!ctl->events) {
Expand Down
2 changes: 1 addition & 1 deletion src/ctl.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ char * ctlCreateTxMsg(upload_t*);
typedef struct _ctl_t ctl_t;

// Constructors Destructors
ctl_t * ctlCreate();
ctl_t * ctlCreate(config_t *);
void ctlDestroy(ctl_t **);

// Raw Send (without messaging protocol)
Expand Down
6 changes: 6 additions & 0 deletions src/scope_static.c
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,12 @@ static const char scope_help_configuration[] =
" SCOPE_LOG_DEST\n"
" same format as SCOPE_METRIC_DEST above.\n"
" Default is file:///tmp/scope.log\n"
" SCOPE_LOG_BUFFER_THRESHOLD\n"
" threshold of flushing bytes from memory log queue.\n"
" Default is 32768 bytes.\n"
" SCOPE_LOG_FLUSH_PERIOD\n"
" period of flushing bytes from memory log queue.\n"
" Default is 2000 ms.\n"
" SCOPE_LOG_TLS_ENABLE\n"
" Flag to enable Transport Layer Security (TLS). Only affects\n"
" tcp:// destinations. true,false Default is false.\n"
Expand Down
2 changes: 2 additions & 0 deletions src/scopetypes.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ typedef unsigned int bool;
#define DEFAULT_LOG_PORT NULL
#define DEFAULT_LOG_PATH "/tmp/scope.log"
#define DEFAULT_LOG_BUF CFG_BUFFER_LINE
#define DEFAULT_LOG_MAX_AGG_BYTES 32768
#define DEFAULT_LOG_FLUSH_PERIOD_IN_MS 2000
#define DEFAULT_TLS_ENABLE FALSE
#define DEFAULT_TLS_VALIDATE_SERVER TRUE
#define DEFAULT_TLS_CA_CERT NULL
Expand Down
10 changes: 10 additions & 0 deletions src/utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,16 @@ strToVal(enum_map_t map[], const char *str)
return -1;
}

int
strToUnsignedLong(const char* str, unsigned long* val)
{
errno = 0;
char* endptr = NULL;
*val = strtoul(str, &endptr, 10);
if (errno || *endptr) return FALSE;
return TRUE;
}

const char *
valToStr(enum_map_t map[], unsigned int val)
{
Expand Down
1 change: 1 addition & 0 deletions src/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ typedef struct {

unsigned int strToVal(enum_map_t[], const char*);
const char* valToStr(enum_map_t[], unsigned int);
int strToUnsignedLong(const char* str, unsigned long* val);

int checkEnv(char *, char *);
int fullSetenv(const char *, const char *, int);
Expand Down
2 changes: 2 additions & 0 deletions test/cfgtest.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ verifyDefaults(config_t* config)
assert_null (cfgCustomTags(config));
assert_null (cfgCustomTagValue(config, "tagname"));
assert_int_equal (cfgLogLevel(config), DEFAULT_LOG_LEVEL);
assert_int_equal (cfgLogBufThreshold(config), DEFAULT_LOG_MAX_AGG_BYTES);
assert_int_equal (cfgLogFlushPeriod(config), DEFAULT_LOG_FLUSH_PERIOD_IN_MS);
assert_int_equal (cfgPayEnable(config), DEFAULT_PAYLOAD_ENABLE);
assert_string_equal (cfgPayDir(config), DEFAULT_PAYLOAD_DIR);
}
Expand Down
2 changes: 2 additions & 0 deletions test/cfgutilstest.c
Original file line number Diff line number Diff line change
Expand Up @@ -1125,6 +1125,8 @@ verifyDefaults(config_t* config)
assert_null (cfgCustomTags(config));
assert_null (cfgCustomTagValue(config, "tagname"));
assert_int_equal (cfgLogLevel(config), DEFAULT_LOG_LEVEL);
assert_int_equal (cfgLogBufThreshold(config), DEFAULT_LOG_MAX_AGG_BYTES);
assert_int_equal (cfgLogFlushPeriod(config), DEFAULT_LOG_FLUSH_PERIOD_IN_MS);
assert_int_equal (cfgPayEnable(config), DEFAULT_PAYLOAD_ENABLE);
assert_string_equal (cfgPayDir(config), DEFAULT_PAYLOAD_DIR);

Expand Down
Loading