Skip to content

Commit

Permalink
feature(#60): adding check_previous parameter
Browse files Browse the repository at this point in the history
  • Loading branch information
pro-akim committed Jul 24, 2023
1 parent 8d4a75a commit ecfe5fc
Showing 1 changed file with 85 additions and 72 deletions.
157 changes: 85 additions & 72 deletions src/wazuh_qa_framework/system/wazuh_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ def restart_manager(self, host):
"""
self.logger.debug(f'Restarting manager {host}')
if self.is_manager(host):
self.control_service(host, 'wazuh-manager', 'restarted', become=True)
self.control_service(host, 'wazuh-manager', 'restarted')
self.logger.debug(f'Manager {host} restarted successfully')
else:
ValueError(f'Host {host} is not a manager')
Expand Down Expand Up @@ -611,7 +611,7 @@ def stop_manager(self, host):
"""
self.logger.debug(f'Stopping manager {host}')
if self.is_manager(host):
self.control_service(host, 'wazuh-manager', 'stopped', become=True)
self.control_service(host, 'wazuh-manager', 'stopped')
self.logger.debug(f'Manager {host} stopped successfully')
else:
raise ValueError(f'Host {host} is not a manager')
Expand Down Expand Up @@ -668,7 +668,7 @@ def start_manager(self, host):
"""
self.logger.debug(f'Starting manager {host}')
if self.is_manager(host):
self.control_service(host, 'wazuh-manager', 'started', become=True)
self.control_service(host, 'wazuh-manager', 'started')
self.logger.debug(f'Manager {host} started successfully')
else:
raise ValueError(f'Host {host} is not a manager')
Expand Down Expand Up @@ -850,117 +850,130 @@ def is_manager(self, host):
"""
return host in self.get_managers()

def create_group(self, manager, group_name, method='cmd'):
def check_group(self, manager, group_name):
"""Function to check the existance of a group.
Args:
manager (str): Name of the manager.
group_name (str): Name of the group.
"""
return True if group_name in self.run_command(manager, f"{get_bin_directory_path()}/agent_groups -l")[1] else False

def check_agent_group(self, manager, id_agent, group_name):
"""Function to check the existence of an agent in a group.
Args:
manager (str): Name of the manager.
id_agent (str): ID of agent.
group_name (str): Name of the group.
"""
# Check the expected group is in the group data for the agent
self.logger.info(f'Checking agent {id_agent} in group {group_name}')
return group_name in self.run_command(manager, f'{get_bin_directory_path()}/agent_groups -s -i {id_agent}')[1]


def create_group(self, manager, group_name, method='cmd', check_previous=True):
"""Function to create a group.
Args:
manager (str): Name of the manager.
group_name (str): Name of the group.
method (str): Method to be used to create the group. Defaults to cmd.
check_previous (bool): Check if the group exists.
"""
if method == 'cmd':
self.logger.info(f'Creating group {group_name} from {manager} using CMD')
self.run_command(manager, f"{get_bin_directory_path()}/agent_groups -q -a -g {group_name}")

if method == 'api':
self.logger.info(f'Creating new {group_name} using API')
endpoint = f'/groups'
request = WazuhAPIRequest(endpoint=endpoint, payload={"group_id": group_name}, method='POST')
request.send(WazuhAPI(address=self.get_host_variables(manager)['ip']))
if check_previous and self.check_group(manager, group_name):
self.logger.info(f'{group_name} already exists')
else:
self.logger.info(f'Creating group {group_name} from {manager} using {method.upper()}')
if method == 'cmd':
self.run_command(manager, f"{get_bin_directory_path()}/agent_groups -q -a -g {group_name}")
elif method == 'api':
endpoint = '/groups'
request = WazuhAPIRequest(endpoint=endpoint, payload={"group_id": group_name}, method='POST')
request.send(WazuhAPI(address=self.get_host_variables(manager)['ip']))

def delete_group(self, manager, group_name, method='cmd'):
def delete_group(self, manager, group_name, method='cmd', check_previous=True):
"""Function to delete a group.
Args:
manager (str): Name of the manager.
group_name (str): Name of the group.
method (str): Method to be used to delete the group. Defaults to cmd.
"""
if method == 'cmd':
self.logger.info(f'Removing group {group_name} from {manager} using CMD')
self.run_command(manager, f"{get_bin_directory_path()}/agent_groups -q -r -g {group_name}")

if method == 'api':
self.logger.info(f'Removing group {group_name} using API')
endpoint = f'/groups?groups_list={group_name}'
request = WazuhAPIRequest(endpoint=endpoint, method='DELETE')
request.send(WazuhAPI(address=self.get_host_variables(manager)['ip']))
check_previous (str): Check if the group exists.
"""
if check_previous and not self.check_group(manager, group_name):
self.logger.info(f'{group_name} does not exists')
else:
if method == 'cmd':
self.logger.info(f'Removing group {group_name} from {manager} using CMD')
self.run_command(manager, f"{get_bin_directory_path()}/agent_groups -q -r -g {group_name}")

if method == 'api':
self.logger.info(f'Removing group {group_name} using API')
endpoint = f'/groups?groups_list={group_name}'
request = WazuhAPIRequest(endpoint=endpoint, method='DELETE')
request.send(WazuhAPI(address=self.get_host_variables(manager)['ip']))

if method == 'folder':
self.logger.info(f'Removing group {group_name} deleting folder')
self.run_command(manager, f"rm -rf {get_shared_directory_path()}/{group_name}")
if method == 'folder':
self.logger.info(f'Removing group {group_name} deleting folder')
self.run_command(manager, f"rm -rf {get_shared_directory_path()}/{group_name}")

def assign_agent_group(self, manager, id_agent, group_name, method='cmd'):
def assign_agent_group(self, manager, id_agent, group_name, method='cmd', check_previous=True):
"""Function to assign an agent to a group.
Args:
manager (str): Name of the manager.
id_agent (str): ID of the agent.
group_name (str): Name of the group.
method (str): Method to be used to delete the group. Defaults to cmd.
check_previous (str): Check if the agent is already assigned to the group.
"""
if method == 'cmd':
self.logger.info(f'Assign agent {id_agent} to group {group_name} from {manager} using CMD')
self.run_command(manager, f"{get_bin_directory_path()}/agent_groups -q -a -i {id_agent} -g {group_name}")

if method == 'api':
self.logger.info(f'Assign agent {id_agent} to group {group_name} using API')
endpoint = f'/agents/{id_agent}/group/{group_name}'
request = WazuhAPIRequest(endpoint=endpoint, method='PUT')
request.send(WazuhAPI(address=self.get_host_variables(manager)['ip']))
if check_previous and self.check_agent_group(manager, id_agent, group_name):
self.logger.info(f'{id_agent} is assigned to {group_name}')
else:
if method == 'cmd':
self.logger.info(f'Assign agent {id_agent} to group {group_name} from {manager} using CMD')
self.run_command(manager,
f"{get_bin_directory_path()}/agent_groups -q -a -i {id_agent} -g {group_name}")

if method == 'api':
self.logger.info(f'Assign agent {id_agent} to group {group_name} using API')
endpoint = f'/agents/{id_agent}/group/{group_name}'
request = WazuhAPIRequest(endpoint=endpoint, method='PUT')
request.send(WazuhAPI(address=self.get_host_variables(manager)['ip']))

def assign_agents_group(self, manager, list_id_agent, group_name, method='cmd'):
def assign_agents_group(self, manager, list_id_agent, group_name, method='cmd', check_previous=True):
"""Function to assign list of agents to a group.
Args:
manager (str): Name of the manager.
list_id_agent (list): List of the agents ID.
group_name (str): Name of the group.
method (str): Method to be used to delete the group. Defaults to cmd.
check_previous (str): Check if the agents are already assigned to the group.
"""
if method == 'cmd':
for agent in list_id_agent:
self.logger.info(f'Assigning agent {agent} from group {group_name} from {manager} using CMD')
self.assign_agent_group(manager, agent, group_name)

if method == 'api':
self.logger.info(f'Assigning agent {list_id_agent} from group {group_name} using API in parallel')
for agent in list_id_agent:
endpoint = f'/agents/{agent}/group/{group_name}'
request = WazuhAPIRequest(endpoint=endpoint, method='PUT')
request.send(WazuhAPI(address=self.get_host_variables(manager)['ip']))
for agent in list_id_agent:
self.logger.info(f'Assigning agent {agent} from group {group_name} from {manager} using CMD')
self.assign_agent_group(manager, agent, group_name, method=method, check_previous=check_previous)

def unassign_agent_group(self, manager, id_agent, group_name):
def unassign_agent_group(self, manager, id_agent, group_name, check_previous=True):
"""Function to unassign an agent to a group.
Args:
manager (str): Name of the manager.
id_agent (str): ID of the agent.
group_name (str): Name of the group.
check_previous (str): Check if the agent is already assigned to the group.
"""
self.logger.info(f'Removing agent {id_agent} from group {group_name} using API')
endpoint = f'/agents/{id_agent}/group/{group_name}'
request = WazuhAPIRequest(endpoint=endpoint, method='DELETE')
request.send(WazuhAPI(address=self.get_host_variables(manager)['ip']))
if check_previous and not self.check_agent_group(manager, id_agent, group_name):
self.logger.info(f'{id_agent} is not assigned to {group_name}')
else:
endpoint = f'/agents/{id_agent}/group/{group_name}'
request = WazuhAPIRequest(endpoint=endpoint, method='DELETE')
request.send(WazuhAPI(address=self.get_host_variables(manager)['ip']))

def unassign_agents_group(self, manager, list_id_agent, group_name):
def unassign_agents_group(self, manager, list_id_agent, group_name, check_previous=True):
"""Function to unassign list of agents to a group.
Args:
manager (str): Name of the manager.
list_id_agent (list): List of the agents ID.
group_name (str): Name of the group.
check_previous (str): Check if the agents are already assigned to the group.
"""
self.logger.info(f'Removing agents {list_id_agent} from group {group_name} using API in parallel')
for agent in list_id_agent:
endpoint = f'/agents/{agent}/group/{group_name}'
request = WazuhAPIRequest(endpoint=endpoint, method='DELETE')
request.send(WazuhAPI(address=self.get_host_variables(manager)['ip']))

def check_agent_groups(self, agent_id, group_name, hosts_list):
"""Function to check the existence of an agent in a group.
Args:
agent_id (str): ID of agent.
group_name (str): Name of the group.
hosts_list (str): List of hosts from which the check will be performed.
"""
# Check the expected group is in the group data for the agent
for host in hosts_list:
group_data = self.run_command(host, f'{get_bin_directory_path()}/agent_groups -s -i {agent_id}')
self.logger.info(f'Checking agent {agent_id} in group {group_name}')
assert group_name in group_data[1], f"Did not recieve expected agent group: {group_name} in data \
{str(group_data)} in host {host}"
self.unassign_agent_group(manager, agent, group_name, check_previous=check_previous)

0 comments on commit ecfe5fc

Please sign in to comment.