Add script to parse and obtain stats from cluster CSVs #2032

Merged: 7 commits, Nov 30, 2021
deps/wazuh_testing/wazuh_testing/tools/performance/binary.py (2 additions, 1 deletion)
@@ -114,7 +114,8 @@ def unit_conversion(x):

     # Pre-initialize the info dictionary. If there's a problem while taking metrics of the binary (i.e. it crashed)
     # the CSV will set all its values to 0 to easily identify if there was a problem or not
-    info = {'Daemon': self.process_name, 'Version': self.version, 'Timestamp': datetime.now().strftime('%H:%M:%S'),
+    info = {'Daemon': self.process_name, 'Version': self.version,
+            'Timestamp': datetime.now().strftime('%Y/%m/%d %H:%M:%S'),
             'PID': self.pid, 'CPU(%)': 0.0, f'VMS({self.value_unit})': 0.0, f'RSS({self.value_unit})': 0.0,
             f'USS({self.value_unit})': 0.0, f'PSS({self.value_unit})': 0.0,
             f'SWAP({self.value_unit})': 0.0, 'FD': 0.0, 'Read_Ops': 0.0, 'Write_Ops': 0.0,
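
The widened Timestamp format is what the new parser below relies on when slicing rows by phase. A minimal sketch of the difference (illustrative only, not part of the PR):

    from datetime import datetime

    moment = datetime(2021, 11, 30, 12, 18, 27)
    print(moment.strftime('%H:%M:%S'))           # 12:18:27 (old format: the date is lost)
    print(moment.strftime('%Y/%m/%d %H:%M:%S'))  # 2021/11/30 12:18:27 (new format)

Because '%Y/%m/%d %H:%M:%S' is zero-padded and most-significant-first, its lexicographic order matches chronological order, so plain string comparisons on the Timestamp column work correctly.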
deps/wazuh_testing/wazuh_testing/tools/performance/csv_parser.py (290 additions, 0 deletions)
@@ -0,0 +1,290 @@
# Copyright (C) 2015-2021, Wazuh Inc.
# Created by Wazuh, Inc. <[email protected]>.
# This program is free software; you can redistribute it and/or modify it under the terms of GPLv2

from collections import defaultdict
from glob import glob
from os.path import join
from pathlib import Path
from re import compile

import numpy as np
import pandas as pd


class ClusterCSVParser:
"""Class to load and parse CSVs with data produced by the Wazuh cluster.

Args:
artifacts_path (str): directory where the cluster CSVs can be found.
files_to_load (list): CSV filenames (without extension) that should be loaded.

Attributes:
artifacts_path (str): directory where the cluster CSVs can be found.
files_to_load (list): CSV filenames (without extension) that should be loaded.
dataframes (dict): dictionary with dataframes obtained from the loaded CSV files.
"""

SETUP_PHASE = 'setup_phase'
STABLE_PHASE = 'stable_phase'

def __init__(self, artifacts_path, files_to_load):
self.artifacts_path = artifacts_path
self.files_to_load = files_to_load
self.dataframes = defaultdict(lambda: defaultdict(lambda: defaultdict(None)))

self._load_dataframes()

def _load_dataframes(self):
"""Recursively iterate CSV files inside 'data' folders and store data as pandas dataframes.

Files will be loaded as pandas dataframes and stored inside a dictionary that
looks like this: self.dataframes[type of data][node name][file name].
When a file is found, it is only parsed if listed in self.files_to_load.
"""
        node_file_regex = compile(r'.*/(master|worker_[\d]+)/.*/(.*)/(.*)\.csv')

for csv in glob(join(self.artifacts_path, '*', '*', '*', '*.csv')):
            names = node_file_regex.search(csv)
            if names and names.group(3) in self.files_to_load:
self.dataframes[names.group(2)][names.group(1)].update({names.group(3): pd.read_csv(csv)})
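        # Illustrative example (assumed artifacts layout, not part of the PR): given
        #   <artifacts_path>/master/data/logs/integrity_sync.csv
        #   <artifacts_path>/worker_1/data/binaries/wazuh-clusterd.csv
        # the resulting structure would be:
        #   self.dataframes['logs']['master']['integrity_sync']        -> DataFrame
        #   self.dataframes['binaries']['worker_1']['wazuh-clusterd']  -> DataFrame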

def get_setup_phase(self, node_name):
"""Determine when the setup phase begins and ends.

Args:
node_name (str): name of the node whose phase should be calculated.

Returns:
tuple: start date, end date.
"""
sync_df = self.dataframes['logs'][node_name]['integrity_sync']['Timestamp']
        # Last timestamp; equals the first one when the CSV holds a single row.
        return sync_df[0], sync_df[len(sync_df) - 1]
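        # Illustrative example (assumed timestamps, not from the PR): if the master's
        # integrity_sync.csv rows span '2021/11/30 12:00:05' to '2021/11/30 12:03:40',
        # this returns that pair, and everything after the second value is treated as
        # the stable phase by _trim_dataframe() below.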

def _trim_dataframe(self, df, phase, setup_datetime):
"""Get the dataframe between two datetime.

Args:
df (dataframe): original dataframe from which a subset will be obtained.
            phase (str): name of the phase whose data should be obtained.
setup_datetime (tuple): start and end datetime of the setup phase.

Returns:
dataframe: subset of data between the dates chosen, according to the phase.
"""
if phase == self.SETUP_PHASE:
return df[(df['Timestamp'] >= setup_datetime[0]) & (df['Timestamp'] <= setup_datetime[1])]
else:
return df[(df['Timestamp'] > setup_datetime[1])]
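        # Editorial note: the comparisons above are plain string comparisons. They
        # order correctly only because the '%Y/%m/%d %H:%M:%S' format introduced in
        # binary.py sorts lexicographically in chronological order.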

def _calculate_stats(self, df):
"""Calculate statistics from a dataframe.

Args:
df (dataframe): dataframe used to obtain stats.

Raises:
NotImplementedError: should be implemented in child classes.
"""
raise NotImplementedError

def _calculate_all_df_stats(self, dfs_dict):
"""Iterate all dataframes and obtain statistics for each one.

Args:
dfs_dict (dict): dict with dataframes to obtain stats from.

Returns:
defaultdict: dictionary with stats from each dataframe.
"""
result = defaultdict(lambda: defaultdict(dict))
setup_datetime = self.get_setup_phase('master')

for node, files in dfs_dict.items():
for phase in [self.SETUP_PHASE, self.STABLE_PHASE]:
for file_name, file_df in files.items():
trimmed_df = self._trim_dataframe(file_df, phase, setup_datetime)
if len(trimmed_df):
result[phase][file_name][node] = self._calculate_stats(trimmed_df)

return result

def _group_stats(self, dfs_dict):
"""Group statistics by type of node, phase and column and get the maximum of each group.

Args:
dfs_dict (dict): dict with dataframes to obtain stats from.

Returns:
defaultdict: Maximum value for each phase, task, type of node (worker/master) and stat.
"""
nodes_stats = self._calculate_all_df_stats(dfs_dict)
        grouped_data = defaultdict(
            lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(tuple)))))

for phase, files in nodes_stats.items():
for file, nodes in files.items():
for node, columns in nodes.items():
for column, stats in columns.items():
for stat, value in stats.items():
if node == 'master':
grouped_data[phase][file][column][node][stat] = (node, value)
else:
if not grouped_data[phase][file][column]['workers'][stat] or \
grouped_data[phase][file][column]['workers'][stat][1] < value:
grouped_data[phase][file][column]['workers'][stat] = (node, value)

return grouped_data
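        # Illustrative shape of the result (hypothetical values, not from the PR):
        #   grouped_data['stable_phase']['wazuh-clusterd']['CPU(%)']['master']['mean']
        #       -> ('master', 2.1)
        #   grouped_data['stable_phase']['wazuh-clusterd']['CPU(%)']['workers']['mean']
        #       -> ('worker_3', 5.2)   # the worker with the highest mean CPU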

def get_stats(self):
"""Get max stats after grouping by phase, task, type of node and stat.

Raises:
NotImplementedError: should be implemented in child classes.
"""
raise NotImplementedError

@staticmethod
def default_to_dict(default_dict):
"""Convert defaultdict to dict."""
if isinstance(default_dict, defaultdict):
default_dict = {k: ClusterCSVParser.default_to_dict(v) for k, v in default_dict.items()}

return default_dict
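    # Illustrative example: default_to_dict recursively converts the nested
    # defaultdicts built above into plain dicts so they print and serialize cleanly.
    #   >>> d = defaultdict(dict)
    #   >>> d['a']['b'] = 1
    #   >>> ClusterCSVParser.default_to_dict(d)
    #   {'a': {'b': 1}}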


class ClusterCSVTasksParser(ClusterCSVParser):
"""Class to load, parse CSVs and obtain stats of cluster tasks.

Args:
artifacts_path (str): directory where the cluster CSVs can be found.

Attributes:
artifacts_path (str): directory where the cluster CSVs can be found.
"""

def __init__(self, artifacts_path):
super().__init__(artifacts_path, files_to_load=['integrity_check', 'integrity_sync', 'agent-info_sync'])

def _calculate_stats(self, df):
"""Calculate mean of 'time_spent(s)' column from a dataframe.

Args:
df (dataframe): dataframe to obtain the mean value from.

Returns:
            dict: mean and max values for the 'time_spent(s)' column.
"""
return {'time_spent(s)': {'mean': df['time_spent(s)'].mean(),
'max': df['time_spent(s)'].max()}}

def get_stats(self):
"""Get max stats after grouping by phase, task, type of node and stat.

Returns:
dict: max stats obtained from cluster tasks.
"""
return self.default_to_dict(self._group_stats(self.dataframes['logs']))
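    # Usage sketch (hypothetical path and values, not part of the PR):
    #   >>> ClusterCSVTasksParser('/tmp/artifacts').get_stats()
    #   {'setup_phase': {'integrity_check': {'time_spent(s)': {
    #       'master': {'mean': ('master', 0.4), 'max': ('master', 1.1)},
    #       'workers': {'mean': ('worker_2', 0.9), 'max': ('worker_5', 2.3)}}}},
    #    'stable_phase': {...}}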


class ClusterCSVResourcesParser(ClusterCSVParser):
"""Class to load, parse CSVs and obtain stats of resources used by the 'wazuh-clusterd' process.

Args:
artifacts_path (str): directory where the cluster CSVs can be found.
columns (list, optional): columns of the CSVs to obtain stats from.

Attributes:
artifacts_path (str): directory where the cluster CSVs can be found.
columns (list): columns of the CSVs to obtain stats from.
"""

def __init__(self, artifacts_path, columns=None):
self.columns = ['USS(KB)', 'CPU(%)', 'FD'] if columns is None else columns
super().__init__(artifacts_path, files_to_load=['wazuh-clusterd', 'integrity_sync'])

def _calculate_stats(self, df):
"""Calculate mean and regression coefficient of each column in self.columns from a dataframe.

Args:
df (dataframe): dataframe to obtain the stats from.

Returns:
dict: stats for each column.
"""
result = {}
for column in self.columns:
result[column] = {'mean': df[column].mean(),
'max': df[column].max(),
'reg_cof': np.polyfit(range(len(df)), list(df[column]), 1)[0]}

return result
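        # Editorial note: np.polyfit(x, y, 1)[0] is the slope of the least-squares
        # line fitted to the column, so 'reg_cof' approximates the growth rate per
        # sample; a clearly positive slope on USS(KB) would suggest a memory leak.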

def get_stats(self):
"""Get max stats after grouping by phase, task, type of node and stat.

Returns:
dict: max stats obtained from cluster resources.
"""
return self.default_to_dict(self._group_stats(self.dataframes['binaries']))


class ClusterEnvInfo:
"""Class to obtain information from cluster artifacts files.

Args:
artifacts_path (str): directory where the cluster data (nodes, logs, CSVs, etc.) can be found.

Attributes:
artifacts_path (str): directory where the cluster data (nodes, logs, CSVs, etc.) can be found.
"""

def __init__(self, artifacts_path):
self.artifacts_path = artifacts_path

def get_file_timestamps(self, node='master', file='integrity_sync.csv'):
"""Get first and last datetime of lines inside a specific file.

Args:
node (str): node folder from which the information should be retrieved.
file (str): filename in any nested level inside 'node' from which the information should be retrieved.

Returns:
list: first and last datetime found.
"""
        result = []
        timestamp_regex = compile(r'(\d\d\d\d/\d\d/\d\d \d\d:\d\d:\d\d).*')

        with open(next(Path(join(self.artifacts_path, node)).rglob(file), '')) as f:
            data = f.readlines()
            # data[0] is the CSV header, so take the first data row and the last one.
            result.extend(timestamp_regex.findall(data[1]))
            result.extend(timestamp_regex.findall(data[-1]))

return result
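        # Illustrative result (assumed file contents): if the first data row of
        # integrity_sync.csv starts with '2021/11/30 12:00:05' and the last row with
        # '2021/11/30 12:03:40', this returns those two timestamps in a list.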

def count_workers_nodes(self):
"""Count how many worker folders there are in the artifacts.

Returns:
int: number of workers in the cluster artifacts.
"""
return len(glob(join(self.artifacts_path, 'worker_*')))

def get_phases(self):
"""Get start and end datetime for setup and stable phases.

Returns:
dict: start and end datetime for setup and stable phases.
"""
setup_phase = self.get_file_timestamps()
stable_phase = [setup_phase[1], self.get_file_timestamps(file='integrity_check.csv')[1]]

return {'setup_phase': setup_phase, 'stable_phase': stable_phase}

def get_all_info(self):
"""Get all info from cluster artifacts.

Returns:
dict: start and end datetime for each phase and number of workers.
"""
return {'phases': self.get_phases(), 'worker_nodes': self.count_workers_nodes()}
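
Taken together, a small driver built on top of these classes might look like the sketch below. This is illustrative only: the artifacts path is hypothetical, and the printed structures follow the shapes described in the docstrings above.

    from wazuh_testing.tools.performance.csv_parser import (ClusterCSVResourcesParser, ClusterCSVTasksParser,
                                                            ClusterEnvInfo)

    artifacts = '/tmp/cluster_artifacts'  # hypothetical directory containing master/worker_* folders

    # Environment overview: setup/stable phase boundaries and number of workers.
    print(ClusterEnvInfo(artifacts).get_all_info())

    # Max 'time_spent(s)' stats per phase, task and node type.
    print(ClusterCSVTasksParser(artifacts).get_stats())

    # Max resource stats (USS, CPU, FD by default) of wazuh-clusterd per phase and node type.
    print(ClusterCSVResourcesParser(artifacts).get_stats())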