Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix csv reader when line is too long #91

Merged
merged 4 commits into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions libCacheSim/bin/traceUtils/traceConv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ void convert_to_oracleGeneral(reader_t *reader, std::string ofilepath,
(double)unique_bytes / GiB);
}

if (n_req_curr > n_req_total * 2) {
ERROR("n_req_curr (%ld) > n_req_total (%ld)\n", n_req_curr,
n_req_total);
}

if (read_one_req_above(reader, req) != 0) {
break;
}
Expand Down
26 changes: 13 additions & 13 deletions libCacheSim/include/libCacheSim/logging.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ extern pthread_mutex_t log_mtx;
do { \
pthread_mutex_lock(&log_mtx); \
log_header(level, __FILE__, __LINE__); \
printf(FMT, ##__VA_ARGS__); \
printf("%s", NORMAL); \
fflush(stdout); \
fprintf(stderr, FMT, ##__VA_ARGS__); \
fprintf(stderr, "%s", NORMAL); \
fflush(stderr); \
pthread_mutex_unlock(&log_mtx); \
} while (0)

Expand Down Expand Up @@ -111,28 +111,28 @@ static inline void log_header(int level, const char *file, int line) {

switch (level) {
case VVVERBOSE_LEVEL:
printf("%s[VVV] ", CYAN);
fprintf(stderr, "%s[VVV] ", CYAN);
break;
case VVERBOSE_LEVEL:
printf("%s[VV] ", CYAN);
fprintf(stderr, "%s[VV] ", CYAN);
break;
case VERBOSE_LEVEL:
printf("%s[VERB] ", MAGENTA);
fprintf(stderr, "%s[VERB] ", MAGENTA);
break;
case DEBUG_LEVEL:
printf("%s[DEBUG] ", CYAN);
fprintf(stderr, "%s[DEBUG] ", CYAN);
break;
case INFO_LEVEL:
printf("%s[INFO] ", GREEN);
fprintf(stderr, "%s[INFO] ", GREEN);
break;
case WARN_LEVEL:
printf("%s[WARN] ", YELLOW);
fprintf(stderr, "%s[WARN] ", YELLOW);
break;
case SEVERE_LEVEL:
printf("%s[ERROR] ", RED);
fprintf(stderr, "%s[ERROR] ", RED);
break;
default:
printf("in logging should not be here\n");
fprintf(stderr, "in logging should not be here\n");
break;
}

Expand All @@ -144,8 +144,8 @@ static inline void log_header(int level, const char *file, int line) {
curtime = tv.tv_sec;
strftime(buffer, 30, "%m-%d-%Y %T", localtime(&curtime));

printf("%s %8s:%-4d ", buffer, strrchr(file, '/') + 1, line);
printf("(tid=%zu): ", (unsigned long)pthread_self());
fprintf(stderr, "%s %8s:%-4d ", buffer, strrchr(file, '/') + 1, line);
fprintf(stderr, "(tid=%zu): ", (unsigned long)pthread_self());
}

#ifdef __cplusplus
Expand Down
6 changes: 2 additions & 4 deletions libCacheSim/traceReader/generalReader/csv.c
Original file line number Diff line number Diff line change
Expand Up @@ -200,21 +200,19 @@ static inline void csv_cb1(void *s, size_t len, void *data) {
if (reader->obj_id_is_num) {
req->obj_id = strtoull((char *)s, &end, 0);
if (req->obj_id == 0 && s == end) {
WARN("object id is not numeric %s\n", (char *)s);
WARN("object id is not numeric: \"%s\"\n", (char *)s);
}
} else {
// req->obj_id = (uint64_t)g_quark_from_string(s);
req->obj_id = (uint64_t)get_hash_value_str((char *)s, len);
}
} else if (csv_params->curr_field_idx == csv_params->time_field_idx) {
// this does not work, because s is not null terminated
uint64_t ts = (uint64_t)atof((char *)s);
// uint64_t ts = (uint64_t)strtod((char *)s, &end);
req->clock_time = ts;
} else if (csv_params->curr_field_idx == csv_params->obj_size_field_idx) {
req->obj_size = (uint32_t)strtoul((char *)s, &end, 0);
if (req->obj_size == 0 && end == s) {
ERROR("csvReader obj_size is not a number: \"%s\"\n", (char *)s);
WARN("csvReader obj_size is not a number: \"%s\"\n", (char *)s);
}
} else if (csv_params->curr_field_idx == csv_params->cnt_field_idx) {
reader->n_req_left = (uint64_t)strtoull((char *)s, &end, 0) - 1;
Expand Down
2 changes: 1 addition & 1 deletion libCacheSim/traceReader/generalReader/readerInternal.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
extern "C" {
#endif

#define MAX_LINE_LEN (1024 * 1)
#define PER_SEEK_SIZE 1024
#define MAX_OBJ_ID_LEN 256

/**************** common ****************/
Expand Down
119 changes: 53 additions & 66 deletions libCacheSim/traceReader/reader.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
#include "customizedReader/akamaiBin.h"
#include "customizedReader/cf1Bin.h"
#include "customizedReader/oracle/oracleAkamaiBin.h"
#include "customizedReader/valpinBin.h"
#include "customizedReader/oracle/oracleCF1Bin.h"
#include "customizedReader/oracle/oracleGeneralBin.h"
#include "customizedReader/oracle/oracleTwrBin.h"
Expand All @@ -21,6 +20,7 @@
#include "customizedReader/standardBin.h"
#include "customizedReader/twrBin.h"
#include "customizedReader/twrNSBin.h"
#include "customizedReader/valpinBin.h"
#include "customizedReader/vscsi.h"
#include "customizedReader/wikiBin.h"
#include "generalReader/lcs.h"
Expand All @@ -38,8 +38,7 @@ extern "C" {
#define FILE_COMMA 0x2c
#define FILE_QUOTE 0x22

reader_t *setup_reader(const char *const trace_path,
const trace_type_e trace_type,
reader_t *setup_reader(const char *const trace_path, const trace_type_e trace_type,
const reader_init_param_t *const init_params) {
static bool _info_printed = false;

Expand All @@ -54,8 +53,7 @@ reader_t *setup_reader(const char *const trace_path,
reader->zstd_reader_p = NULL;
#ifdef SUPPORT_ZSTD_TRACE
size_t slen = strlen(trace_path);
if (strncmp(trace_path + (slen - 4), ".zst", 4) == 0 ||
strncmp(trace_path + (slen - 7), ".zst.22", 7) == 0) {
if (strncmp(trace_path + (slen - 4), ".zst", 4) == 0 || strncmp(trace_path + (slen - 7), ".zst.22", 7) == 0) {
reader->is_zstd_file = true;
reader->zstd_reader_p = create_zstd_reader(trace_path);
if (!_info_printed) {
Expand Down Expand Up @@ -83,17 +81,15 @@ reader_t *setup_reader(const char *const trace_path,

if (init_params != NULL) {
memcpy(&reader->init_params, init_params, sizeof(reader_init_param_t));
if (init_params->binary_fmt_str != NULL)
reader->init_params.binary_fmt_str = strdup(init_params->binary_fmt_str);
if (init_params->binary_fmt_str != NULL) reader->init_params.binary_fmt_str = strdup(init_params->binary_fmt_str);

reader->ignore_obj_size = init_params->ignore_obj_size;
reader->ignore_size_zero_req = init_params->ignore_size_zero_req;
reader->obj_id_is_num = init_params->obj_id_is_num;
reader->trace_start_offset = init_params->trace_start_offset;
reader->mmap_offset = init_params->trace_start_offset;
reader->cap_at_n_req = init_params->cap_at_n_req;
if (init_params->sampler != NULL)
reader->sampler = init_params->sampler->clone(init_params->sampler);
if (init_params->sampler != NULL) reader->sampler = init_params->sampler->clone(init_params->sampler);
} else {
memset(&reader->init_params, 0, sizeof(reader_init_param_t));
}
Expand All @@ -113,15 +109,14 @@ reader_t *setup_reader(const char *const trace_path,
}
reader->file_size = st.st_size;

if (reader->trace_type == CSV_TRACE ||
reader->trace_type == PLAIN_TXT_TRACE) {
if (reader->trace_type == CSV_TRACE || reader->trace_type == PLAIN_TXT_TRACE) {
reader->file = fopen(reader->trace_path, "rb");
if (reader->file == 0) {
ERROR("Failed to open %s: %s\n", reader->trace_path, strerror(errno));
exit(1);
}

reader->line_buf_size = MAX_LINE_LEN;
reader->line_buf_size = PER_SEEK_SIZE;
reader->line_buf = (char *)malloc(reader->line_buf_size);
} else {
// set up mmap region
Expand All @@ -137,8 +132,7 @@ reader_t *setup_reader(const char *const trace_path,
if ((reader->mapped_file) == MAP_FAILED) {
close(fd);
reader->mapped_file = NULL;
ERROR("Unable to allocate %llu bytes of memory, %s\n",
(unsigned long long)st.st_size, strerror(errno));
ERROR("Unable to allocate %llu bytes of memory, %s\n", (unsigned long long)st.st_size, strerror(errno));
abort();
}
}
Expand All @@ -148,8 +142,7 @@ reader_t *setup_reader(const char *const trace_path,
reader->trace_format = TXT_TRACE_FORMAT;
csv_setup_reader(reader);
if (!check_delimiter(reader, init_params->delimiter)) {
ERROR("The trace does not use delimiter '%c', please check\n",
init_params->delimiter);
ERROR("The trace does not use delimiter '%c', please check\n", init_params->delimiter);
}
break;
case PLAIN_TXT_TRACE:
Expand Down Expand Up @@ -238,9 +231,7 @@ reader_t *setup_reader(const char *const trace_path,
WARN(
"trace file size %lu - %lu is not multiple of item size %lu, mod "
"%lu\n",
(unsigned long)reader->file_size,
(unsigned long)reader->trace_start_offset,
(unsigned long)reader->item_size,
(unsigned long)reader->file_size, (unsigned long)reader->trace_start_offset, (unsigned long)reader->item_size,
(unsigned long)reader->file_size % reader->item_size);
}

Expand Down Expand Up @@ -273,15 +264,14 @@ reader_t *setup_reader(const char *const trace_path,
*/
int read_one_req(reader_t *const reader, request_t *const req) {
if (reader->mmap_offset >= reader->file_size) {
DEBUG("read_one_req: end of file, current mmap_offset %zu, file size %zu\n",
reader->mmap_offset, reader->file_size);
DEBUG("read_one_req: end of file, current mmap_offset %zu, file size %zu\n", reader->mmap_offset,
reader->file_size);
req->valid = false;
return 1;
}

if (reader->cap_at_n_req > 1 && reader->n_read_req >= reader->cap_at_n_req) {
DEBUG("read_one_req: processed %ld requests capped by the user\n",
(long) reader->n_read_req);
DEBUG("read_one_req: processed %ld requests capped by the user\n", (long)reader->n_read_req);
req->valid = false;
return 1;
}
Expand Down Expand Up @@ -392,8 +382,8 @@ int read_one_req(reader_t *const reader, request_t *const req) {
sampler_t *sampler = reader->sampler;
reader->sampler = NULL;
while (!sampler->sample(sampler, req)) {
VVERBOSE("skip one req: time %lu, obj_id %lu, size %lu at offset %zu\n",
req->clock_time, req->obj_id, req->obj_size, offset_before_read);
VVERBOSE("skip one req: time %lu, obj_id %lu, size %lu at offset %zu\n", req->clock_time, req->obj_id,
req->obj_size, offset_before_read);
if (reader->read_direction == READ_FORWARD) {
status = read_one_req(reader, req);
} else {
Expand All @@ -411,8 +401,8 @@ int read_one_req(reader_t *const reader, request_t *const req) {
req->obj_size = 1;
}

VVERBOSE("read one req: time %lu, obj_id %lu, size %lu at offset %zu\n",
req->clock_time, req->obj_id, req->obj_size, offset_before_read);
VVERBOSE("read one req: time %lu, obj_id %lu, size %lu at offset %zu\n", req->clock_time, req->obj_id, req->obj_size,
offset_before_read);

return status;
}
Expand All @@ -433,46 +423,46 @@ int go_back_one_req(reader_t *const reader) {
return 1;
}

ssize_t seek_size =
MAX_LINE_LEN - 1 > curr_offset - reader->trace_start_offset
? curr_offset - reader->trace_start_offset
: MAX_LINE_LEN - 1;
VVERBOSE("go_back_one_req prev pos %ld, seek size %zu\n",
ftell(reader->file), seek_size);

fseek(reader->file, -seek_size, SEEK_CUR);
/* do not read the current pos */
int _read_size = fread(reader->line_buf, seek_size - 1, 1, reader->file);
reader->line_buf[seek_size - 1] = 0;
char *last_line_end = strrchr(reader->line_buf, '\n');
if (last_line_end == NULL) {
if (seek_size < MAX_LINE_LEN - 1) {
fseek(reader->file, reader->trace_start_offset, SEEK_SET);
}
if (curr_offset == reader->trace_start_offset) {
/* this happens when reverse reading reaches the start of the file */
DEBUG(
"go_back_one_req cannot find the request above, set offset to "
"trace start %zu\n",
ftell(reader->file));

// if curr_offset is 0, we were at the start of the file before seek,
// so we return 1; otherwise, we return 0 because we seek to the start
return 1;
ssize_t max_seek_size = curr_offset - reader->trace_start_offset;
ssize_t seek_size = 0;
ssize_t total_seek_size = 0;
bool found = false;
while (!found && total_seek_size < max_seek_size) {
seek_size = MIN(PER_SEEK_SIZE, max_seek_size - total_seek_size);
total_seek_size += seek_size;
fseek(reader->file, -seek_size, SEEK_CUR);
ssize_t read_size = fread(reader->line_buf, 1, seek_size - 1, reader->file);
reader->line_buf[read_size - 1] = 0;
char *last_line_end = strrchr(reader->line_buf, '\n');

if (last_line_end == NULL) {
// three possible cases
// 1. reach the trace start
// 2. line is too long and has not found \n
// 3. error case
if (seek_size < PER_SEEK_SIZE) {
// case 1 reach the trace start
fseek(reader->file, reader->trace_start_offset, SEEK_SET);

return 0;
} else if (total_seek_size < max_seek_size) {
// case 2 line is too long and has not found \n
fseek(reader->file, -PER_SEEK_SIZE, SEEK_CUR);

} else {
WARN("go_back_one_req cannot find the request above\n");
return 1;
}
} else {
found = true;
int pos = last_line_end + 2 - reader->line_buf;
fseek(reader->file, -(seek_size - pos), SEEK_CUR);
return 0;
}
return curr_offset == 0 ? 1 : 0;
}
int pos = last_line_end + 2 - reader->line_buf;
fseek(reader->file, -(seek_size - pos), SEEK_CUR);

VVERBOSE("go_back_one_req after pos %ld\n", ftell(reader->file));
return 0;

case BINARY_TRACE_FORMAT:
if (reader->mmap_offset >=
reader->trace_start_offset + reader->item_size) {
if (reader->mmap_offset >= reader->trace_start_offset + reader->item_size) {
reader->mmap_offset -= (reader->item_size);
return 0;
} else {
Expand Down Expand Up @@ -602,8 +592,7 @@ uint64_t get_num_of_req(reader_t *const reader) {
}

reader_t *clone_reader(const reader_t *const reader_in) {
reader_t *reader = setup_reader(reader_in->trace_path, reader_in->trace_type,
&reader_in->init_params);
reader_t *reader = setup_reader(reader_in->trace_path, reader_in->trace_type, &reader_in->init_params);
reader->n_total_req = reader_in->n_total_req;

if (reader->trace_format != TXT_TRACE_FORMAT) {
Expand Down Expand Up @@ -720,9 +709,7 @@ void read_last_req(reader_t *reader, request_t *req) {

bool is_str_num(const char *str) {
for (int i = 0; i < strlen(str); i++) {
if (!(isdigit(str[i]) || (str[i] >= 'a' && str[i] <= 'f') ||
(str[i] >= 'A' && str[i] <= 'F')))
return false;
if (!(isdigit(str[i]) || (str[i] >= 'a' && str[i] <= 'f') || (str[i] >= 'A' && str[i] <= 'F'))) return false;
}
return true;
}
Expand Down
Loading