From bfa4462f1347ede836bea176198824db48a2c05b Mon Sep 17 00:00:00 2001
From: Selutario
Date: Mon, 11 Oct 2021 15:13:38 +0200
Subject: [PATCH 1/7] Add script to parse and obtain stats from cluster CSVs

---
 .../wazuh_testing/tools/performance/binary.py |   3 +-
 .../tools/performance/csv_parser.py           | 222 ++++++++++++++++++
 2 files changed, 224 insertions(+), 1 deletion(-)
 create mode 100644 deps/wazuh_testing/wazuh_testing/tools/performance/csv_parser.py
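
Note (kept below the '---', so it is not part of the commit message): a minimal
sketch of how these parsers might be driven. The path is hypothetical; the
layout assumed is one folder per node (master, worker_1, ...), each holding the
'logs' and 'binaries' CSV folders:

    from wazuh_testing.tools.performance.csv_parser import (ClusterCSVResourcesParser,
                                                            ClusterCSVTasksParser)

    artifacts = '/tmp/cluster_artifacts'  # hypothetical path

    # Maximum of the per-node mean 'time_spent(s)', grouped by phase and task.
    tasks_stats = ClusterCSVTasksParser(artifacts).get_max_stats()

    # Maximum mean and regression coefficient of USS(KB), CPU(%) and FD per phase.
    resources_stats = ClusterCSVResourcesParser(artifacts).get_max_stats()
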
diff --git a/deps/wazuh_testing/wazuh_testing/tools/performance/binary.py b/deps/wazuh_testing/wazuh_testing/tools/performance/binary.py
index 8ccccdd93c..4ca63c5390 100644
--- a/deps/wazuh_testing/wazuh_testing/tools/performance/binary.py
+++ b/deps/wazuh_testing/wazuh_testing/tools/performance/binary.py
@@ -114,7 +114,8 @@ def unit_conversion(x):
 
         # Pre-initialize the info dictionary. If there's a problem while taking metrics of the binary (i.e. it crashed)
         # the CSV will set all its values to 0 to easily identify if there was a problem or not
-        info = {'Daemon': self.process_name, 'Version': self.version, 'Timestamp': datetime.now().strftime('%H:%M:%S'),
+        info = {'Daemon': self.process_name, 'Version': self.version,
+                'Timestamp': datetime.now().strftime('%Y/%m/%d %H:%M:%S'),
                 'PID': self.pid, 'CPU(%)': 0.0, f'VMS({self.value_unit})': 0.0, f'RSS({self.value_unit})': 0.0,
                 f'USS({self.value_unit})': 0.0, f'PSS({self.value_unit})': 0.0, f'SWAP({self.value_unit})': 0.0,
                 'FD': 0.0, 'Read_Ops': 0.0, 'Write_Ops': 0.0,
diff --git a/deps/wazuh_testing/wazuh_testing/tools/performance/csv_parser.py b/deps/wazuh_testing/wazuh_testing/tools/performance/csv_parser.py
new file mode 100644
index 0000000000..7f81232ef0
--- /dev/null
+++ b/deps/wazuh_testing/wazuh_testing/tools/performance/csv_parser.py
@@ -0,0 +1,222 @@
+# Copyright (C) 2015-2021, Wazuh Inc.
+# Created by Wazuh, Inc. <info@wazuh.com>.
+# This program is free software; you can redistribute it and/or modify it under the terms of GPLv2
+
+from collections import defaultdict
+from glob import glob
+from os.path import join
+from re import compile
+
+import numpy as np
+import pandas as pd
+
+
+class ClusterCSVParser:
+    """Class to load and parse CSVs with data produced by the Wazuh cluster.
+
+    Args:
+        artifacts_path (str): directory where the cluster CSVs can be found.
+        files_to_load (list): CSV filenames (without extension) that should be loaded.
+
+    Attributes:
+        artifacts_path (str): directory where the cluster CSVs can be found.
+        files_to_load (list): CSV filenames (without extension) that should be loaded.
+        dataframes (dict): dictionary with dataframes obtained from the loaded CSV files.
+    """
+
+    SETUP_PHASE = 'setup_phase'
+    STABLE_PHASE = 'stable_phase'
+
+    def __init__(self, artifacts_path, files_to_load):
+        self.artifacts_path = artifacts_path
+        self.files_to_load = files_to_load
+        self.dataframes = defaultdict(lambda: defaultdict(lambda: defaultdict(None)))
+
+        self._load_dataframes()
+
+    def _load_dataframes(self):
+        """Recursively iterate CSV files inside 'data' folders and store data as pandas dataframes.
+
+        Files will be loaded as pandas dataframes and stored inside a dictionary that
+        looks like this: self.dataframes[type of data (logs/binaries)][node name][file name].
+        When a file is found, it is only parsed if listed in self.files_to_load.
+        """
+        node_file_regex = compile(r'.*/(master|worker_[\d]+)/.*/(.*)/(.*).csv')
+
+        for csv in glob(join(self.artifacts_path, '*', 'data', '*', '*.csv')):
+            names = node_file_regex.search(csv)
+            if names.group(3) in self.files_to_load:
+                self.dataframes[names.group(2)][names.group(1)].update({names.group(3): pd.read_csv(csv)})
+
+    def get_setup_phase(self, node_name):
+        """Determine when the setup phase begins and ends.
+
+        Args:
+            node_name (str): name of the node whose phase should be calculated.
+
+        Returns:
+            tuple: start date, end date.
+        """
+        sync_df = self.dataframes['logs'][node_name]['integrity_sync']['Timestamp']
+        return sync_df[0], sync_df[len(sync_df)-1 if len(sync_df) > 1 else 0]
+
+    def _trim_dataframe(self, df, phase, setup_datetime):
+        """Get the dataframe between two datetimes.
+
+        Args:
+            df (dataframe): original dataframe from which a subset will be obtained.
+            phase (str): name of the phase whose data should be obtained.
+            setup_datetime (tuple): start and end datetime of the setup phase.
+
+        Returns:
+            dataframe: subset of data between the dates chosen, according to the phase.
+        """
+        if phase == self.SETUP_PHASE:
+            return df[(df['Timestamp'] >= setup_datetime[0]) & (df['Timestamp'] <= setup_datetime[1])]
+        else:
+            return df[(df['Timestamp'] > setup_datetime[1])]
+
+    def _calculate_stats(self, df):
+        """Calculate statistics from a dataframe.
+
+        Args:
+            df (dataframe): dataframe used to obtain stats.
+
+        Raises:
+            NotImplementedError: should be implemented in child classes.
+        """
+        raise NotImplementedError
+
+    def _calculate_all_df_stats(self, dict_df):
+        """Iterate all dataframes and obtain statistics for each one.
+
+        Args:
+            dict_df (dict): dict with dataframes to obtain stats from.
+
+        Returns:
+            dict: dictionary with stats from each dataframe.
+        """
+        result = defaultdict(lambda: defaultdict(dict))
+
+        for node, files in dict_df.items():
+            setup_datetime = self.get_setup_phase(node)
+            for phase in [self.SETUP_PHASE, self.STABLE_PHASE]:
+                for file_name, file_df in files.items():
+                    trimmed_df = self._trim_dataframe(file_df, phase, setup_datetime)
+                    if len(trimmed_df):
+                        result[phase][file_name][node] = self._calculate_stats(trimmed_df)
+
+        return result
+
+    def get_max_stats(self):
+        """Group statistics by node, phase and file and get the maximum of each group.
+
+        Raises:
+            NotImplementedError: should be implemented in child classes.
+        """
+        raise NotImplementedError
+
+
+class ClusterCSVTasksParser(ClusterCSVParser):
+    """Class to load, parse CSVs and obtain stats of cluster tasks.
+
+    Args:
+        artifacts_path (str): directory where the cluster CSVs can be found.
+
+    Attributes:
+        artifacts_path (str): directory where the cluster CSVs can be found.
+    """
+
+    def __init__(self, artifacts_path):
+        super().__init__(artifacts_path, files_to_load=['integrity_check', 'integrity_sync', 'agent-info_sync'])
+
+    def _calculate_stats(self, df):
+        """Calculate mean of 'time_spent(s)' column from a dataframe.
+
+        Args:
+            df (dataframe): dataframe to obtain the mean value from.
+
+        Returns:
+            float: mean value for 'time_spent(s)' column.
+        """
+        return df['time_spent(s)'].mean()
+
+    def get_max_stats(self):
+        """Group statistics by type of node, phase and cluster task and get the maximum of each group.
+
+        Returns:
+            dict: Maximum mean value for each phase, task and type of node.
+        """
+        nodes_stats = self._calculate_all_df_stats(self.dataframes['logs'])
+        grouped_data = defaultdict(lambda: defaultdict(lambda: defaultdict(tuple)))
+
+        for phase, tasks in nodes_stats.items():
+            for task, nodes in tasks.items():
+                for node, mean in nodes.items():
+                    if node == 'master':
+                        grouped_data[phase][task][node] = (node, mean)
+                    else:
+                        if not grouped_data[phase][task]['workers'] or grouped_data[phase][task]['workers'][1] < mean:
+                            grouped_data[phase][task]['workers'] = (node, mean)
+
+        return grouped_data
+
+
+class ClusterCSVResourcesParser(ClusterCSVParser):
+    """Class to load, parse CSVs and obtain stats of resources used by the 'wazuh-clusterd' process.
+
+    Args:
+        artifacts_path (str): directory where the cluster CSVs can be found.
+        columns (list, optional): columns of the CSVs to obtain stats from.
+
+    Attributes:
+        artifacts_path (str): directory where the cluster CSVs can be found.
+        columns (list): columns of the CSVs to obtain stats from.
+    """
+
+    def __init__(self, artifacts_path, columns=None):
+        if columns is None:
+            columns = ['USS(KB)', 'CPU(%)', 'FD']
+
+        self.columns = columns
+        super().__init__(artifacts_path, files_to_load=['wazuh-clusterd', 'integrity_sync'])
+
+    def _calculate_stats(self, df):
+        """Calculate mean and regression coefficient of each column in self.columns from a dataframe.
+
+        Args:
+            df (dataframe): dataframe to obtain the stats from.
+
+        Returns:
+            dict: stats for each column.
+        """
+        result = {}
+        for column in self.columns:
+            result[column] = {'mean': df[column].mean(),
+                              'reg_cof': np.polyfit(range(len(df)), list(df[column]), 1)[0]}
+
+        return result
+
+    def get_max_stats(self):
+        """Group statistics by type of node, phase and resource/column and get the maximum of each group.
+
+        Returns:
+            dict: Maximum value for each phase, task, type of node and stat.
+        """
+        nodes_stats = self._calculate_all_df_stats(self.dataframes['binaries'])
+        grouped_data = defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(
+            tuple)))))
+
+        for phase, files in nodes_stats.items():
+            for file, nodes in files.items():
+                for node, resources in nodes.items():
+                    for resource, stats in resources.items():
+                        for stat, value in stats.items():
+                            if node == 'master':
+                                grouped_data[phase][file][resource][node][stat] = (node, value)
+                            else:
+                                if not grouped_data[phase][file][resource]['workers'][stat] or \
+                                        grouped_data[phase][file][resource]['workers'][stat][1] < value:
+                                    grouped_data[phase][file][resource]['workers'][stat] = (node, value)
+
+        return grouped_data

From 4e97633da6243001b7fe0b8d8244c1ab1f299554 Mon Sep 17 00:00:00 2001
From: Selutario
Date: Wed, 13 Oct 2021 14:45:31 +0200
Subject: [PATCH 2/7] Convert defaultdicts to dicts. Minor changes.

---
 .../tools/performance/csv_parser.py | 95 ++++++++++---------
 1 file changed, 51 insertions(+), 44 deletions(-)
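
Note: 'default_to_dict' exists so that the nested defaultdicts built by
'_group_stats' reach callers as plain dicts, which raise KeyError on missing
keys instead of silently creating them. A small sketch of the behavior, with
made-up data:

    from collections import defaultdict

    stats = defaultdict(lambda: defaultdict(dict))
    stats['setup_phase']['integrity_check']['workers'] = ('worker_3', 1.5)

    plain = ClusterCSVParser.default_to_dict(stats)
    # plain == {'setup_phase': {'integrity_check': {'workers': ('worker_3', 1.5)}}}
    # plain['stable_phase'] now raises KeyError instead of returning an empty dict.
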
diff --git a/deps/wazuh_testing/wazuh_testing/tools/performance/csv_parser.py b/deps/wazuh_testing/wazuh_testing/tools/performance/csv_parser.py
index 7f81232ef0..7722323d4f 100644
--- a/deps/wazuh_testing/wazuh_testing/tools/performance/csv_parser.py
+++ b/deps/wazuh_testing/wazuh_testing/tools/performance/csv_parser.py
@@ -87,18 +87,18 @@ def _calculate_stats(self, df):
         """
         raise NotImplementedError
 
-    def _calculate_all_df_stats(self, dict_df):
+    def _calculate_all_df_stats(self, dfs_dict):
         """Iterate all dataframes and obtain statistics for each one.
 
         Args:
-            dict_df (dict): dict with dataframes to obtain stats from.
+            dfs_dict (dict): dict with dataframes to obtain stats from.
 
         Returns:
-            dict: dictionary with stats from each dataframe.
+            defaultdict: dictionary with stats from each dataframe.
         """
         result = defaultdict(lambda: defaultdict(dict))
 
-        for node, files in dict_df.items():
+        for node, files in dfs_dict.items():
             setup_datetime = self.get_setup_phase(node)
             for phase in [self.SETUP_PHASE, self.STABLE_PHASE]:
                 for file_name, file_df in files.items():
@@ -108,14 +108,49 @@ def _calculate_all_df_stats(self, dict_df):
 
         return result
 
-    def get_max_stats(self):
-        """Group statistics by node, phase and file and get the maximum of each group.
+    def _group_stats(self, dfs_dict):
+        """Group statistics by type of node, phase and column and get the maximum of each group.
+
+        Args:
+            dfs_dict (dict): dict with dataframes to obtain stats from.
+
+        Returns:
+            defaultdict: Maximum value for each phase, task, type of node (worker/master) and stat.
+        """
+        nodes_stats = self._calculate_all_df_stats(dfs_dict)
+        grouped_data = defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(
+            tuple)))))
+
+        for phase, files in nodes_stats.items():
+            for file, nodes in files.items():
+                for node, columns in nodes.items():
+                    for column, stats in columns.items():
+                        for stat, value in stats.items():
+                            if node == 'master':
+                                grouped_data[phase][file][column][node][stat] = (node, value)
+                            else:
+                                if not grouped_data[phase][file][column]['workers'][stat] or \
+                                        grouped_data[phase][file][column]['workers'][stat][1] < value:
+                                    grouped_data[phase][file][column]['workers'][stat] = (node, value)
+
+        return grouped_data
+
+    def get_stats(self):
+        """Get max stats after grouping by phase, task, type of node and stat.
 
         Raises:
             NotImplementedError: should be implemented in child classes.
         """
         raise NotImplementedError
 
+    @staticmethod
+    def default_to_dict(default_dict):
+        """Convert defaultdict to dict."""
+        if isinstance(default_dict, defaultdict):
+            default_dict = {k: ClusterCSVParser.default_to_dict(v) for k, v in default_dict.items()}
+
+        return default_dict
+
 
 class ClusterCSVTasksParser(ClusterCSVParser):
     """Class to load, parse CSVs and obtain stats of cluster tasks.
@@ -137,29 +172,17 @@ def _calculate_stats(self, df):
             df (dataframe): dataframe to obtain the mean value from.
 
         Returns:
-            float: mean value for 'time_spent(s)' column.
+            dict: mean value for 'time_spent(s)' column.
         """
-        return df['time_spent(s)'].mean()
+        return {'time_spent(s)': {'mean': df['time_spent(s)'].mean()}}
 
-    def get_max_stats(self):
-        """Group statistics by type of node, phase and cluster task and get the maximum of each group.
+    def get_stats(self):
+        """Get max stats after grouping by phase, task, type of node and stat.
 
         Returns:
-            dict: Maximum mean value for each phase, task and type of node.
+            dict: max stats obtained from cluster tasks.
         """
-        nodes_stats = self._calculate_all_df_stats(self.dataframes['logs'])
-        grouped_data = defaultdict(lambda: defaultdict(lambda: defaultdict(tuple)))
-
-        for phase, tasks in nodes_stats.items():
-            for task, nodes in tasks.items():
-                for node, mean in nodes.items():
-                    if node == 'master':
-                        grouped_data[phase][task][node] = (node, mean)
-                    else:
-                        if not grouped_data[phase][task]['workers'] or grouped_data[phase][task]['workers'][1] < mean:
-                            grouped_data[phase][task]['workers'] = (node, mean)
-
-        return grouped_data
+        return self.default_to_dict(self._group_stats(self.dataframes['logs']))
 
 
 class ClusterCSVResourcesParser(ClusterCSVParser):
@@ -197,26 +220,10 @@ def _calculate_stats(self, df):
 
         return result
 
-    def get_max_stats(self):
-        """Group statistics by type of node, phase and resource/column and get the maximum of each group.
+    def get_stats(self):
+        """Get max stats after grouping by phase, task, type of node and stat.
 
         Returns:
-            dict: Maximum value for each phase, task, type of node and stat.
+            defaultdict: max stats obtained from cluster resources.
         """
-        nodes_stats = self._calculate_all_df_stats(self.dataframes['binaries'])
-        grouped_data = defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(
-            tuple)))))
-
-        for phase, files in nodes_stats.items():
-            for file, nodes in files.items():
-                for node, resources in nodes.items():
-                    for resource, stats in resources.items():
-                        for stat, value in stats.items():
-                            if node == 'master':
-                                grouped_data[phase][file][resource][node][stat] = (node, value)
-                            else:
-                                if not grouped_data[phase][file][resource]['workers'][stat] or \
-                                        grouped_data[phase][file][resource]['workers'][stat][1] < value:
-                                    grouped_data[phase][file][resource]['workers'][stat] = (node, value)
-
-        return grouped_data
+        return self.default_to_dict(self._group_stats(self.dataframes['binaries']))

From 362300801d5102fedc5bdcbc92d6da37e6c31170 Mon Sep 17 00:00:00 2001
From: Selutario
Date: Wed, 13 Oct 2021 14:53:42 +0200
Subject: [PATCH 3/7] Update docstring

---
 .../wazuh_testing/wazuh_testing/tools/performance/csv_parser.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/deps/wazuh_testing/wazuh_testing/tools/performance/csv_parser.py b/deps/wazuh_testing/wazuh_testing/tools/performance/csv_parser.py
index 7722323d4f..a4c4dc4e9f 100644
--- a/deps/wazuh_testing/wazuh_testing/tools/performance/csv_parser.py
+++ b/deps/wazuh_testing/wazuh_testing/tools/performance/csv_parser.py
@@ -224,6 +224,6 @@ def get_stats(self):
         """Get max stats after grouping by phase, task, type of node and stat.
 
         Returns:
-            defaultdict: max stats obtained from cluster resources.
+            dict: max stats obtained from cluster resources.
         """
         return self.default_to_dict(self._group_stats(self.dataframes['binaries']))

From 06b65e9003f051d3aff721f400464c097f412c90 Mon Sep 17 00:00:00 2001
From: Selutario
Date: Wed, 20 Oct 2021 09:52:31 +0200
Subject: [PATCH 4/7] Add max field to stats calculation

---
 .../wazuh_testing/tools/performance/csv_parser.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)
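
Note: with the new 'max' field, '_calculate_stats' returns one entry per column
holding all of its stats. Illustrative shapes (values invented):

    # ClusterCSVTasksParser
    {'time_spent(s)': {'mean': 0.31, 'max': 2.7}}

    # ClusterCSVResourcesParser, one entry per column in self.columns
    {'USS(KB)': {'mean': 48312.5, 'max': 51200.0, 'reg_cof': 10.8},
     'CPU(%)': {'mean': 12.4, 'max': 35.0, 'reg_cof': -0.02},
     'FD': {'mean': 40.0, 'max': 44.0, 'reg_cof': 0.0}}
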
diff --git a/deps/wazuh_testing/wazuh_testing/tools/performance/csv_parser.py b/deps/wazuh_testing/wazuh_testing/tools/performance/csv_parser.py
index a4c4dc4e9f..911527802d 100644
--- a/deps/wazuh_testing/wazuh_testing/tools/performance/csv_parser.py
+++ b/deps/wazuh_testing/wazuh_testing/tools/performance/csv_parser.py
@@ -58,7 +58,7 @@ def get_setup_phase(self, node_name):
             tuple: start date, end date.
         """
         sync_df = self.dataframes['logs'][node_name]['integrity_sync']['Timestamp']
-        return sync_df[0], sync_df[len(sync_df)-1 if len(sync_df) > 1 else 0]
+        return sync_df[0], sync_df[len(sync_df) - 1 if len(sync_df) > 1 else 0]
 
     def _trim_dataframe(self, df, phase, setup_datetime):
         """Get the dataframe between two datetimes.
@@ -174,7 +174,8 @@ def _calculate_stats(self, df):
         Returns:
             dict: mean value for 'time_spent(s)' column.
         """
-        return {'time_spent(s)': {'mean': df['time_spent(s)'].mean()}}
+        return {'time_spent(s)': {'mean': df['time_spent(s)'].mean(),
+                                  'max': df['time_spent(s)'].max()}}
 
     def get_stats(self):
         """Get max stats after grouping by phase, task, type of node and stat.
@@ -216,6 +217,7 @@ def _calculate_stats(self, df):
         result = {}
         for column in self.columns:
             result[column] = {'mean': df[column].mean(),
+                              'max': df[column].max(),
                               'reg_cof': np.polyfit(range(len(df)), list(df[column]), 1)[0]}
 
         return result

From 2de0be634d39c58156138c270196b1f4c962db3a Mon Sep 17 00:00:00 2001
From: Selutario
Date: Thu, 21 Oct 2021 09:37:28 +0200
Subject: [PATCH 5/7] Add new ClusterEnvInfo class

---
 .../tools/performance/csv_parser.py | 66 ++++++++++++++++++-
 1 file changed, 64 insertions(+), 2 deletions(-)
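
Note: a short sketch of the new class (path and output values are invented).
'get_all_info' reads the first and last timestamps of the master's
integrity_sync.csv and integrity_check.csv to delimit the phases, and counts
the worker_* folders:

    from wazuh_testing.tools.performance.csv_parser import ClusterEnvInfo

    env = ClusterEnvInfo('/tmp/cluster_artifacts')  # hypothetical path
    env.get_all_info()
    # {'phases': {'setup_phase': ['2021/10/21 09:00:01', '2021/10/21 09:05:47'],
    #             'stable_phase': ['2021/10/21 09:05:47', '2021/10/21 10:00:13']},
    #  'worker_nodes': 25}
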
""" - nodes_stats = self._calculate_all_df_stats(self.dataframes['logs']) - grouped_data = defaultdict(lambda: defaultdict(lambda: defaultdict(tuple))) - - for phase, tasks in nodes_stats.items(): - for task, nodes in tasks.items(): - for node, mean in nodes.items(): - if node == 'master': - grouped_data[phase][task][node] = (node, mean) - else: - if not grouped_data[phase][task]['workers'] or grouped_data[phase][task]['workers'][1] < mean: - grouped_data[phase][task]['workers'] = (node, mean) - - return grouped_data + return self.default_to_dict(self._group_stats(self.dataframes['logs'])) class ClusterCSVResourcesParser(ClusterCSVParser): @@ -197,26 +220,10 @@ def _calculate_stats(self, df): return result - def get_max_stats(self): - """Group statistics by type of node, phase and resource/column and get the maximum of each group. + def get_stats(self): + """Get max stats after grouping by phase, task, type of node and stat. Returns: - dict: Maximum value for each phase, task, type of node and stat. + defaultdict: max stats obtained from cluster resources. """ - nodes_stats = self._calculate_all_df_stats(self.dataframes['binaries']) - grouped_data = defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: defaultdict( - tuple))))) - - for phase, files in nodes_stats.items(): - for file, nodes in files.items(): - for node, resources in nodes.items(): - for resource, stats in resources.items(): - for stat, value in stats.items(): - if node == 'master': - grouped_data[phase][file][resource][node][stat] = (node, value) - else: - if not grouped_data[phase][file][resource]['workers'][stat] or \ - grouped_data[phase][file][resource]['workers'][stat][1] < value: - grouped_data[phase][file][resource]['workers'][stat] = (node, value) - - return grouped_data + return self.default_to_dict(self._group_stats(self.dataframes['binaries'])) From 362300801d5102fedc5bdcbc92d6da37e6c31170 Mon Sep 17 00:00:00 2001 From: Selutario Date: Wed, 13 Oct 2021 14:53:42 +0200 Subject: [PATCH 3/7] Update docstring --- .../wazuh_testing/wazuh_testing/tools/performance/csv_parser.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deps/wazuh_testing/wazuh_testing/tools/performance/csv_parser.py b/deps/wazuh_testing/wazuh_testing/tools/performance/csv_parser.py index 7722323d4f..a4c4dc4e9f 100644 --- a/deps/wazuh_testing/wazuh_testing/tools/performance/csv_parser.py +++ b/deps/wazuh_testing/wazuh_testing/tools/performance/csv_parser.py @@ -224,6 +224,6 @@ def get_stats(self): """Get max stats after grouping by phase, task, type of node and stat. Returns: - defaultdict: max stats obtained from cluster resources. + dict: max stats obtained from cluster resources. """ return self.default_to_dict(self._group_stats(self.dataframes['binaries'])) From 06b65e9003f051d3aff721f400464c097f412c90 Mon Sep 17 00:00:00 2001 From: Selutario Date: Wed, 20 Oct 2021 09:52:31 +0200 Subject: [PATCH 4/7] Add max field to stats calculation --- .../wazuh_testing/tools/performance/csv_parser.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/deps/wazuh_testing/wazuh_testing/tools/performance/csv_parser.py b/deps/wazuh_testing/wazuh_testing/tools/performance/csv_parser.py index a4c4dc4e9f..911527802d 100644 --- a/deps/wazuh_testing/wazuh_testing/tools/performance/csv_parser.py +++ b/deps/wazuh_testing/wazuh_testing/tools/performance/csv_parser.py @@ -58,7 +58,7 @@ def get_setup_phase(self, node_name): tuple: start date, end date. 
""" sync_df = self.dataframes['logs'][node_name]['integrity_sync']['Timestamp'] - return sync_df[0], sync_df[len(sync_df)-1 if len(sync_df) > 1 else 1] + return sync_df[0], sync_df[len(sync_df) - 1 if len(sync_df) > 1 else 1] def _trim_dataframe(self, df, phase, setup_datetime): """Get the dataframe between two datetime. @@ -174,7 +174,8 @@ def _calculate_stats(self, df): Returns: dict: mean value for 'time_spent(s)' column. """ - return {'time_spent(s)': {'mean': df['time_spent(s)'].mean()}} + return {'time_spent(s)': {'mean': df['time_spent(s)'].mean(), + 'max': df['time_spent(s)'].max()}} def get_stats(self): """Get max stats after grouping by phase, task, type of node and stat. @@ -216,6 +217,7 @@ def _calculate_stats(self, df): result = {} for column in self.columns: result[column] = {'mean': df[column].mean(), + 'max': df[column].max(), 'reg_cof': np.polyfit(range(len(df)), list(df[column]), 1)[0]} return result From 2de0be634d39c58156138c270196b1f4c962db3a Mon Sep 17 00:00:00 2001 From: Selutario Date: Thu, 21 Oct 2021 09:37:28 +0200 Subject: [PATCH 5/7] Add new ClusterEnvInfo class --- .../tools/performance/csv_parser.py | 66 ++++++++++++++++++- 1 file changed, 64 insertions(+), 2 deletions(-) diff --git a/deps/wazuh_testing/wazuh_testing/tools/performance/csv_parser.py b/deps/wazuh_testing/wazuh_testing/tools/performance/csv_parser.py index 911527802d..f574b17210 100644 --- a/deps/wazuh_testing/wazuh_testing/tools/performance/csv_parser.py +++ b/deps/wazuh_testing/wazuh_testing/tools/performance/csv_parser.py @@ -5,6 +5,7 @@ from collections import defaultdict from glob import glob from os.path import join +from pathlib import Path from re import compile import numpy as np @@ -38,12 +39,12 @@ def _load_dataframes(self): """Recursively iterate CSV files inside 'data' folders and store data as pandas dataframes. Files will be loaded as pandas dataframes and stored inside a dictionary that - looks like this: self.dataframes[type of data (logs/binaries)][node name][file name]. + looks like this: self.dataframes[type of data][node name][file name]. When a file is found, it is only parsed if listed in self.files_to_load. """ node_file_regex = compile(r'.*/(master|worker_[\d]+)/.*/(.*)/(.*).csv') - for csv in glob(join(self.artifacts_path, '*', 'data', '*', '*.csv')): + for csv in glob(join(self.artifacts_path, '*', '*', '*', '*.csv')): names = node_file_regex.search(csv) if names.group(3) in self.files_to_load: self.dataframes[names.group(2)][names.group(1)].update({names.group(3): pd.read_csv(csv)}) @@ -229,3 +230,64 @@ def get_stats(self): dict: max stats obtained from cluster resources. """ return self.default_to_dict(self._group_stats(self.dataframes['binaries'])) + + +class ClusterEnvInfo: + """Class to obtain information from cluster artifacts files. + + Args: + artifacts_path (str): directory where the cluster data (nodes, logs, CSVs, etc.) can be found. + + Attributes: + artifacts_path (str): directory where the cluster data (nodes, logs, CSVs, etc.) can be found. + """ + + def __init__(self, artifacts_path): + self.artifacts_path = artifacts_path + + def get_file_timestamps(self, node='master', file='integrity_sync.csv'): + """Get first and last datetime of lines inside a specific file. + + Args: + node (str): node folder from which the information should be retrieved. + file (str): filename in any nested level inside 'node' from which the information should be retrieved. + + Returns: + list: first and last datetime found. 
diff --git a/deps/wazuh_testing/wazuh_testing/tools/performance/csv_parser.py b/deps/wazuh_testing/wazuh_testing/tools/performance/csv_parser.py
index ba6cd8903a..290a6e3084 100644
--- a/deps/wazuh_testing/wazuh_testing/tools/performance/csv_parser.py
+++ b/deps/wazuh_testing/wazuh_testing/tools/performance/csv_parser.py
@@ -200,10 +200,7 @@ class ClusterCSVResourcesParser(ClusterCSVParser):
     """
 
     def __init__(self, artifacts_path, columns=None):
-        if columns is None:
-            columns = ['USS(KB)', 'CPU(%)', 'FD']
-
-        self.columns = columns
+        self.columns = ['USS(KB)', 'CPU(%)', 'FD'] if columns is None else columns
         super().__init__(artifacts_path, files_to_load=['wazuh-clusterd', 'integrity_sync'])
 
     def _calculate_stats(self, df):