Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add alert.json to VD E2E test report #5147

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ All notable changes to this project will be documented in this file.

### Added

- Add alert.json file to Vulnerability Detector E2E test report ([#5147](https:/wazuh/wazuh-qa/pull/5147)) \- (Framework)
- Add documentation about markers for system tests ([#5080](https:/wazuh/wazuh-qa/pull/5080)) \- (Documentation)
- Add AWS Custom Buckets Integration tests ([#4675](https:/wazuh/wazuh-qa/pull/4675)) \- (Framework + Tests)
- Add Vulnerability Detector end to end tests ([#4878](https:/wazuh/wazuh-qa/pull/4878)) \- (Framework + Tests)
Expand Down
19 changes: 19 additions & 0 deletions deps/wazuh_testing/wazuh_testing/end_to_end/logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,29 @@ def get_hosts_logs(host_manager: HostManager, host_group: str = 'all') -> Dict[s
- host_manager (HostManager): An instance of the HostManager class for managing remote hosts.
- host_group (str, optional): The name of the host group where the files will be truncated.
Default is 'all'.

Returns:
- host_logs (Dict[str, str]): Dictionary containing the logs from the ossec.log file of each host
"""
host_logs = {}
for host in host_manager.get_group_hosts(host_group):
host_os_name = host_manager.get_host_variables(host)['os_name']
host_logs[host] = host_manager.get_file_content(host, logs_filepath_os[host_os_name])

return host_logs

def get_hosts_alerts(host_manager: HostManager) -> Dict[str, str]:
"""
Get the alerts in the alert.json file from the specified host group.

Parameters:
- host_manager (HostManager): An instance of the HostManager class for managing remote hosts.

Returns:
- host_alerts (Dict[str, str]): Dictionary containing the alerts from the alert.json file of each manager
"""
host_alerts = {}
for host in host_manager.get_group_hosts("manager"):
host_alerts[host] = host_manager.get_file_content(host, ALERTS_JSON_PATH)

return host_alerts
18 changes: 13 additions & 5 deletions tests/end_to_end/test_vulnerability_detector/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def test_example(host_manager):

from wazuh_testing.tools.system import HostManager
from wazuh_testing.end_to_end.remote_operations_handler import launch_parallel_operations
from wazuh_testing.end_to_end.logs import get_hosts_logs
from wazuh_testing.end_to_end.logs import get_hosts_logs, get_hosts_alerts


STYLE_PATH = os.path.join(os.path.dirname(__file__), '../../../deps/wazuh_testing/wazuh_testing/reporting/style.css')
Expand All @@ -61,17 +61,25 @@ def collect_e2e_environment_data(test_name, host_manager) -> None:
"""
logging.info("Collecting environment data")
environment_logs = get_hosts_logs(host_manager)
environment_alerts = get_hosts_alerts(host_manager)

current_dir = os.path.dirname(__file__)
vulnerability_detector_logs_dir = os.path.join(current_dir, "logs")
tests_evidences_directory = os.path.join(str(vulnerability_detector_logs_dir), str(test_name))

for host in environment_logs.keys():
logging.info(f"Collecting logs for {host}")
host_logs_name_evidence = host + "_ossec.log"
evidence_file = os.path.join(tests_evidences_directory, host_logs_name_evidence)
with open(evidence_file, 'w') as evidence_file:
evidence_file.write(environment_logs[host])
host_logs_name_evidence = host + "_ossec.log"
evidence_log_file = os.path.join(tests_evidences_directory, host_logs_name_evidence)
with open(evidence_log_file, 'w') as evidence_log_file:
evidence_log_file.write(environment_logs[host])

for host in environment_alerts.keys():
logging.info(f"Collecting alerts for {host}")
host_alerts_name_evidence = host + "_alert.json"
evidence_alert_file = os.path.join(tests_evidences_directory, host_alerts_name_evidence)
with open(evidence_alert_file, 'w') as evidence_alert_file:
evidence_alert_file.write(environment_alerts[host])


def collect_evidences(test_name, evidences) -> None:
Expand Down
Loading