From a745ce1a6b2b17f6ceacc26f9cf97163d1e261d2 Mon Sep 17 00:00:00 2001 From: jmv74211 Date: Thu, 24 Jun 2021 10:35:53 +0200 Subject: [PATCH 1/4] Make a rework of simulate_agents scripts #1471 Now this script has two modes: - Mode 1: In this mode, you specify the number of agents and some EPS per module. Each one of the agents will send the specified amount of EPS for each module. This will be the default behavior - Mode 2: All the EPS of the specified modules will be distributed among different agents, which will be self-calculated. For this, a new flag -b and -i must be specified to indicate that the balanced mode will be used, and -i to indicate the agent-EPS ratio. --- .../wazuh_testing/scripts/simulate_agents.py | 368 ++++++++++++------ 1 file changed, 249 insertions(+), 119 deletions(-) diff --git a/deps/wazuh_testing/wazuh_testing/scripts/simulate_agents.py b/deps/wazuh_testing/wazuh_testing/scripts/simulate_agents.py index 4faac02d7d..35ec0f9062 100644 --- a/deps/wazuh_testing/wazuh_testing/scripts/simulate_agents.py +++ b/deps/wazuh_testing/wazuh_testing/scripts/simulate_agents.py @@ -9,100 +9,265 @@ logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(f"P{os.getpid()}") -def run_agents(agents_number=1, manager_address='localhost', protocol=TCP, agent_version='v4.0.0', - agent_os='debian8', eps=1000, run_duration=20, active_modules=[], modules_eps=None, - fixed_message_size=None, registration_address=None, labels=None): - """Run a batch of agents connected to a manager with the same parameters. + +def parse_custom_labels(labels): + """Parse the wazuh labels from string list to dict. Args: - agents_number (int): Number of agents to run. - manager_address (str): Manager address (hostname or IP). - protocol (str): Communication protocol. - agent_version (str): Agents version p.e: v4.0.0 - agent_os (str): Agents os, some examples: debian8, ubuntu18.04, mojave... - eps (int): Total events per second sent by each agent to the manager. - run_duration (int): Agent life time. - active_modules (list): list with active modules names. - modules_eps (list): list with eps for each active modules. - fixed_message_size (int): size in bytes for the message. - registration_address (str): Manager IP address where the agent will be registered. - labels (dict): Wazuh agent labels in dict format. + labels (list): Labels in format ["key1:value1", "key2:value2"] + + Returns: + dict: Labels dictionary. {key1:value1, key2:value2} """ + custom_labels = labels - logger = logging.getLogger(f"P{os.getpid()}") - logger.info(f"Starting {agents_number} agents.") - - active_agents, injectors = [], [] - - for _ in range(agents_number): - agent = ag.Agent(manager_address, "aes", os=agent_os, version=agent_version, fim_eps=eps, - fixed_message_size=fixed_message_size, syscollector_frequency=0, sca_frequency=0, - registration_address=registration_address, retry_enrollment=True, labels=labels) - available_modules = agent.modules.keys() - - for module in active_modules: - if module not in available_modules: - raise ValueError(f"Selected module: '{module}' doesn't exist on agent simulator!") - - for module in available_modules: - if module in active_modules: - index = list(active_modules).index(module) - agent.modules[module]['status'] = 'enabled' - if module in ['keepalive', 'receive_messages']: - continue - if modules_eps is not None and 'eps' in agent.modules[module]: - agent.modules[module]['eps'] = modules_eps[index] - else: - agent.modules[module]['eps'] = eps - else: - agent.modules[module]['status'] = 'disabled' - agent.modules[module]['eps'] = 0 + # Parse the custom labels from list format to dict + if labels is not None: + custom_labels = {} + + for item in labels: + label = item.split(':') + custom_labels[label[0]] = label[1] + + return custom_labels + + +def set_agent_modules_and_eps(agent, active_modules, modules_eps): + """Set active modules and EPS to an agent. + + Args: + agent (Agent): agent object. + active_modules (list): List of active modules. + modules_eps (list): List of EPS for each active module. + + Raises: + ValueError: If number of active_modules items is not the same than the modules_eps. + ValueError: If a module does not exist on the agent simulator. + """ + if len(active_modules) != len(modules_eps): + raise ValueError('Number of modules must be the same than modules EPS items') + + available_modules = agent.modules.keys() + + for module in active_modules: + if module not in available_modules: + raise ValueError(f"Selected module: '{module}' doesn't exist on agent simulator!") + + for module in available_modules: + if module in active_modules: + index = list(active_modules).index(module) + agent.modules[module]['status'] = 'enabled' + if module in ['keepalive', 'receive_messages']: + continue + + agent.modules[module]['eps'] = int(modules_eps[index]) + else: + agent.modules[module]['status'] = 'disabled' + agent.modules[module]['eps'] = 0 + + logger.info(agent.modules) - logger.info(agent.modules) - active_agents.append(agent) +def create_agents(args): + """Create a list of agents according to script parameters like the mode, EPS... + + Args: + args (list): List of script parameters. + + Returns: + list: List of agents to run. + """ + agents = [] + custom_labels = parse_custom_labels(args.labels) + + if args.balance_mode: + modules_eps_data = [] + + for module, eps in zip(args.modules, args.modules_eps): + modules_eps_data.append({ + 'remaining': eps, + 'module': module + }) + + distribution_list = calculate_eps_distribution(modules_eps_data, args.balance_ratio) + + logger.info(f"Agents-EPS distributon = {distribution_list}") + + for item in distribution_list: # item[0] = modules - item[1] = eps + agent = ag.Agent(manager_address=args.manager_address, os=args.os, + registration_address=args.manager_registration_address, + version=args.version, fixed_message_size=args.fixed_message_size, labels=custom_labels) + set_agent_modules_and_eps(agent, item[0].split(' '), item[1].split(' ')) + agents.append(agent) + else: + for _ in range(args.agents_number): + agent = ag.Agent(manager_address=args.manager_address, os=args.os, + registration_address=args.manager_registration_address, + version=args.version, fixed_message_size=args.fixed_message_size, labels=custom_labels) + set_agent_modules_and_eps(agent, args.modules, args.modules_eps) + agents.append(agent) + + return agents + + +def create_injectors(agents, manager_address, protocol): + """Create injectos objects from list of agents and connection parameters. + + Args: + agents (list): List of agents to create the injectors (1 injector/agent). + manager_address (str): Manager IP address to connect the agents. + protocol (str): TCP or UDP protocol to connect the agents to the manager. + + Returns: + list: List of injector objects. + """ + injectors = [] + + logger.info(f"Starting {len(agents)} agents.") + + for agent in agents: sender = ag.Sender(manager_address, protocol=protocol) injectors.append(ag.Injector(sender, agent)) - sleep(30) + return injectors + + +def start(injector, time_alive): + """Start the injector process for a specified time. + Args: + injector (Injector): Injector object. + time_alive (int): Period of time in seconds during the injector will be running. + """ try: - start(injectors) - sleep(run_duration) + injector.run() + sleep(time_alive) finally: - stop(injectors) + stop(injector) -def start(agent_injectors): - logging.info(f"Running {len(agent_injectors)} injectors...") - for injector in agent_injectors: - injector.run() +def stop(injector): + """Stop the injector process. + Args: + injector (Injector): Injector object. + """ + injector.stop_receive() -def stop(agent_injectors): - logging.info(f"Stopping {len(agent_injectors)} injectors...") - for injector in agent_injectors: - injector.stop_receive() +def run(injectors, time_alive): + """Run each injector in a separated process. -def main(): - arg_parser = argparse.ArgumentParser() + Args: + injectors (list): List of injector objects. + time_alive (int): Period of time in seconds during the injector will be running. + """ + processes = [] + + for injector in injectors: + processes.append(Process(target=start, args=(injector, time_alive))) + + for agent_process in processes: + agent_process.start() + + for agent_process in processes: + agent_process.join() + + +def calculate_eps_distribution(data, max_eps_per_agent): + """Calculate the distribution of agents and EPS according to the input ratio. + + Args: + data (list): List of dictionaries containing information about the module and the remaining EPS to be + distributed. + max_eps_per_agent (int): Maximum EPS load to be distributed to an agent. + + Returns: + list: List of tuples, containing in the first position the modules to be launched by that agent, and in the + second position the EPS distribution for each module of that agent. + + Example: + Input: + data =[ + {'remaining': 0, 'module': 'receive_messages'}, + {'remaining': 0, 'module': 'keepalive'}, + {'remaining': 100, 'module': 'fim'}, + {'remaining': 30, 'module': 'logcollector'}, + {'remaining': 80, 'module': 'syscollector'} + ], + max_eps_per_agent = 50 + Output: + [('fim', '50'), ('fim', '50'), ('logcollector syscollector', '30 20'), ('syscollector', '50'), + ('syscollector', '10')] + """ + + # If there are no more items in the queue list then stop + if len(data) == 0: + return [] + + # If there are no more EPS to distribute from the current queue, then move on to the next item. + if data[0]['remaining'] == 0: + data.pop(0) + return calculate_eps_distribution(data, max_eps_per_agent) + + # If there are enough EPS in the current queue to fill the EPS of an agent, then it is filled. + if data[0]['remaining'] >= max_eps_per_agent: + data[0]['remaining'] -= max_eps_per_agent + + agent_parameters = (data[0]['module'], str(max_eps_per_agent)) + + return [agent_parameters] + calculate_eps_distribution(data, max_eps_per_agent) + # If the agent instance supports higher eps than the remaining eps to be distributed from the current queue + else: + # Add the remaining EPS from the current queue + modules = data[0]['module'] + current_load = data[0]['remaining'] + eps = f"{current_load}" + + # Remove the queue as all its eps have been distributed. + data.pop(0) + + # As long as the maximum EPS load of the agent has not been supported. + while current_load < max_eps_per_agent: + + # Exit the loop if there are no more queue items in the list. + if len(data) == 0: + break + + # Remove the queue if there are no more EPS to distribute and check the next eps. + if data[0]['remaining'] == 0: + data.pop(0) + # If with the elements of the new queue we can fill the instance-agent eps. + elif data[0]['remaining'] > (max_eps_per_agent - current_load): + modules += f" {data[0]['module']}" + eps += f" {(max_eps_per_agent - current_load)}" + data[0]['remaining'] -= (max_eps_per_agent - current_load) + current_load = max_eps_per_agent + # Otherwise, take all EPS from the current queue and keep checking for the same agent. + else: + modules += f" {data[0]['module']}" + eps += f" {data[0]['remaining']}" + current_load += data[0]['remaining'] + data[0]['remaining'] = 0 + + agent_parameters = (modules, eps) + + return [agent_parameters] + calculate_eps_distribution(data, max_eps_per_agent) - arg_parser.add_argument('-a', '--manager', metavar='', type=str, required=False, - default='localhost', help='Manager IP address', dest='manager_addr') - arg_parser.add_argument('-e', '--eps', metavar='', type=int, required=False, default=1000, - help='Number of events per second to be generated by each module on each agent', dest='eps') +def main(): + arg_parser = argparse.ArgumentParser() - arg_parser.add_argument('-n', '--agents', metavar='', type=int, default=5, required=False, - help='Number of agents to create and run', dest='n_agents') + arg_parser.add_argument('-a', '--manager', metavar='', type=str, required=True, + default='localhost', help='Manager IP address', dest='manager_address') - arg_parser.add_argument('-b', '--batch', metavar='', dest='agent_batch', - type=int, required=False, default=2, help='Number of agents to create on each process') + arg_parser.add_argument('-n', '--agents', metavar='', type=int, default=1, required=False, + help='Number of agents to create and run', dest='agents_number') arg_parser.add_argument('-o', '--os', metavar='', dest='os', - type=str, required=False, default='debian8', help='Agent operating system',) + type=str, required=False, default='debian8', help='Agent operating system') arg_parser.add_argument('-p', '--protocol', metavar='', dest='agent_protocol', type=str, required=False, default=TCP, help='Communication protocol') @@ -111,72 +276,37 @@ def main(): required=False, default=None, help='Manager IP address where the agent will be registered', dest='manager_registration_address') - arg_parser.add_argument('-t', '--time', metavar='', dest='duration', - type=int, required=False, default=20, help='Time in seconds for monitoring') + arg_parser.add_argument('-t', '--time', metavar='', dest='simulation_time', + type=int, required=False, default=60, help='Time in seconds for the simulation') arg_parser.add_argument('-v', '--version', metavar='', dest='version', - type=str, required=False, default='4.2.0', help='Agent wazuh version', ) + type=str, required=False, default='4.2.0', help='Agent wazuh version') - arg_parser.add_argument('-m', '--modules', dest='modules', required=False, type=str, nargs='+', action='store', - default=['fim'], help='Active module separated by whitespace.') + arg_parser.add_argument('-m', '--modules', dest='modules', required=True, type=str, nargs='+', action='store', + default=[], help='Active module separated by whitespace.') arg_parser.add_argument('-l', '--labels', dest='labels', required=False, type=str, nargs='+', action='store', default=None, help='Wazuh agent labels.') - arg_parser.add_argument('-s', '--modules-eps', dest='modules_eps', required=False, type=int, nargs='+', + arg_parser.add_argument('-s', '--modules-eps', dest='modules_eps', required=True, type=int, nargs='+', action='store', default=None, help='Active module EPS separated by whitespace.') arg_parser.add_argument('-f', '--fixed-message-size', metavar='', type=int, required=False, default=None, help='Size of all the agent modules messages (KB)', dest='fixed_message_size') - args = arg_parser.parse_args() - - if args.agent_batch > 1: - logging.warning("Launching more than 1 agents per process is not advisable as Python's GIL dramatically " - "reduces the performance of the agent_simulator tool when there are multiple agents running in " - "the same process.") - - # Calculate modules EPS - if args.modules_eps is not None: - len_mod = len(args.modules) - len_eps = len(args.modules_eps) - if len_mod != len_eps: - arg_parser.error(f"Wrong number of eps introduced for selected modules:{len_eps}, expected:{len_mod}.") - - # Calculate agents per process - remainder = args.n_agents % args.agent_batch - n_processes = args.n_agents // args.agent_batch + (1 if remainder != 0 else 0) - - processes = [] - - custom_labels = args.labels - - # Parse the custom labels from list format to dict - if args.labels is not None: - custom_labels = {} + arg_parser.add_argument('-b', '--balance-mode', action='store_true', required=False, + help='Activate the balance mode. EPS will be distributed throughout all agents.') - for item in args.labels: - label = item.split(':') - custom_labels[label[0]] = label[1] - - # Create the process list - for i in range(n_processes): - agents = args.agent_batch - if remainder != 0 and i == 0: - agents = remainder + arg_parser.add_argument('-i', '--balance-ratio', metavar='', type=int, required=False, + default=1000, help='EPS/agent ratio. Can only be used if the parameter -b was specified', + dest='balance_ratio') - arguments = ( - agents, args.manager_addr, args.agent_protocol, args.version, args.os, args.eps, args.duration, - args.modules, args.modules_eps, args.fixed_message_size, args.manager_registration_address, custom_labels - ) - - processes.append(Process(target=run_agents, args=arguments)) + args = arg_parser.parse_args() - for p in processes: - p.start() + agents = create_agents(args) + injectors = create_injectors(agents, args.manager_address, args.agent_protocol) - for p in processes: - p.join() + run(injectors, args.simulation_time) if __name__ == "__main__": From a933a1f77d3ad490460e9edb45ca6cf4293d5379 Mon Sep 17 00:00:00 2001 From: jmv74211 Date: Thu, 24 Jun 2021 13:30:07 +0200 Subject: [PATCH 2/4] Add new waiting time parameter in simulate agents script. Done to prevent CPU overload when registering many agents (registration + event generation) --- .../wazuh_testing/scripts/simulate_agents.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/deps/wazuh_testing/wazuh_testing/scripts/simulate_agents.py b/deps/wazuh_testing/wazuh_testing/scripts/simulate_agents.py index 35ec0f9062..e8485d0bf5 100644 --- a/deps/wazuh_testing/wazuh_testing/scripts/simulate_agents.py +++ b/deps/wazuh_testing/wazuh_testing/scripts/simulate_agents.py @@ -301,9 +301,19 @@ def main(): default=1000, help='EPS/agent ratio. Can only be used if the parameter -b was specified', dest='balance_ratio') + arg_parser.add_argument('-w', '--waiting-connection-time', metavar='', type=int, + help='Waiting time in seconds between agent registration and the sending of events.', + required=False, default=0, dest='waiting_connection_time') + args = arg_parser.parse_args() agents = create_agents(args) + + logger.info(f"Waiting {args.waiting_connection_time} seconds before sending EPS and keep-alive events") + + # Waiting time to prevent CPU overload when registering many agents (registration + event generation). + sleep(args.waiting_connection_time) + injectors = create_injectors(agents, args.manager_address, args.agent_protocol) run(injectors, args.simulation_time) From 75095b6d87867d05a487a9ca0d324fbab80951ab Mon Sep 17 00:00:00 2001 From: jmv74211 Date: Thu, 24 Jun 2021 16:45:51 +0200 Subject: [PATCH 3/4] Fix keep-alive messages in simulate agents script --- deps/wazuh_testing/wazuh_testing/scripts/simulate_agents.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/deps/wazuh_testing/wazuh_testing/scripts/simulate_agents.py b/deps/wazuh_testing/wazuh_testing/scripts/simulate_agents.py index e8485d0bf5..6a087fab40 100644 --- a/deps/wazuh_testing/wazuh_testing/scripts/simulate_agents.py +++ b/deps/wazuh_testing/wazuh_testing/scripts/simulate_agents.py @@ -99,7 +99,8 @@ def create_agents(args): agent = ag.Agent(manager_address=args.manager_address, os=args.os, registration_address=args.manager_registration_address, version=args.version, fixed_message_size=args.fixed_message_size, labels=custom_labels) - set_agent_modules_and_eps(agent, item[0].split(' '), item[1].split(' ')) + set_agent_modules_and_eps(agent, item[0].split(' ') + ['keepalive', 'receive_messages'], + item[1].split(' ') + ['0', '0']) agents.append(agent) else: for _ in range(args.agents_number): From 07699ea898e9a8559620384b1ec79d0cc6c28892 Mon Sep 17 00:00:00 2001 From: jmv74211 Date: Wed, 30 Jun 2021 14:30:41 +0200 Subject: [PATCH 4/4] add: Set keepalive and receive as active modules by default Done in simulate-agents script --- .../wazuh_testing/scripts/simulate_agents.py | 28 +++++++++++++++---- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/deps/wazuh_testing/wazuh_testing/scripts/simulate_agents.py b/deps/wazuh_testing/wazuh_testing/scripts/simulate_agents.py index 6a087fab40..14e91d0a04 100644 --- a/deps/wazuh_testing/wazuh_testing/scripts/simulate_agents.py +++ b/deps/wazuh_testing/wazuh_testing/scripts/simulate_agents.py @@ -34,6 +34,22 @@ def parse_custom_labels(labels): return custom_labels +def process_script_parameters(args): + """Process script parameters and edit them if necessary. + + Args: + args (argparse.Namespace): Script args. + """ + # Add keepalive and receive_message modules if they are not specified in script parameters + if 'keepalive' not in args.modules: + args.modules.append('keepalive') + args.modules_eps.append('0') + + if 'receive_messages' not in args.modules: + args.modules.append('receive_messages') + args.modules_eps.append('0') + + def set_agent_modules_and_eps(agent, active_modules, modules_eps): """Set active modules and EPS to an agent. @@ -87,7 +103,7 @@ def create_agents(args): for module, eps in zip(args.modules, args.modules_eps): modules_eps_data.append({ - 'remaining': eps, + 'remaining': int(eps), 'module': module }) @@ -283,14 +299,14 @@ def main(): arg_parser.add_argument('-v', '--version', metavar='', dest='version', type=str, required=False, default='4.2.0', help='Agent wazuh version') - arg_parser.add_argument('-m', '--modules', dest='modules', required=True, type=str, nargs='+', action='store', - default=[], help='Active module separated by whitespace.') + arg_parser.add_argument('-m', '--modules', dest='modules', required=False, type=str, nargs='+', action='store', + default=['keepalive', 'receive_messages'], help='Active module separated by whitespace.') arg_parser.add_argument('-l', '--labels', dest='labels', required=False, type=str, nargs='+', action='store', default=None, help='Wazuh agent labels.') - arg_parser.add_argument('-s', '--modules-eps', dest='modules_eps', required=True, type=int, nargs='+', - action='store', default=None, help='Active module EPS separated by whitespace.') + arg_parser.add_argument('-s', '--modules-eps', dest='modules_eps', required=False, type=str, nargs='+', + action='store', default=['0', '0'], help='Active module EPS separated by whitespace.') arg_parser.add_argument('-f', '--fixed-message-size', metavar='', type=int, required=False, default=None, help='Size of all the agent modules messages (KB)', dest='fixed_message_size') @@ -308,6 +324,8 @@ def main(): args = arg_parser.parse_args() + process_script_parameters(args) + agents = create_agents(args) logger.info(f"Waiting {args.waiting_connection_time} seconds before sending EPS and keep-alive events")