QA Docs - Schema 2.0 #1694
Research progress
Module block
Test block
How companies manage their test doc
Commaai: the tests are not documented, so I could not use them for the research.
Cucumber
Cucumber looks like a flexible and strong tool that lets you use ubiquitous language, so you end up with a readable and understandable step file like this example. If we wanted to use this tool, we would have to:
About the living documentation from Cucumber: it can be integrated with repos and Jira. I did not research it much because I think we are aiming for a better documentation (search) tool.
Behave
Behave is a great BDD test framework that lets you work as you would with Cucumber, but using Python. It has many pros and cons, but I think the cost of migrating from the pytest framework would be too high.
pytest-bdd
This BDD framework is a pytest plugin that lets you use Gherkin syntax and the same methodology as the frameworks mentioned above. You would work with the same file types: features and steps. An example tree could be:
Relevant features in my opinion:
Research progress
Module block & Test block new fields
How companies manage their test doc
Datadog: Although there are several repositories containing tests, there are no remarkable new fields that could be included in the documentation blocks listed above.
Snort: Taking Snort as an example of a documentation pattern raises the possibility of including an "Author" field. However, as the existing tests were written a long time ago, some of the test creators may no longer be available.
Cucumber
After searching and reading documentation about Cucumber and making a simple test example, there are multiple conclusions.
Pytest-bdd demo
With pytest-bdd you have features and steps. In this case, each Python step uses an expression that links it to one Gherkin step (given, when, or then).
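The link between a Gherkin step and a Python function can be pictured with a small stdlib-only sketch. This is not pytest-bdd's implementation: the `step` decorator, the `STEPS` registry, the `run_scenario` helper, and the scenario text are hypothetical names and data used only for illustration (in pytest-bdd itself the decorators are `given`, `when`, and `then`).

```python
import re

# Hypothetical registry of (keyword, compiled expression, function) entries.
STEPS = []


def step(keyword, expression):
    """Register a function as the implementation of one Gherkin step."""
    def decorator(func):
        STEPS.append((keyword, re.compile(expression), func))
        return func
    return decorator


@step("given", r"a monitored file with (\d+) lines")
def monitored_file(context, lines):
    context["lines"] = int(lines)


@step("when", r"(\d+) lines are appended")
def append_lines(context, lines):
    context["lines"] += int(lines)


@step("then", r"the file has (\d+) lines")
def check_lines(context, lines):
    assert context["lines"] == int(lines)


def run_scenario(scenario_text):
    """Run each 'Given/When/Then ...' line against the registered steps."""
    context = {}
    for raw in scenario_text.strip().splitlines():
        keyword, _, text = raw.strip().partition(" ")
        for registered_keyword, expression, func in STEPS:
            match = expression.fullmatch(text)
            if registered_keyword == keyword.lower() and match:
                func(context, *match.groups())
                break
        else:
            # No registered step matched this scenario line.
            raise LookupError(f"No step matches: {raw.strip()}")
    return context


SCENARIO = """
Given a monitored file with 3 lines
When 2 lines are appended
Then the file has 5 lines
"""

result = run_scenario(SCENARIO)
```

Each scenario line is matched by keyword and expression, so a step function is reused by any feature file that contains a matching sentence.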
As this is a simple test, the directory tree has not been changed. About how to run it:
We are going to go through a few iterations in which we receive feedback and refine the proposal.
Branch: 1694-proposal1
First iteration
Doc block examples
`test_vulnerability_detector/test_scan_results/test_scan_nvd_feed.py`
'''
brief: These tests will mock RedHat, Canonical, Debian, and Windows systems, and insert custom vulnerabilities and
       vulnerable packages to check if Vulnerability Detector generates the vulnerability alerts from NVD feed.
copyright:
    Copyright (C) 2015-2021, Wazuh Inc.
    Created by Wazuh, Inc. <info@wazuh.com>.
    This program is free software; you can redistribute it and/or modify it under the terms of GPLv2
modules:
- vulnerability detector
daemons:
- wazuh-modulesd
os_platform:
- linux
os_vendor:
- redhat
- debian
- ubuntu
- alas
- centos
- arch-linux
os_version:
- rhel5
- rhel6
- rhel7
- rhel8
- buster
- stretch
- jessie
- wheezy
- bionic
- xenial
- trusty
- amazon-linux-1
- amazon-linux-2
tiers:
- 0
tags:
- NVD
component:
- manager
'''
import os
from datetime import timedelta
from shutil import copy
import pytest
import wazuh_testing.vulnerability_detector as vd
from wazuh_testing.fim import check_time_travel
from wazuh_testing.tools import LOG_FILE_PATH
from wazuh_testing.tools import file
from wazuh_testing.tools.configuration import load_wazuh_configurations
from wazuh_testing.tools.monitoring import FileMonitor
# Marks
pytestmark = pytest.mark.tier(level=0)
# Variables
current_test_path = os.path.dirname(os.path.realpath(__file__))
test_data_path = os.path.join(current_test_path, 'data')
configurations_path = os.path.join(test_data_path, 'wazuh_nvd_configuration.yaml')
vulnerabilities_data_path = os.path.join(test_data_path, vd.VULNERABILITIES)
custom_cpe_helper_data_path = os.path.join(test_data_path, vd.CUSTOM_CPE_HELPER)
custom_msu_data_path = os.path.join(test_data_path, vd.CUSTOM_MSU)
wazuh_log_monitor = FileMonitor(LOG_FILE_PATH)
SCAN_TIMEOUT = 40
copy(custom_cpe_helper_data_path, vd.CPE_HELPER_PATH)
# Set configuration
parameters = [{'NVD_JSON_PATH': os.path.join(test_data_path, vd.CUSTOM_NVD_FEED),
'MSU_JSON_PATH': custom_msu_data_path}]
ids = ['scan_nvd_configuration']
# Read JSON data template
nvd_vulnerabilities = file.read_json_file(vulnerabilities_data_path)
system_data = [
{"target": "WINDOWS10", "os_name": "Microsoft Windows Server 2016 Datacenter Evaluation",
"os_major": "10", "os_minor": "0", "name": "windows", "format": "win",
"vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']),
"os_platform": "windows", "version": "Wazuh v4.1"},
{"target": "RHEL8", "os_name": "CentOS Linux", "os_major": "8", "os_minor": "1", "name": "centos8",
"format": "rpm", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']),
"os_platform": "centos", "version": "Wazuh v4.1"},
{"target": "RHEL7", "os_name": "CentOS Linux", "os_major": "7", "os_minor": "8", "name": "centos7",
"format": "rpm", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']),
"os_platform": "centos", "version": "Wazuh v4.1"},
{"target": "RHEL6", "os_name": "CentOS Linux", "os_major": "6", "os_minor": "10", "name": "centos6",
"format": "rpm", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']),
"os_platform": "centos", "version": "Wazuh v4.1"},
{"target": "RHEL5", "os_name": "CentOS Linux", "os_major": "5", "os_minor": "11", "name": "centos5",
"format": "rpm", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']),
"os_platform": "centos", "version": "Wazuh v4.1"},
{"target": "BIONIC", "os_name": "Ubuntu", "os_major": "18", "os_minor": "04", "name": "Ubuntu-bionic",
"format": "deb", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']),
"os_platform": "ubuntu", "version": "Wazuh v4.1"},
{"target": "XENIAL", "os_name": "Ubuntu", "os_major": "16", "os_minor": "04", "name": "Ubuntu-xenial",
"format": "deb", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']),
"os_platform": "ubuntu", "version": "Wazuh v4.1"},
{"target": "TRUSTY", "os_name": "Ubuntu", "os_major": "14", "os_minor": "04", "name": "Ubuntu-trusty",
"format": "deb", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']),
"os_platform": "ubuntu", "version": "Wazuh v4.1"},
{"target": "BUSTER", "os_name": "Debian GNU/Linux", "os_major": "10", "os_minor": "", "name": "debian10",
"format": "deb", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']),
"os_platform": "debian", "version": "Wazuh v4.1"},
{"target": "STRETCH", "os_name": "Debian GNU/Linux", "os_major": "9", "os_minor": "", "name": "debian9",
"format": "deb", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']),
"os_platform": "debian", "version": "Wazuh v4.1"},
{"target": "MAC", "os_name": "Mac OS X", "os_major": "10", "os_minor": "15", "name": "macos-catalina",
"format": "pkg", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']),
"os_platform": "darwin", "version": "Wazuh v4.1"},
{"target": "MAC", "os_name": "Mac OS X", "os_major": "10", "os_minor": "15", "name": "macos-catalina",
"format": "pkg", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']),
"os_platform": "darwin", "version": "Wazuh v4.0"},
{"target": "MAC", "os_name": "Mac OS X Server", "os_major": "5", "os_minor": "10", "name": "macos-server",
"format": "pkg", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']),
"os_platform": "darwin", "version": "Wazuh v4.1"}
]
system_data_ids = [system['target'] for system in system_data]
# Configuration data
configurations = load_wazuh_configurations(configurations_path, __name__, params=parameters)
# Fixtures
@pytest.fixture(scope='module', params=configurations, ids=ids)
def get_configuration(request):
    """
    description:
        Get configurations from the module.
    parameters:
        - request
    """
    return request.param


@pytest.fixture(scope='module', params=system_data, ids=system_data_ids)
@vd.mock_cve_db
def mock_vulnerability_scan(request, mock_agent):
    """
    description:
        Mocks the vulnerability scan inserting custom hotfixes, feeds and changing the host system
    parameters:
        - request:
            type: dictionary
            brief: containing the data to mock the system and the agent
        - mock_agent:
            type: callable
            brief: fixture used to mock the agent
    """
    # Mock system
    vd.modify_system(agent_id=mock_agent, os_name=request.param['os_name'], os_major=request.param['os_major'],
                     os_minor=request.param['os_minor'], name=vd.MOCKED_AGENT_NAME,
                     os_platform=request.param['os_platform'], version=request.param['version'])
    # Insert a vulnerability in table VULNERABILITIES
    vd.insert_vulnerability(cveid='CWE-000', operation='less than', operation_value='1.0.0',
                            package='test', target=request.param['target'])
    # Add custom vulnerabilities and feeds
    for vulnerability in nvd_vulnerabilities['vulnerabilities_nvd']:
        vd.insert_package(**vulnerability['package'], source=vulnerability['package']['name'],
                          format=request.param['format'], agent=mock_agent)


def test_vulnerabilities_report(get_configuration, configure_environment, restart_modulesd, check_cve_db,
                                mock_vulnerability_scan):
    """
    description:
        Check if inserted vulnerable packages are reported by vulnerability detector
    parameters:
        - configure_environment:
            type: fixture
            brief: Configure a custom environment for testing.
        - get_configuration:
            type: fixture
            brief: Get configurations from the module.
        - mock_vulnerability_scan:
            type: callable
            brief: Decorator used in any function that needs to mock cve.db
        - restart_modulesd:
            type: callable
            brief: Restart modulesd daemon
        - check_cve_db:
            type: fixture
            brief: Check if the CVE database exists and its tables are created
    wazuh_min_version:
        3.13
    behaviour:
        - Clean NVD, `vulnerabilities`, and `sys_programs tables`.
        - Mock the system.
        - If the mocked system is *Windows*:
            -`vd.insert_osinfo()`.
            - `vd.insert_hotfix()`.
        - Insert a dummy vulnerability in the `vulnerabilities` table and mock an imported feed from a provider.
        - Insert **simulated** NVD [vulnerable packages](../../test_scan_results/data/vulnerabilities.json)
        - Import **simulated** NVD vulnerabilities from [custom NVD feed](../../test_scan_results/data/custom_nvd_feed.json).
        - Check these events in `ossec.log`:
            - If *Windows*: Agent '000' is vulnerable to '{cve}'. Condition: '{patch} patch is not installed.'
            - If *Red Hat*, then the alert is not checked, because it is disabled
            - For other systems: The '{package}' package .* from agent .* is vulnerable to '{cve}'
            - Finally, for all the systems it is checked: The NVD found a total of
              '{vulnerabilities_number}' potential vulnerabilities for agent .*
    expected_behaviour:
        - "Agent .* has an unsupported Wazuh version: '{version}'"
        - r"The {feed_source} found a total of '{expected_vulnerabilities_number}' potential vulnerabilities "
          "for agent .*"
        - r"The '{package}' package .* from agent .* is vulnerable to '{cve}'"
        - "The expected event 'Agent .* has an unsupported Wazuh version' not found"
        - "The expected number of {feed_source} vulnerabilities have not been found"
        - "Could not find the report which says that the package {package} is vulnerable with {cve}"
    """
    vulnerabilities_number = mock_vulnerability_scan["vulnerabilities_number"]
    if mock_vulnerability_scan['format'] == 'pkg' and mock_vulnerability_scan['version'] == 'Wazuh v4.0':
        version = mock_vulnerability_scan['version']
        wazuh_log_monitor.start(
            timeout=SCAN_TIMEOUT,
            update_position=False,
            callback=vd.make_vuln_callback(fr"Agent .* has an unsupported Wazuh version: '{version}'"),
            error_message="The expected event 'Agent .* has an unsupported Wazuh version' not found"
        )
        return
    # Check the vulnerabilities of inserted packages
    try:
        for item in nvd_vulnerabilities['vulnerabilities_nvd']:
            vd.check_vulnerability_scan_event(wazuh_log_monitor, item['package']['name'], item['cve']['cveid'])
    except TimeoutError:
        check_time_travel(time_travel=True, interval=timedelta(seconds=300))
        for item in nvd_vulnerabilities['vulnerabilities_nvd']:
            vd.check_vulnerability_scan_event(wazuh_log_monitor, item['package']['name'], item['cve']['cveid'])
    # Check that the number of NVD vulnerabilities is the expected
    if mock_vulnerability_scan["format"] != "win":
        vd.check_detected_vulnerabilities_number(wazuh_log_monitor=wazuh_log_monitor,
                                                 expected_vulnerabilities_number=vulnerabilities_number,
                                                 feed_source='NVD', timeout=vd.VULN_DETECTOR_SCAN_TIMEOUT)
    vd.check_if_modulesd_is_running()
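A module-level doc block like the one in this file can be read without importing the test module. The following is a minimal sketch, assuming the block only contains `key: value` lines and `key:` lines followed by `- item` entries. The `parse_doc_block` helper and the `SOURCE` sample are hypothetical, and a real tool would more likely hand the docstring to a YAML parser.

```python
import ast

# Hypothetical, shortened module source used only for this illustration.
SOURCE = '''"""
brief: Example test module.
modules:
    - vulnerability detector
daemons:
    - wazuh-modulesd
tiers:
    - 0
"""
import os
'''


def parse_doc_block(source):
    """Parse a module docstring made of 'key: value' and 'key:' + '- item' lines."""
    docstring = ast.get_docstring(ast.parse(source)) or ""
    fields = {}
    current = None  # Name of the list field currently being filled, if any.
    for line in docstring.splitlines():
        stripped = line.strip()
        if not stripped:
            continue
        if stripped.startswith("- ") and current is not None:
            fields[current].append(stripped[2:])
        elif ":" in stripped:
            key, _, value = stripped.partition(":")
            key, value = key.strip(), value.strip()
            if value:
                # Scalar field written on a single line.
                fields[key] = value
                current = None
            else:
                # List field whose items follow on the next lines.
                fields[key] = []
                current = key
    return fields


doc = parse_doc_block(SOURCE)
```

Because the sketch does no type conversion, list items such as the tier come back as strings (`"0"`), which a YAML parser would return as integers.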
JSON: test_scan_nvd_feed.json
{
"brief": "These tests will mock RedHat, Canonical, Debian, and Windows systems, and insert custom vulnerabilities and vulnerable packages to check if Vulnerability Detector generates the vulnerability alerts from NVD feed.",
"modules": [
"Vulnerability detector"
],
"daemons": [
"wazuh-modulesd"
],
"platform": [
"RedHat",
"Canonical",
"Debian"
],
"version": [
"RHEL5",
"RHEL6",
"RHEL7",
"RHEL8",
"BUSTER",
"STRETCH",
"JESSIE",
"WHEEZY",
"BIONIC",
"XENIAL",
"TRUSTY"
],
"tiers": [
0
],
"tags": [
"NVD"
],
"completeness": 100,
"component": [
"manager"
],
"name": "test_scan_nvd_feed.py",
"id": 16,
"group_id": 2,
"tests": [
{
"description": "Check if inserted vulnerable packages are reported by vulnerability detector",
"parameters": [
{
"configure_environment": {
"type": "fixture",
"brief": "Configure a custom environment for testing."
}
},
{
"get_configuration": {
"type": "fixture",
"brief": "Get configurations from the module."
}
},
{
"mock_vulnerability_scan": {
"type": "callable",
"brief": "Decorator used in any function that needs to mock cve.db"
}
},
{
"restart_modulesd": {
"type": "callable",
"brief": "Restart modulesd daemon"
}
},
{
"check_cve_db": {
"type": "fixture",
"brief": "Check if the CVE database exists and its tables are created"
}
}
],
"assertions": [
"A report of the corresponding vulnerabilities is generated in the `ossec.log`."
],
"wazuh_min_version": "4.1.0",
"status": "working",
"behaviour": [
"Clean NVD, `vulnerabilities`, and `sys_programs tables`.",
"Mock the system.",
"r\"If the mocked system is *Windows*:\" \"-`vd.insert_osinfo()`.\" \"- `vd.insert_hotfix()`.\"",
"Insert a dummy vulnerability in the `vulnerabilities` table and mock an imported feed from a provider.",
"Insert **simulated** NVD [vulnerable packages](../../test_scan_results/data/vulnerabilities.json)",
"Import **simulated** NVD vulnerabilities from [custom NVD feed](../../test_scan_results/data/custom_nvd_feed.json).",
"r\"Check these events in `ossec.log`:\" \"- If *Windows*, Agent '000' is vulnerable to '{cve}'. Condition, '{patch} patch is not installed.'\" \"- If *Red Hat*, then the alert is not checked, because it is disabled\"",
"r\"For other systems, The '{package}' package .* from agent .* is vulnerable to '{cve}'\"",
"r\"Finally, for all the systems it is checked, The NVD found a total of\" \"'{vulnerabilities_number}' potential vulnerabilities for agent .*\""
],
"logging": [
{
"callback": [
"r\"Agent .* has an unsupported Wazuh version, '{version}'\"",
"r\"The {feed-source} found a total of '{expected-vulnerabilities-number}' potential vulnerabilities \" \"for agent .*\"",
"r\"The '{package}' package .* from agent .* is vulnerable to '{cve}'\""
]
},
{
"error_message": [
"r\"The expected event 'Agent .* has an unsupported Wazuh version' not found\"",
"The expected number of {feed-source} vulnerabilities have not been found",
"Could not find the report which says that the package {package} is vulnerable with {cve}"
]
}
],
"name": "test_vulnerabilities_report",
"test_cases": [
"scan_nvd_configuration-RHEL8",
"scan_nvd_configuration-RHEL7",
"scan_nvd_configuration-RHEL6",
"scan_nvd_configuration-RHEL5",
"scan_nvd_configuration-BIONIC",
"scan_nvd_configuration-XENIAL",
"scan_nvd_configuration-TRUSTY",
"scan_nvd_configuration-BUSTER",
"scan_nvd_configuration-STRETCH"
]
}
]
}
YAML: test_scan_nvd_feed.yaml
brief: These tests will mock RedHat, Canonical, Debian, and Windows systems, and insert
  custom vulnerabilities and vulnerable packages to check if Vulnerability Detector
  generates the vulnerability alerts from NVD feed.
completeness: 100
component:
- manager
daemons:
- wazuh-modulesd
group_id: 2
id: 16
modules:
- Vulnerability detector
name: test_scan_nvd_feed.py
platform:
- RedHat
- Canonical
- Debian
tags:
- NVD
tests:
- assertions:
  - A report of the corresponding vulnerabilities is generated in the `ossec.log`.
  behaviour:
  - Clean NVD, `vulnerabilities`, and `sys_programs tables`.
  - Mock the system.
  - r"If the mocked system is *Windows*:" "-`vd.insert_osinfo()`." "- `vd.insert_hotfix()`."
  - Insert a dummy vulnerability in the `vulnerabilities` table and mock an imported
    feed from a provider.
  - Insert **simulated** NVD [vulnerable packages](../../test_scan_results/data/vulnerabilities.json)
  - Import **simulated** NVD vulnerabilities from [custom NVD feed](../../test_scan_results/data/custom_nvd_feed.json).
  - r"Check these events in `ossec.log`:" "- If *Windows*, Agent '000' is vulnerable
    to '{cve}'. Condition, '{patch} patch is not installed.'" "- If *Red Hat*, then
    the alert is not checked, because it is disabled"
  - r"For other systems, The '{package}' package .* from agent .* is vulnerable to
    '{cve}'"
  - r"Finally, for all the systems it is checked, The NVD found a total of" "'{vulnerabilities_number}'
    potential vulnerabilities for agent .*"
  description: Check if inserted vulnerable packages are reported by vulnerability
    detector
  logging:
  - callback:
    - r"Agent .* has an unsupported Wazuh version, '{version}'"
    - r"The {feed-source} found a total of '{expected-vulnerabilities-number}' potential
      vulnerabilities " "for agent .*"
    - r"The '{package}' package .* from agent .* is vulnerable to '{cve}'"
  - error_message:
    - r"The expected event 'Agent .* has an unsupported Wazuh version' not found"
    - The expected number of {feed-source} vulnerabilities have not been found
    - Could not find the report which says that the package {package} is vulnerable
      with {cve}
  name: test_vulnerabilities_report
  parameters:
  - configure_environment:
      brief: Configure a custom environment for testing.
      type: fixture
  - get_configuration:
      brief: Get configurations from the module.
      type: fixture
  - mock_vulnerability_scan:
      brief: Decorator used in any function that needs to mock cve.db
      type: callable
  - restart_modulesd:
      brief: Restart modulesd daemon
      type: callable
  - check_cve_db:
      brief: Check if the CVE database exists and its tables are created
      type: fixture
  status: working
  test_cases:
  - scan_nvd_configuration-RHEL8
  - scan_nvd_configuration-RHEL7
  - scan_nvd_configuration-RHEL6
  - scan_nvd_configuration-RHEL5
  - scan_nvd_configuration-BIONIC
  - scan_nvd_configuration-XENIAL
  - scan_nvd_configuration-TRUSTY
  - scan_nvd_configuration-BUSTER
  - scan_nvd_configuration-STRETCH
  wazuh_min_version: 4.1.0
tiers:
- 0
version:
- RHEL5
- RHEL6
- RHEL7
- RHEL8
- BUSTER
- STRETCH
- JESSIE
- WHEEZY
- BIONIC
- XENIAL
- TRUSTY
`test_vulnerability_detector/test_scan_results/test_redhat_inventory_redhat_feed.py`
'''
brief: These tests will mock RedHat systems and insert custom vulnerabilities and vulnerable packages to check
       if Vulnerability Detector generates the alerts from the RedHat provider feed.
copyright:
    Copyright (C) 2015-2021, Wazuh Inc.
    Created by Wazuh, Inc. <info@wazuh.com>.
    This program is free software; you can redistribute it and/or modify it under the terms of GPLv2
modules:
- vulnerability detector
daemons:
- wazuh-modulesd
os_platform:
- linux
os_vendor:
- redhat
- debian
- ubuntu
- arch-linux
os_version:
- rhel5
- rhel6
- rhel7
- rhel8
- buster
- stretch
- wheezy
- bionic
- xenial
- trusty
- amazon-linux-1
- amazon-linux-2
- arch-linux
tiers:
- 0
tags:
- NVD
component:
- manager
'''
import os
import pytest
from wazuh_testing.tools import LOG_FILE_PATH
from wazuh_testing.tools.configuration import load_wazuh_configurations
from wazuh_testing.tools.monitoring import FileMonitor
from wazuh_testing.tools import file
from wazuh_testing import vulnerability_detector as vd
# Marks
pytestmark = pytest.mark.tier(level=0)
# Variables
current_test_path = os.path.dirname(os.path.realpath(__file__))
test_data_path = os.path.join(current_test_path, 'data')
configurations_path = os.path.join(test_data_path, 'wazuh_redhat_inventory.yaml')
redhat_vulnerabilities_data_path = os.path.join(test_data_path, 'redhat_vulnerabilities.json')
wazuh_log_monitor = FileMonitor(LOG_FILE_PATH)
SCAN_TIMEOUT = 40
# Set configuration
parameters = [{'NVD_JSON_PATH': os.path.join(test_data_path, vd.REAL_NVD_FEED)}]
ids = ['redhat_scan_configuration']
# Read JSON data template
redhat_vulnerabilities = file.read_json_file(redhat_vulnerabilities_data_path)
redhat_data_ids = [system['target'] for system in redhat_vulnerabilities]
# Configuration data
configurations = load_wazuh_configurations(configurations_path, __name__, params=parameters)
# Fixtures
@pytest.fixture(scope='module', params=configurations, ids=ids)
def get_configuration(request):
    """
    description:
        Get configurations from the module.
    parameters:
        - request
    """
    return request.param


@pytest.fixture(scope='module', params=redhat_vulnerabilities, ids=redhat_data_ids)
@vd.mock_cve_db
def mock_vulnerability_scan(request, mock_agent):
    """
    description:
        Mocks the vulnerability scan inserting custom hotfixes, feeds and changing the host system
    parameters:
        - request:
            type: dictionary
            brief: containing the data to mock the system and the agent
        - mock_agent:
            type: callable
            brief: fixture used to mock the agent
    """
    # Mock system
    vd.modify_system(agent_id=mock_agent, os_name=request.param['os_name'], os_major=request.param['os_major'],
                     os_minor=request.param['os_minor'], name=vd.MOCKED_AGENT_NAME)
    # Add custom vulnerabilities and feeds
    for vulnerability in request.param['vulnerabilities']:
        vd.insert_package(**vulnerability['package'], agent=mock_agent, source=vulnerability['package']['name'])
        vd.insert_vulnerability(**vulnerability['cve'], package=vulnerability['package']['name'],
                                target=request.param['target'])


def test_redhat_vulnerabilities_report(get_configuration, configure_environment, restart_modulesd, check_cve_db,
                                       mock_vulnerability_scan):
    """
    description:
        Check if inserted vulnerable packages are reported by vulnerability detector
    parameters:
        - configure_environment:
            type: fixture
            brief: Configure a custom environment for testing.
        - get_configuration:
            type: fixture
            brief: Get configurations from the module.
        - restart_modulesd:
            type: callable
            brief: Restart modulesd daemon
        - mock_vulnerability_scan:
            type: dict
            brief: vulnerability scan mock
        - check_cve_db:
            type: fixture
            brief: Check if the CVE database exists and its tables are created
    wazuh_min_version:
        3.13
    behaviour:
        - Clean NVD, `vulnerabilities`, and `sys_programs tables`.
        - Mock the system.
        - Insert [Red Hat custom feed and vulnerable packages](../../test_scan_results/data/redhat_vulnerabilities.json).
        - Import NVD vulnerabilities from [custom NVD feed](../../test_scan_results/data/real_nvd_feed.json).
        - Check this event in `ossec.log`: The '{package}' package .* from agent .* is vulnerable to '{cve}'
    expected_behaviour:
        - r"The {feed_source} found a total of '{expected_vulnerabilities_number}' potential vulnerabilities "
          "for agent .*"
        - r"The '{package}' package .* from agent .* is vulnerable to '{cve}'"
        - "The expected number of {feed_source} vulnerabilities have not been found"
        - "Could not find the report which says that the package {package} is vulnerable with {cve}"
    """
    vulnerabilities_number = len(mock_vulnerability_scan['vulnerabilities'])
    # Check that the number of OVAL vulnerabilities is the expected
    vd.check_detected_vulnerabilities_number(wazuh_log_monitor=wazuh_log_monitor,
                                             expected_vulnerabilities_number=vulnerabilities_number,
                                             feed_source='OVAL', timeout=vd.VULN_DETECTOR_SCAN_TIMEOUT)
    # Check the vulnerabilities of packages inserted
    for item in mock_vulnerability_scan['vulnerabilities']:
        vd.check_vulnerability_scan_event(wazuh_log_monitor=wazuh_log_monitor, package=item['package']['name'],
                                          cve=item['cve']['cveid'])
    vd.check_if_modulesd_is_running()
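The `test_cases` entries documented for this module appear to combine the id of the configuration fixture with the id of each mocked system. A minimal sketch of that naming, assuming the two ids are joined with `-` as the documented values suggest, could look like this. The `build_test_case_ids` helper is hypothetical, the system ids are copied from the documented `test_cases` list, and the real ids and their order are decided by pytest.

```python
from itertools import product

# Ids of the two parametrized fixtures; system ids copied from the documented test cases.
configuration_ids = ["redhat_scan_configuration"]
system_ids = ["RHEL8", "RHEL7", "RHEL6", "RHEL5"]


def build_test_case_ids(configuration_ids, system_ids):
    """Join every configuration id with every system id using '-'."""
    return [f"{configuration}-{system}"
            for configuration, system in product(configuration_ids, system_ids)]


test_cases = build_test_case_ids(configuration_ids, system_ids)
```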
JSON: test_redhat_inventory_redhat_feed.json
{
"brief": "These tests will mock RedHat systems and insert custom vulnerabilities and vulnerable packages to check if Vulnerability Detector generates the alerts from the RedHat provider feed.",
"modules": [
"Vulnerability detector"
],
"daemons": [
"wazuh-modulesd"
],
"platform": [
"RedHat"
],
"version": [
"RHEL5",
"RHEL6",
"RHEL7",
"RHEL8"
],
"tiers": [
0
],
"tags": [
"NVD"
],
"completeness": 100,
"component": [
"manager"
],
"name": "test_redhat_inventory_redhat_feed.py",
"id": 9,
"group_id": 2,
"tests": [
{
"description": "Check if inserted vulnerable packages are reported by vulnerability detector",
"parameters": [
{
"mock_vulnerability_scan": {
"type": "dict",
"brief": "vulnerability scan mock"
}
}
],
"assertions": [
"A report of the corresponding vulnerabilities is generated in the `ossec.log`."
],
"wazuh_min_version": null,
"status": "working",
"behaviour": [
"Clean NVD, `vulnerabilities`, and `sys_programs tables`.",
"Mock the system.",
"Insert [Red Hat custom feed and vulnerable packages](../../test_scan_results/data/redhat_vulnerabilities.json).",
"Import NVD vulnerabilities from [custom NVD feed](../../test_scan_results/data/real_nvd_feed.json).",
"Check this event in `ossec.log`: The '{package}' package .* from agent .* is vulnerable to '{cve}'"
],
"logging": [
{
"callback": [
"r\"The {feed_source} found a total of '{expected_vulnerabilities_number}' potential vulnerabilities \" \"for agent .*\"",
"r\"The '{package}' package .* from agent .* is vulnerable to '{cve}'\""
]
},
{
"error_message": [
"The expected number of {feed_source} vulnerabilities have not been found",
"Could not find the report which says that the package {package} is vulnerable with {cve}"
]
}
],
"name": "test_redhat_vulnerabilities_report",
"test_cases": [
"redhat_scan_configuration-RHEL8",
"redhat_scan_configuration-RHEL7",
"redhat_scan_configuration-RHEL6",
"redhat_scan_configuration-RHEL5"
]
}
]
}
YAML: test_redhat_inventory_redhat_feed.yaml
brief: These tests will mock RedHat systems and insert custom vulnerabilities and
  vulnerable packages to check if Vulnerability Detector generates the alerts from
  the RedHat provider feed.
completeness: 100
component:
- manager
daemons:
- wazuh-modulesd
group_id: 2
id: 9
modules:
- Vulnerability detector
name: test_redhat_inventory_redhat_feed.py
platform:
- RedHat
tags:
- NVD
tests:
- assertions:
  - A report of the corresponding vulnerabilities is generated in the `ossec.log`.
  behaviour:
  - Clean NVD, `vulnerabilities`, and `sys_programs tables`.
  - Mock the system.
  - Insert [Red Hat custom feed and vulnerable packages](../../test_scan_results/data/redhat_vulnerabilities.json).
  - Import NVD vulnerabilities from [custom NVD feed](../../test_scan_results/data/real_nvd_feed.json).
  - 'Check this event in `ossec.log`: The ''{package}'' package .* from agent .* is
    vulnerable to ''{cve}'''
  description: Check if inserted vulnerable packages are reported by vulnerability
    detector
  logging:
  - callback:
    - r"The {feed_source} found a total of '{expected_vulnerabilities_number}' potential
      vulnerabilities " "for agent .*"
    - r"The '{package}' package .* from agent .* is vulnerable to '{cve}'"
  - error_message:
    - The expected number of {feed_source} vulnerabilities have not been found
    - Could not find the report which says that the package {package} is vulnerable
      with {cve}
  name: test_redhat_vulnerabilities_report
  parameters:
  - mock_vulnerability_scan:
      brief: vulnerability scan mock
      type: dict
  status: working
  test_cases:
  - redhat_scan_configuration-RHEL8
  - redhat_scan_configuration-RHEL7
  - redhat_scan_configuration-RHEL6
  - redhat_scan_configuration-RHEL5
  wazuh_min_version: null
tiers:
- 0
version:
- RHEL5
- RHEL6
- RHEL7
- RHEL8
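A generated document such as the ones above could be checked for completeness before it is published. The sketch below assumes a set of required keys chosen from the fields that appear in these examples. The `MODULE_FIELDS` and `TEST_FIELDS` sets, the `missing_fields` helper, and the `SAMPLE` document are hypothetical and are not part of the proposed schema.

```python
import json

# Hypothetical required keys, picked from the fields used in the examples.
MODULE_FIELDS = {"brief", "modules", "daemons", "tiers", "tags", "component", "name", "tests"}
TEST_FIELDS = {"description", "parameters", "behaviour", "name"}


def missing_fields(doc):
    """Return the required fields that are absent from a generated document."""
    missing = sorted(MODULE_FIELDS - set(doc))
    for index, test in enumerate(doc.get("tests", [])):
        missing += [f"tests[{index}].{field}" for field in sorted(TEST_FIELDS - set(test))]
    return missing


# Deliberately incomplete sample document.
SAMPLE = json.loads("""
{
    "brief": "Example module",
    "modules": ["Vulnerability detector"],
    "daemons": ["wazuh-modulesd"],
    "tiers": [0],
    "tags": ["NVD"],
    "name": "test_example.py",
    "tests": [
        {"name": "test_something", "description": "Example test"}
    ]
}
""")

problems = missing_fields(SAMPLE)
```

Reporting the path of each missing field (for example `tests[0].behaviour`) makes it easier to locate the docstring that needs to be completed.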
`test_vulnerability_detector/test_general_settings/test_general_settings_ignore_time.py`
'''
brief: The tests will modify the value of `ignore_time` tag in `ossec.conf`, set different times
       and check the result in `ossec.log`.
copyright:
    Copyright (C) 2015-2021, Wazuh Inc.
    Created by Wazuh, Inc. <info@wazuh.com>.
    This program is free software; you can redistribute it and/or modify it under the terms of GPLv2
modules:
- vulnerability detector
daemons:
- wazuh-modulesd
os_platform:
- linux
os_vendor:
- redhat
- debian
- ubuntu
- alas
- arch-linux
os_version:
- rhel5
- rhel6
- rhel7
- rhel8
- buster
- stretch
- wheezy
- bionic
- xenial
- trusty
- amazon-linux-1
- amazon-linux-2
tiers:
- 0
tags:
- log_monitor
component:
- manager
'''
import os
from datetime import timedelta
import pytest
import wazuh_testing.vulnerability_detector as vd
from wazuh_testing.fim import check_time_travel
from wazuh_testing.tools import LOG_FILE_PATH
from wazuh_testing.tools.configuration import load_wazuh_configurations
from wazuh_testing.tools.monitoring import FileMonitor
from wazuh_testing.tools.services import control_service
from wazuh_testing.tools.time import time_to_seconds
# Marks
pytestmark = pytest.mark.tier(level=0)
# variables
test_path = os.path.dirname(os.path.realpath(__file__))
test_data_path = os.path.join(test_path, 'data')
nvd_feed_path = os.path.join(os.path.dirname(test_path), 'test_scan_results', 'data', vd.REAL_NVD_FEED)
configurations_path = os.path.join(test_data_path, 'wazuh_ignore_time.yaml')
wazuh_log_monitor = FileMonitor(LOG_FILE_PATH)
callback_string_vulnerability = f"'{vd.DEFAULT_PACKAGE_NAME}'.+is vulnerable to '{vd.DEFAULT_VULNERABILITY_ID}'"
parameters = [{'IGNORE_TIME': '3600s', 'INTERVAL': '5s', 'NVD_FEED_PATH': nvd_feed_path},
{'IGNORE_TIME': '60m', 'INTERVAL': '5s', 'NVD_FEED_PATH': nvd_feed_path},
{'IGNORE_TIME': '1h', 'INTERVAL': '5s', 'NVD_FEED_PATH': nvd_feed_path}]
metadata = [{'ignore_time': '3600s', 'timeout': 30, 'jumps': 2, 'interval': '5s'},
{'ignore_time': '60m', 'timeout': 30, 'jumps': 2, 'interval': '5s'},
{'ignore_time': '1h', 'timeout': 30, 'jumps': 2, 'interval': '5s'}]
# Configuration data
configurations = load_wazuh_configurations(configurations_path, __name__, params=parameters, metadata=metadata)
# fixtures
@pytest.fixture(scope='module', params=configurations)
def get_configuration(request):
"""
description:
Get configurations from the module.
parameters:
- request
"""
return request.param
@pytest.fixture(scope='module')
def prepare_agent(mock_agent):
control_service('stop', daemon='wazuh-db')
vd.clean_vd_tables(mock_agent)
vd.insert_package(agent=mock_agent, vendor="Red Hat, Inc.")
vd.insert_vulnerability()
control_service('start', daemon='wazuh-db')
yield mock_agent
def test_ignore_time(get_configuration, configure_environment, restart_modulesd, prepare_agent,
custom_callback_vulnerability=vd.make_vuln_callback(callback_string_vulnerability)):
"""
description:
Check if an alert is not fired during the ignore time interval
parameters:
- configure_environment:
type: fixture
brief: Configure a custom environment for testing.
- get_configuration:
type: fixture
brief: Get configurations from the module.
- restart_modulesd:
type: callable
brief: Restart modulesd daemon
- prepare_agent:
type: fixture
brief: setup and start an agent
- custom_callback_vulnerability:
type: str
brief: custon callback created
wazuh_min_version:
3.13
behaviour:
- Insert a custom vulnerability and vulnerable package.
- Check the initial vulnerability alert.
- Advance the time clock before the set time and check that the alert has not been generated.
- Advance the time clock just after the set time and check that the alert has been generated.
expected_behaviour:
- '{vd.DEFAULT_PACKAGE_NAME}'.+is vulnerable to '{vd.DEFAULT_VULNERABILITY_ID}'
- 'Alert did not appear at the start of the test'
- 'Alert did not appear at the end of the test'
"""
control_service('stop', daemon='wazuh-modulesd')
control_service('stop', daemon='wazuh-db')
vd.update_last_scan(agent=prepare_agent)
control_service('start', daemon='wazuh-db')
control_service('start', daemon='wazuh-modulesd')
ignore_time = get_configuration['metadata']['ignore_time']
jumps = get_configuration['metadata']['jumps']
seconds_to_travel = time_to_seconds(ignore_time) / jumps
# Check for initial alert
wazuh_log_monitor.start(timeout=get_configuration['metadata']['timeout'],
callback=custom_callback_vulnerability,
error_message='Alert did not appear at the start of the test')
# Check if alert does not appear during ignore time
for _ in range(1, jumps):
check_time_travel(time_travel=True, interval=timedelta(seconds=seconds_to_travel))
with pytest.raises(TimeoutError):
wazuh_log_monitor.start(timeout=get_configuration['metadata']['timeout'],
callback=custom_callback_vulnerability)
raise AttributeError('Alert appeared before ignore_time was finished')
# Travel to the time set in ignore time
check_time_travel(time_travel=True, interval=timedelta(seconds=seconds_to_travel))
# Check for final alert
wazuh_log_monitor.start(timeout=get_configuration['metadata']['timeout'],
callback=custom_callback_vulnerability,
error_message='Alert did not appear at the end of the test')
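The three metadata entries in this module (`'3600s'`, `'60m'`, `'1h'`) describe the same ignore window, and the test divides it into `jumps` equal time-travel steps. Below is a minimal sketch of that arithmetic. It assumes a hypothetical `to_seconds` helper; the framework's own `time_to_seconds` is not shown in this thread and may accept other formats.

```python
from datetime import timedelta

# Hypothetical stand-in for the framework's time_to_seconds helper.
_UNIT_SECONDS = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}


def to_seconds(value):
    """Convert strings such as '3600s', '60m' or '1h' into seconds."""
    number, unit = value[:-1], value[-1]
    if unit not in _UNIT_SECONDS or not number.isdigit():
        raise ValueError(f'Unsupported time value: {value!r}')
    return int(number) * _UNIT_SECONDS[unit]


def jump_interval(ignore_time, jumps):
    """Length of each time-travel jump used by the test loop."""
    return timedelta(seconds=to_seconds(ignore_time) / jumps)


metadata = [{'ignore_time': '3600s', 'jumps': 2},
            {'ignore_time': '60m', 'jumps': 2},
            {'ignore_time': '1h', 'jumps': 2}]

# Every configuration resolves to the same jump length.
intervals = [jump_interval(m['ignore_time'], m['jumps']) for m in metadata]
```

With `jumps` set to 2, the loop travels once inside the ignore window and the final jump lands on its end, which is where the last alert is expected.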
JSON: test_general_settings_ignore_time.json
{
"brief": "The tests will modify the value of `ignore_time` tag in `ossec.conf`, set different times and check the result in `ossec.log`.",
"category": "Integration",
"modules": [
"Vulnerability Detector"
],
"daemons": [
"wazuh-modulesd"
],
"platform": [
"RedHat",
"Canonical",
"Debian",
"Windows"
],
"version": [
"RHEL5",
"RHEL6",
"RHEL7",
"RHEL8",
"BUSTER",
"STRETCH",
"JESSIE",
"WHEEZY",
"BIONIC",
"XENIAL",
"TRUSTY",
"WIN10"
],
"tiers": [
0
],
"tags": [
"log_monitor"
],
"completeness": 100,
"component": "Manager",
"name": "test_general_settings_ignore_time.py",
"id": 5,
"group_id": 2,
"tests": [
{
"description": "Check if an alert is not fired during the ignore time interval",
"parameters": [
{
"configure_environment": {
"type": "fixture",
"brief": "Configure a custom environment for testing."
}
},
{
"get_configuration": {
"type": "fixture",
"brief": "Get configurations from the module."
}
},
{
"restart_modulesd": {
"type": "callable",
"brief": "Restart modulesd daemon"
}
},
{
"prepare_agent": {
"type": "fixture",
"brief": "setup and start an agent"
}
},
{
"custom_callback_vulnerability": {
"type": "str",
"brief": "custom callback created"
}
}
],
"assertions": [
"Vulnerabilities alerts are not generated before `ignore_time` time set.",
"Vulnerabilities alerts are generated after `ignore_time` time set."
],
"wazuh_min_version": "4.1.0",
"status": "working",
"behaviour": [
"Insert a custom vulnerability and vulnerable package.",
"Check the initial vulnerability alert.",
"Advance the time clock before the set time and check that the alert has not been generated.",
"Advance the time clock just after the set time and check that the alert has been generated."
],
"logging": [
{
"callback": [
"r\"{vd.DEFAULT_PACKAGE_NAME} is vulnerable to {vd.DEFAULT_VULNERABILITY_ID}\""
]
},
{
"error_message": [
"Alert did not appear at the start of the test",
"Alert did not appear at the end of the test"
]
}
],
"name": "test_ignore_time",
"test_cases": [
"get_configuration0",
"get_configuration1",
"get_configuration2"
]
}
]
}
YAML: test_general_settings_ignore_time.yaml
brief: The tests will modify the value of `ignore_time` tag in `ossec.conf`, set different
times and check the result in `ossec.log`.
category: Integration
completeness: 100
component: Manager
daemons:
- wazuh-modulesd
group_id: 2
id: 5
modules:
- Vulnerability Detector
name: test_general_settings_ignore_time.py
platform:
- RedHat
- Canonical
- Debian
- Windows
tags:
- log_monitor
tests:
- assertions:
- Vulnerabilities alerts are not generated before `ignore_time` time set.
- Vulnerabilities alerts are generated after `ignore_time` time set.
behaviour:
- Insert a custom vulnerability and vulnerable package.
- Check the initial vulnerability alert.
- Advance the time clock before the set time and check that the alert has not been
generated.
- Advance the time clock just after the set time and check that the alert has been
generated.
description: Check if an alert is not fired during the ignore time interval
logging:
- callback:
- r"{vd.DEFAULT_PACKAGE_NAME} is vulnerable to {vd.DEFAULT_VULNERABILITY_ID}"
- error_message:
- Alert did not appear at the start of the test
- Alert did not appear at the end of the test
name: test_ignore_time
parameters:
- configure_environment:
brief: Configure a custom environment for testing.
type: fixture
- get_configuration:
brief: Get configurations from the module.
type: fixture
- restart_modulesd:
brief: Restart modulesd daemon
type: callable
- prepare_agent:
brief: setup and start an agent
type: fixture
- custom_callback_vulnerability:
brief: custom callback created
type: str
status: working
test_cases:
- get_configuration0
- get_configuration1
- get_configuration2
wazuh_min_version: 4.1.0
tiers:
- 0
version:
- RHEL5
- RHEL6
- RHEL7
- RHEL8
- BUSTER
- STRETCH
- JESSIE
- WHEEZY
- BIONIC
- XENIAL
- TRUSTY
- WIN10
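Generated documents like the JSON and YAML above could be checked for missing fields before they are published. This is a minimal sketch, not the project's tooling: the issue does not say which fields are mandatory, so the two required-key sets and the `missing_fields` helper are assumptions made for illustration.

```python
import json

# Assumed required keys; the issue does not state which fields are mandatory.
MODULE_FIELDS = {'brief', 'category', 'modules', 'daemons', 'tiers',
                 'tags', 'component', 'name', 'tests'}
TEST_FIELDS = {'description', 'parameters', 'wazuh_min_version',
               'behaviour', 'name'}


def missing_fields(doc):
    """Map 'module' or a test name to the sorted list of keys it lacks."""
    problems = {}
    missing = MODULE_FIELDS - set(doc)
    if missing:
        problems['module'] = sorted(missing)
    for test in doc.get('tests', []):
        missing = TEST_FIELDS - set(test)
        if missing:
            problems[test.get('name', '<unnamed>')] = sorted(missing)
    return problems


# Trimmed-down document in which the test block lacks 'behaviour'.
sample = json.loads('''
{
  "brief": "Checks the ignore_time setting.",
  "category": "Integration",
  "modules": ["Vulnerability Detector"],
  "daemons": ["wazuh-modulesd"],
  "tiers": [0],
  "tags": ["log_monitor"],
  "component": "Manager",
  "name": "test_general_settings_ignore_time.py",
  "tests": [
    {
      "name": "test_ignore_time",
      "description": "Check if an alert is not fired during the ignore time interval",
      "parameters": [],
      "wazuh_min_version": "4.1.0"
    }
  ]
}
''')

report = missing_fields(sample)
```

Here `report` holds a single entry for `test_ignore_time` that lists `behaviour` as missing.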
`test_agentd/test_agentd_reconnection.py`
'''
brief: These tests will check if, during enrollment, the agent re-establishes communication with the manager
under different situations that interrupt it.
copyright:
Copyright (C) 2015-2021, Wazuh Inc.
Created by Wazuh, Inc. <[email protected]>.
This program is free software; you can redistribute it and/or modify it under the terms of GPLv2
modules:
- agentd
daemons:
- wazuh-agentd
os_platform:
- linux
- windows
- mac
os_vendor:
- redhat
- debian
- ubuntu
- alas
- macos
- arch-linux
os_version:
- rhel5
- rhel6
- rhel7
- rhel8
- buster
- stretch
- jessie
- wheezy
- bionic
- xenial
- trusty
- windows-10
- windows-8
- windows-7
- windows-server-2003
- windows-server-2012
- windows-server-2016
- macos-catalina
- macos-server
- amazon-linux-1
- amazon-linux-2
tiers:
- 0
tags:
- tcp
- udp
- authd
component:
- agent
'''
import os
import platform
from time import sleep
import pytest
from datetime import datetime, timedelta
from wazuh_testing.agent import CLIENT_KEYS_PATH, SERVER_CERT_PATH, SERVER_KEY_PATH
from wazuh_testing.tools import WAZUH_PATH, LOG_FILE_PATH
from wazuh_testing.tools.authd_sim import AuthdSimulator
from wazuh_testing.tools.configuration import load_wazuh_configurations
from wazuh_testing.tools.file import truncate_file
from wazuh_testing.tools.monitoring import QueueMonitor, FileMonitor
from wazuh_testing.tools.remoted_sim import RemotedSimulator
from wazuh_testing.tools.services import control_service
# Marks
pytestmark = [pytest.mark.linux, pytest.mark.win32, pytest.mark.tier(level=0), pytest.mark.agent]
test_data_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'data')
configurations_path = os.path.join(test_data_path, 'wazuh_conf.yaml')
params = [
{
'SERVER_ADDRESS': '127.0.0.1',
'REMOTED_PORT': 1514,
'PROTOCOL': 'tcp',
},
{
'SERVER_ADDRESS': '127.0.0.1',
'REMOTED_PORT': 1514,
'PROTOCOL': 'udp',
}
]
metadata = [
{'PROTOCOL': 'tcp'},
{'PROTOCOL': 'udp'}
]
config_ids = ['tcp', 'udp']
configurations = load_wazuh_configurations(configurations_path, __name__, params=params, metadata=metadata)
log_monitor_paths = []
receiver_sockets_params = []
monitored_sockets_params = []
receiver_sockets, monitored_sockets, log_monitors = None, None, None # Set in the fixtures
authd_server = AuthdSimulator(params[0]['SERVER_ADDRESS'], key_path=SERVER_KEY_PATH, cert_path=SERVER_CERT_PATH)
remoted_server = None
def teardown():
global remoted_server
if remoted_server is not None:
remoted_server.stop()
def set_debug_mode():
"""Set debug2 for agentd in local internal options file."""
if platform.system() == 'win32' or platform.system() == 'Windows':
local_int_conf_path = os.path.join(WAZUH_PATH, 'local_internal_options.conf')
debug_line = 'windows.debug=2\nagent.recv_timeout=5\n'
else:
local_int_conf_path = os.path.join(WAZUH_PATH, 'etc', 'local_internal_options.conf')
debug_line = 'agent.debug=2\nagent.recv_timeout=5\n'
with open(local_int_conf_path, 'r') as local_file_read:
lines = local_file_read.readlines()
for line in lines:
if line == debug_line:
return
with open(local_int_conf_path, 'a') as local_file_write:
local_file_write.write('\n' + debug_line)
set_debug_mode()
# fixtures
@pytest.fixture(scope="module", params=configurations, ids=config_ids)
def get_configuration(request):
"""Get configurations from the module"""
return request.param
@pytest.fixture(scope="function")
def configure_authd_server(request):
"""Initialize a simulated authd connection."""
authd_server.start()
global monitored_sockets
monitored_sockets = QueueMonitor(authd_server.queue)
authd_server.clear()
yield
authd_server.shutdown()
def start_authd():
"""Enable authd to accept connections and perform enrollments."""
authd_server.set_mode("ACCEPT")
authd_server.clear()
def stop_authd():
"""Prevent authd from accepting connections and performing enrollments."""
authd_server.set_mode("REJECT")
def set_authd_id():
"""Set agent id to 101 in the authd simulated connection."""
authd_server.agent_id = 101
def clean_keys():
"""Clear the agent's client.keys file."""
truncate_file(CLIENT_KEYS_PATH)
sleep(1)
def delete_keys():
"""Remove the agent's client.keys file."""
os.remove(CLIENT_KEYS_PATH)
sleep(1)
def set_keys():
"""Write to client.keys file the agent's enrollment details."""
with open(CLIENT_KEYS_PATH, 'w+') as f:
f.write("100 ubuntu-agent any TopSecret")
sleep(1)
def wait_notify(line):
"""Callback function to wait for agent checkins to the manager."""
if 'Sending keep alive:' in line:
return line
return None
def wait_enrollment(line):
"""Callback function to wait for enrollment."""
if 'Valid key received' in line:
return line
return None
def wait_enrollment_try(line):
"""Callback function to wait for enrollment attempt."""
if 'Requesting a key' in line:
return line
return None
def search_error_messages():
"""Retrieve the line of the log file where first error is found.
Returns:
str: String where the error is found or None if errors are not found.
"""
with open(LOG_FILE_PATH, 'r') as log_file:
lines = log_file.readlines()
for line in lines:
if "ERROR:" in line:
return line
return None
# Tests
"""
This test covers the scenario of Agent starting with keys,
when it loses communication with Remoted and a new enrollment is sent to Authd.
"""
def test_agentd_reconection_enrollment_with_keys(configure_authd_server, configure_environment, get_configuration):
"""
description:
Check how the agent behaves when losing communication with remoted and a new enrollment is sent to authd.
In this case, the agent starts with keys.
parameters:
- configure_authd_server:
type: fixture
brief: Initialize a simulated authd connection.
- start_authd:
type: fixture
brief: Enable authd to accept connections and perform enrollments.
- set_authd_id:
type: fixture
brief: Set agent id to 101 in the authd simulated connection.
- set_keys:
type: fixture
brief: Write to client.keys file the agent's enrollment details.
- configure_environment:
type: fixture
brief: Configure a custom environment for testing.
- get_configuration:
type: fixture
brief: Get configurations from the module.
wazuh_min_version:
4.1
behaviour:
- Verify that `wazuh-agentd` is communicating with `wazuh-remoted` (if the agent is already enrolled)
- Configure `wazuh-remoted` to reject this connection
- Verify that the agent is enrolled again.
expected_behaviour:
- "Sending keep alive"
- "Valid key received"
- "Notify message from agent was never sent!"
- "Agent never enrolled after rejecting connection!"
- "Notify message from agent was never sent!"
- "Incorrect Secure Message"
"""
global remoted_server
remoted_server = RemotedSimulator(protocol=get_configuration['metadata']['PROTOCOL'], mode='CONTROLLED_ACK',
client_keys=CLIENT_KEYS_PATH)
# Stop target Agent
control_service('stop')
# Prepare test
start_authd()
set_authd_id()
set_keys()
# Clean logs
truncate_file(LOG_FILE_PATH)
# Start target Agent
control_service('start')
# Start hearing logs
truncate_file(LOG_FILE_PATH)
log_monitor = FileMonitor(LOG_FILE_PATH)
# hearing on enrollment server
authd_server.clear()
# Wait until Agent is notifying Manager
log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!")
# Start rejecting Agent
remoted_server.set_mode('REJECT')
# hearing on enrollment server
authd_server.clear()
# Wait until Agent asks a new key to enrollment
log_monitor.start(timeout=180, callback=wait_enrollment,
error_message="Agent never enrolled after rejecting connection!")
# Start responding to Agent
remoted_server.set_mode('CONTROLLED_ACK')
# Wait until Agent is notifying Manager
log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!")
assert "aes" in remoted_server.last_message_ctx, "Incorrect Secure Message"
"""
This test covers the scenario of Agent starting without client.keys file
and an enrollment is sent to Authd to start communicating with Remoted
"""
def test_agentd_reconection_enrollment_no_keys_file(configure_authd_server, configure_environment, get_configuration):
"""
description:
Check how the agent behaves when losing communication with remoted and a new enrollment is sent to authd.
In this case, the agent doesn't have client.keys file.
parameters:
- configure_authd_server:
type: fixture
brief: Initialize a simulated authd connection.
- start_authd:
type: fixture
brief: Enable authd to accept connections and perform enrollments.
- set_authd_id:
type: fixture
brief: Set agent id to 101 in the authd simulated connection.
- delete_keys:
type: fixture
brief: Remove the agent's client.keys file.
- configure_environment:
type: fixture
brief: Configure a custom environment for testing.
- get_configuration:
type: fixture
brief: Get configurations from the module.
wazuh_min_version:
4.1
behaviour:
- Verify that `wazuh-agentd` is communicating with `wazuh-remoted` (if the agent is already enrolled)
- Configure `wazuh-remoted` to reject this connection
- Verify that the agent is enrolled again.
expected_behaviour:
- "Sending keep alive"
- "Valid key received"
- "Agent never enrolled for the first time."
- "Agent never enrolled after rejecting connection!"
- "Notify message from agent was never sent!"
- "Incorrect Secure Message"
"""
global remoted_server
remoted_server = RemotedSimulator(protocol=get_configuration['metadata']['PROTOCOL'], mode='CONTROLLED_ACK',
client_keys=CLIENT_KEYS_PATH)
# Stop target Agent
control_service('stop')
# Clean logs
truncate_file(LOG_FILE_PATH)
# Prepare test
start_authd()
set_authd_id()
delete_keys()
# Start target Agent
control_service('start')
# start hearing logs
log_monitor = FileMonitor(LOG_FILE_PATH)
# hearing on enrollment server
authd_server.clear()
# Wait until Agent asks keys for the first time
log_monitor.start(timeout=50, callback=wait_enrollment,
error_message="Agent never enrolled for the first time.")
# Wait until Agent is notifying Manager
log_monitor.start(timeout=50, callback=wait_notify, error_message="Notify message from agent was never sent!")
# Start rejecting Agent
remoted_server.set_mode('REJECT')
# hearing on enrollment server
authd_server.clear()
# Wait until Agent asks a new key to enrollment
log_monitor.start(timeout=180, callback=wait_enrollment,
error_message="Agent never enrolled after rejecting connection!")
# Start responding to Agent
remoted_server.set_mode('CONTROLLED_ACK')
# Wait until Agent is notifying Manager
log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!")
assert "aes" in remoted_server.last_message_ctx, "Incorrect Secure Message"
"""
This test covers the scenario of Agent starting without keys in client.keys file
and an enrollment is sent to Authd to start communicating with Remoted
"""
def test_agentd_reconection_enrollment_no_keys(configure_authd_server, configure_environment, get_configuration):
"""
description:
Check how the agent behaves when losing communication with remoted and a new enrollment is sent to authd.
In this case, the agent has its client.keys file empty.
parameters:
- configure_authd_server:
type: fixture
brief: Initialize a simulated authd connection.
- start_authd:
type: fixture
brief: Enable authd to accept connections and perform enrollments.
- set_authd_id:
type: fixture
brief: Set agent id to 101 in the authd simulated connection.
- clean_keys:
type: fixture
brief: Clear the agent's client.keys file.
- configure_environment:
type: fixture
brief: Configure a custom environment for testing.
- get_configuration:
type: fixture
brief: Get configurations from the module.
wazuh_min_version:
4.1
behaviour:
- Verify that `wazuh-agentd` is communicating with `wazuh-remoted` (if the agent is already enrolled)
- Configure `wazuh-remoted` to reject this connection
- Verify that the agent is enrolled again.
expected_behaviour:
- "Sending keep alive"
- "Valid key received"
- "Agent never enrolled for the first time."
- "Agent never enrolled after rejecting connection!"
- "Notify message from agent was never sent!"
- "Incorrect Secure Message"
"""
global remoted_server
remoted_server = RemotedSimulator(protocol=get_configuration['metadata']['PROTOCOL'], mode='CONTROLLED_ACK',
client_keys=CLIENT_KEYS_PATH)
# Stop target Agent
control_service('stop')
# Clean logs
truncate_file(LOG_FILE_PATH)
# Prepare test
start_authd()
set_authd_id()
clean_keys()
# Start target Agent
control_service('start')
# start hearing logs
log_monitor = FileMonitor(LOG_FILE_PATH)
# hearing on enrollment server
authd_server.clear()
# Wait until Agent asks keys for the first time
log_monitor.start(timeout=120, callback=wait_enrollment,
error_message="Agent never enrolled for the first time rejecting connection!")
# Wait until Agent is notifying Manager
log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!")
assert "aes" in remoted_server.last_message_ctx, "Incorrect Secure Message"
# Start rejecting Agent
remoted_server.set_mode('REJECT')
# hearing on enrollment server
authd_server.clear()
# Wait until Agent asks a new key to enrollment
log_monitor.start(timeout=180, callback=wait_enrollment,
error_message="Agent never enrolled after rejecting connection!")
# Start responding to Agent
remoted_server.set_mode('CONTROLLED_ACK')
# Wait until Agent is notifying Manager
log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!")
assert "aes" in remoted_server.last_message_ctx, "Incorrect Secure Message"
"""
This test covers and checks the scenario of Agent starting without keys
and multiple retries are required until the new key is obtained to start communicating with Remoted
"""
def test_agentd_initial_enrollment_retries(configure_authd_server, configure_environment, get_configuration):
"""
description:
Check how the agent behaves when it makes multiple enrollment attempts before getting its key.
For this, the agent starts without keys and performs multiple enrollment requests
to authd before getting the new key to communicate with remoted.
parameters:
- configure_authd_server:
type: fixture
brief: Initialize a simulated authd connection.
- stop_authd:
type: fixture
brief: Prevent authd from accepting connections and performing enrollments.
- set_authd_id:
type: fixture
brief: Set agent id to 101 in the authd simulated connection.
- clean_keys:
type: fixture
brief: Clear the agent's client.keys file.
- configure_environment:
type: fixture
brief: Configure a custom environment for testing.
- get_configuration:
type: fixture
brief: Get configurations from the module.
wazuh_min_version:
4.1
behaviour:
- Verify that `wazuh-agentd` is communicating with `wazuh-remoted` (if the agent is already enrolled)
- Configure `wazuh-remoted` to reject this connection
- Verify that the agent is enrolled again.
expected_behaviour:
- "Requesting a key"
- "Sending keep alive"
- "Valid key received"
- "Enrollment retry was not sent!"
- "Retries too quick"
- "No succesful enrollment after reties!"
- "Notify message from agent was never sent!"
- "A Wazuh module stopped because of Agentd initialization!"
"""
global remoted_server
remoted_server = RemotedSimulator(protocol=get_configuration['metadata']['PROTOCOL'], mode='CONTROLLED_ACK',
client_keys=CLIENT_KEYS_PATH)
# Stop target Agent
control_service('stop')
# Clean logs
truncate_file(LOG_FILE_PATH)
# Prepare test
stop_authd()
set_authd_id()
clean_keys()
# Start whole Agent service to check other daemons status after initialization
control_service('start')
# Start hearing logs
log_monitor = FileMonitor(LOG_FILE_PATH)
start_time = datetime.now()
# Check for unsuccessful enrollment retries in Agentd initialization
retries = 0
while retries < 4:
retries += 1
log_monitor.start(timeout=retries * 5 + 20, callback=wait_enrollment_try,
error_message="Enrollment retry was not sent!")
stop_time = datetime.now()
expected_time = start_time + timedelta(seconds=retries * 5 - 2)
# Check if delay was applied
assert stop_time > expected_time, "Retries too quick"
# Enable authd
authd_server.clear()
authd_server.set_mode("ACCEPT")
# Wait for successful enrollment
log_monitor.start(timeout=70, callback=wait_enrollment, error_message="No succesful enrollment after reties!")
# Wait until Agent is notifying Manager
log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!")
# Check if no Wazuh module stopped due to Agentd Initialization
with open(LOG_FILE_PATH) as log_file:
log_lines = log_file.read().splitlines()
for line in log_lines:
if "Unable to access queue:" in line:
raise AssertionError("A Wazuh module stopped because of Agentd initialization!")
"""
This test covers and checks the scenario of Agent starting with keys but Remoted is not reachable during some seconds
and multiple connection retries are required prior to requesting a new enrollment
"""
def test_agentd_connection_retries_pre_enrollment(configure_authd_server, configure_environment, get_configuration):
"""
description:
Check how the agent behaves when Remoted is not available and performs multiple connection attempts to it.
For this, the agent starts with keys but Remoted is not available for several seconds,
then the agent performs multiple connection retries before requesting a new enrollment.
parameters:
- configure_authd_server:
type: fixture
brief: Initialize a simulated authd connection.
- stop_authd:
type: fixture
brief: Prevent authd from accepting connections and performing enrollments.
- set_keys:
type: fixture
brief: Write to client.keys file the agent's enrollment details.
- configure_environment:
type: fixture
brief: Configure a custom environment for testing.
- get_configuration:
type: fixture
brief: Get configurations from the module.
wazuh_min_version:
4.1
behaviour:
- Verify that `wazuh-agentd` is communicating with `wazuh-remoted` (if the agent is already enrolled)
- Configure `wazuh-remoted` to reject this connection
- Verify that the agent is enrolled again.
expected_behaviour:
- "Sending keep alive"
- "Notify message from agent was never sent!"
"""
global remoted_server
REMOTED_KEYS_SYNC_TIME = 10
# Start Remoted mock
remoted_server = RemotedSimulator(protocol=get_configuration['metadata']['PROTOCOL'], client_keys=CLIENT_KEYS_PATH)
# Stop target Agent
control_service('stop')
# Clean logs
truncate_file(LOG_FILE_PATH)
# Prepare test
stop_authd()
set_keys()
# Start hearing logs
log_monitor = FileMonitor(LOG_FILE_PATH)
# Start whole Agent service to check other daemons status after initialization
control_service('start')
# Simulate the time Remoted needs to synchronize keys by waiting before it starts responding
remoted_server.set_mode('CONTROLLED_ACK')
sleep(REMOTED_KEYS_SYNC_TIME)
# Check Agentd is finally communicating
log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!")
`test_remoted/test_agent_communication/test_request_agent_info.py`
'''
brief:
Check that manager-agent communication through remoted socket works as expected.
copyright:
Copyright (C) 2015-2021, Wazuh Inc.
Created by Wazuh, Inc. <[email protected]>.
This program is free software; you can redistribute it and/or modify it under the terms of GPLv2
modules:
- remoted
daemons:
- wazuh-remoted
os_platform:
- linux
os_vendor:
- redhat
- debian
- ubuntu
- alas
- arch-linux
os_version:
- rhel5
- rhel6
- rhel7
- rhel8
- buster
- stretch
- wheezy
- bionic
- xenial
- trusty
- amazon-linux-1
- amazon-linux-2
tiers:
- 0
tags:
- tcp
- udp
- authd
component:
- manager
'''
import os
import time
import pytest
import wazuh_testing.tools.agent_simulator as ag
from wazuh_testing.tools.configuration import load_wazuh_configurations
from wazuh_testing.tools.sockets import send_request
# Marks
pytestmark = pytest.mark.tier(level=0)
# Configuration
test_data_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'data')
configurations_path = os.path.join(test_data_path, 'wazuh_request_agent_info.yaml')
parameters = [
{'PROTOCOL': 'udp,tcp'},
{'PROTOCOL': 'tcp'},
{'PROTOCOL': 'udp'},
]
metadata = [
{'PROTOCOL': 'udp,tcp'},
{'PROTOCOL': 'tcp'},
{'PROTOCOL': 'udp'},
]
# test cases
test_case = {
'disconnected': ('agent getconfig disconnected',
'Cannot send request'),
'get_config': ('agent getconfig client',
'{"client":{"config-profile":"centos8","notify_time":10,"time-reconnect":60}}'),
'get_state': ('logcollector getstate',
'{"error":0,"data":{"global":{"start":"2021-02-26, 06:41:26","end":"2021-02-26 08:49:19"}}}')
}
configurations = load_wazuh_configurations(configurations_path, __name__, params=parameters, metadata=metadata)
config_ids = [x['PROTOCOL'] for x in parameters]
# Utils
manager_address = "localhost"
# fixtures
@pytest.fixture(scope="module", params=configurations, ids=config_ids)
def get_configuration(request):
"""Get configurations from the module."""
return request.param
@pytest.mark.parametrize("command_request,expected_answer", test_case.values(), ids=list(test_case.keys()))
def test_request(get_configuration, configure_environment, remove_shared_files,
restart_remoted, command_request, expected_answer):
"""
description:
Writes (config/state) requests in $DIR/queue/ossec/request and checks if remoted forwards it to the agent,
collects the response, and writes it in the socket or returns an error message if the queried
agent is disconnected.
parameters:
- remove_shared_files:
type: fixture
brief: Temporarily removes txt files from default agent group shared files
- restart_remoted:
type: fixture
brief: Reset ossec.log and start a new monitor
- configure_environment:
type: fixture
brief: Configure a custom environment for testing.
- get_configuration:
type: fixture
brief: Get configurations from the module.
wazuh_min_version:
4.2
behaviour:
- Test getconfig request
- Test getstate request
- Test getconfig request for a disconnected agent
expected_behaviour:
- "Remoted unexpected answer"
"""
cfg = get_configuration['metadata']
protocols = cfg['PROTOCOL'].split(',')
agents = [ag.Agent(manager_address, "aes", os="debian8", version="4.2.0") for _ in range(len(protocols))]
for agent, protocol in zip(agents, protocols):
if "disconnected" not in command_request:
sender, injector = ag.connect(agent, manager_address, protocol)
msg_request = f'{agent.id} {command_request}'
response = send_request(msg_request)
assert expected_answer in response, "Remoted unexpected answer"
if "disconnected" not in command_request:
injector.stop_receive()
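The body of `test_request` splits the `PROTOCOL` value on commas, sends one request per protocol, and only opens an agent connection when the case is not the "disconnected" one. The sketch below isolates that decision so it can be read without the simulator. `plan_requests` is a hypothetical helper introduced for illustration; it does not exist in the test module.

```python
def plan_requests(protocol_setting, command_request):
    """Return (protocol, connect_first) pairs for one test case.

    connect_first is False when the command targets a disconnected agent,
    mirroring the "disconnected" check in the test body.
    """
    connect_first = 'disconnected' not in command_request
    return [(protocol, connect_first)
            for protocol in protocol_setting.split(',')]


both = plan_requests('udp,tcp', 'agent getconfig client')
offline = plan_requests('tcp', 'agent getconfig disconnected')
```

`both` yields one connected request for each of `udp` and `tcp`, while `offline` yields a single `tcp` request with no prior connection.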
test_request_agent_info.json
{
"brief": "Check that manager-agent communication through remoted socket works as expected.",
"category": "Integration",
"modules": [
"Remoted"
],
"daemons": [
"wazuh-remoted"
],
"platform": [
"RedHat",
"Canonical",
"Debian"
],
"tiers": [
0
],
"tags": [
"tcp",
"udp",
"authd"
],
"completeness": 100,
"component": [
"manager"
],
"name": "test_request_agent_info.py",
"id": 67,
"group_id": 47,
"tests": [
{
"description": "Writes (config/state) requests in $DIR/queue/ossec/request and checks if remoted forwards it to the agent, collects the response, and writes it in the socket or returns an error message if the queried agent is disconnected.",
"parameters": [
{
"remove_shared_files": {
"type": "fixture",
"brief": "Temporarily removes txt files from default agent group shared files"
}
},
{
"restart_remoted": {
"type": "fixture",
"brief": "Reset ossec.log and start a new monitor"
}
},
{
"configure_environment": {
"type": "fixture",
"brief": "Configure a custom environment for testing."
}
},
{
"get_configuration": {
"type": "fixture",
"brief": "Get configurations from the module."
}
}
],
"assertions": [
"The agent has keys and loses communication with RemoteD",
"The agent does not have keys and loses communication with RemoteD when enrollment has been started",
"The agent does not have `client.keys` file and loses communication with RemoteD when enrollment has been started",
"The agent has keys, loses communication with RemoteD, and performs multiple enrollment requests",
"The agent does not have keys, RemoteD is unavailable for several seconds and multiple connection requests are performed before a new enrollment is made"
],
"wazuh_min_version": "4.1.0",
"status": "working",
"behaviour": [
"Test getconfig request",
"Test getstate request",
"Test getconfig request for a disconnected agent"
],
"logging": [
{
"error_message": [
"Remoted unexpected answer"
]
}
],
"name": "test_request",
"test_cases": [
"udp,tcp-disconnected",
"udp,tcp-get_config",
"udp,tcp-get_state",
"tcp-disconnected",
"tcp-get_config",
"tcp-get_state",
"udp-disconnected",
"udp-get_config",
"udp-get_state"
]
}
]
}
test_request_agent_info.yaml
brief: Check that manager-agent communication through remoted socket works as expected.
category: Integration
completeness: 100
component:
- manager
daemons:
- wazuh-remoted
group_id: 47
id: 67
modules:
- Remoted
name: test_request_agent_info.py
platform:
- RedHat
- Canonical
- Debian
tags:
- tcp
- udp
- authd
tests:
- assertions:
- The agent has keys and loses communication with RemoteD
- The agent does not have keys and loses communication with RemoteD when enrollment
has been started
- The agent does not have `client.keys` file and loses communication with RemoteD
when enrollment has been started
- The agent has keys, loses communication with RemoteD, and performs multiple enrollment
requests
- The agent does not have keys, RemoteD is unavailable for several seconds and multiple
connection requests are performed before a new enrollment is made
behaviour:
- Test getconfig request
- Test getstate request
- Test getconfig request for a disconnected agent
description: Writes (config/state) requests in $DIR/queue/ossec/request and checks
if remoted forwards it to the agent, collects the response, and writes it in the
socket or returns an error message if the queried agent is disconnected.
logging:
- error_message:
- Remoted unexpected answer
name: test_request
parameters:
- remove_shared_files:
brief: Temporarily removes txt files from default agent group shared files
type: fixture
- restart_remoted:
brief: Reset ossec.log and start a new monitor
type: fixture
- configure_environment:
brief: Configure a custom environment for testing.
type: fixture
- get_configuration:
brief: Get configurations from the module.
type: fixture
status: working
test_cases:
- udp,tcp-disconnected
- udp,tcp-get_config
- udp,tcp-get_state
- tcp-disconnected
- tcp-get_config
- tcp-get_state
- udp-disconnected
- udp-get_config
- udp-get_state
wazuh_min_version: 4.1.0
tiers:
- 0
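The nine entries under `test_cases` combine the fixture ids (`config_ids`) with the ids passed to `pytest.mark.parametrize`. The sketch below reproduces that list as a plain cross product. It mirrors the order shown in the generated document and is not a description of pytest's internal id algorithm.

```python
from itertools import product

# Ids taken from the test module: fixture ids first, then parametrize ids.
config_ids = ['udp,tcp', 'tcp', 'udp']
case_ids = ['disconnected', 'get_config', 'get_state']

# Each documented test case is "<config id>-<case id>".
test_cases = [f'{config}-{case}'
              for config, case in product(config_ids, case_ids)]
```

A documentation generator could compare a list built this way with the ids pytest actually collects, to detect cases that were added to the module but never documented.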
Schema changes
Ideas
2021-08-25
QA Docs schema 2.0 proposal 2
Module block
Test block
Sample documentation
'''
copyright:
Copyright (C) 2015-2021, Wazuh Inc.
Created by Wazuh, Inc. <[email protected]>.
This program is free software; you can redistribute it and/or modify it under the terms of GPLv2
type:
Integration
description:
These tests will mock RedHat, Canonical, Debian, and Windows systems, and insert custom vulnerabilities
and vulnerable packages to check if Vulnerability Detector generates the vulnerability alerts from NVD feed.
tiers:
- 0
component:
Server
platform:
- Linux, RHEL5
- Linux, RHEL6
- Linux, RHEL7
- Linux, RHEL8
- Linux, Amazon Linux 1
- Linux, Amazon Linux 2
- Linux, Debian BUSTER
- Linux, Debian STRETCH
- Linux, Debian WHEEZY
- Linux, Ubuntu BIONIC
- Linux, Ubuntu XENIAL
- Linux, Ubuntu TRUSTY
- Linux, Arch Linux
checks:
- There are as many NVD alerts as vulnerable packages.
- There are 0 NVD vulnerability alerts for RedHat provider.
- The alerts are produced by the NVD provider.
references:
- https://documentation.wazuh.com/current/user-manual/capabilities/vulnerability-detection/index.html
tags:
- linux
- modulesd
- vulnerability_detector
'''
import os
from datetime import timedelta
from shutil import copy
import pytest
import wazuh_testing.vulnerability_detector as vd
from wazuh_testing.fim import check_time_travel
from wazuh_testing.tools import LOG_FILE_PATH
from wazuh_testing.tools import file
from wazuh_testing.tools.configuration import load_wazuh_configurations
from wazuh_testing.tools.monitoring import FileMonitor
# Marks
pytestmark = pytest.mark.tier(level=0)
# Variables
current_test_path = os.path.dirname(os.path.realpath(__file__))
test_data_path = os.path.join(current_test_path, 'data')
configurations_path = os.path.join(test_data_path, 'wazuh_nvd_configuration.yaml')
vulnerabilities_data_path = os.path.join(test_data_path, vd.VULNERABILITIES)
custom_cpe_helper_data_path = os.path.join(test_data_path, vd.CUSTOM_CPE_HELPER)
custom_msu_data_path = os.path.join(test_data_path, vd.CUSTOM_MSU)
wazuh_log_monitor = FileMonitor(LOG_FILE_PATH)
SCAN_TIMEOUT = 40
copy(custom_cpe_helper_data_path, vd.CPE_HELPER_PATH)
# Set configuration
parameters = [{'NVD_JSON_PATH': os.path.join(test_data_path, vd.CUSTOM_NVD_FEED),
'MSU_JSON_PATH': custom_msu_data_path}]
ids = ['scan_nvd_configuration']
# Read JSON data template
nvd_vulnerabilities = file.read_json_file(vulnerabilities_data_path)
system_data = [
{"target": "WINDOWS10", "os_name": "Microsoft Windows Server 2016 Datacenter Evaluation",
"os_major": "10", "os_minor": "0", "name": "windows", "format": "win",
"vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']),
"os_platform": "windows", "version": "Wazuh v4.1"},
{"target": "RHEL8", "os_name": "CentOS Linux", "os_major": "8", "os_minor": "1", "name": "centos8",
"format": "rpm", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']),
"os_platform": "centos", "version": "Wazuh v4.1"},
{"target": "RHEL7", "os_name": "CentOS Linux", "os_major": "7", "os_minor": "8", "name": "centos7",
"format": "rpm", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']),
"os_platform": "centos", "version": "Wazuh v4.1"},
{"target": "RHEL6", "os_name": "CentOS Linux", "os_major": "6", "os_minor": "10", "name": "centos6",
"format": "rpm", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']),
"os_platform": "centos", "version": "Wazuh v4.1"},
{"target": "RHEL5", "os_name": "CentOS Linux", "os_major": "5", "os_minor": "11", "name": "centos5",
"format": "rpm", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']),
"os_platform": "centos", "version": "Wazuh v4.1"},
{"target": "BIONIC", "os_name": "Ubuntu", "os_major": "18", "os_minor": "04", "name": "Ubuntu-bionic",
"format": "deb", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']),
"os_platform": "ubuntu", "version": "Wazuh v4.1"},
{"target": "XENIAL", "os_name": "Ubuntu", "os_major": "16", "os_minor": "04", "name": "Ubuntu-xenial",
"format": "deb", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']),
"os_platform": "ubuntu", "version": "Wazuh v4.1"},
{"target": "TRUSTY", "os_name": "Ubuntu", "os_major": "14", "os_minor": "04", "name": "Ubuntu-trusty",
"format": "deb", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']),
"os_platform": "ubuntu", "version": "Wazuh v4.1"},
{"target": "BUSTER", "os_name": "Debian GNU/Linux", "os_major": "10", "os_minor": "", "name": "debian10",
"format": "deb", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']),
"os_platform": "debian", "version": "Wazuh v4.1"},
{"target": "STRETCH", "os_name": "Debian GNU/Linux", "os_major": "9", "os_minor": "", "name": "debian9",
"format": "deb", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']),
"os_platform": "debian", "version": "Wazuh v4.1"},
{"target": "MAC", "os_name": "Mac OS X", "os_major": "10", "os_minor": "15", "name": "macos-catalina",
"format": "pkg", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']),
"os_platform": "darwin", "version": "Wazuh v4.1"},
{"target": "MAC", "os_name": "Mac OS X", "os_major": "10", "os_minor": "15", "name": "macos-catalina",
"format": "pkg", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']),
"os_platform": "darwin", "version": "Wazuh v4.0"},
{"target": "MAC", "os_name": "Mac OS X Server", "os_major": "5", "os_minor": "10", "name": "macos-server",
"format": "pkg", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']),
"os_platform": "darwin", "version": "Wazuh v4.1"}
]
system_data_ids = [system['target'] for system in system_data]
# Configuration data
configurations = load_wazuh_configurations(configurations_path, __name__, params=parameters)
# Fixtures
@pytest.fixture(scope='module', params=configurations, ids=ids)
def get_configuration(request):
"""Get configurations from the module."""
return request.param
@pytest.fixture(scope='module', params=system_data, ids=system_data_ids)
@vd.mock_cve_db
def mock_vulnerability_scan(request, mock_agent):
"""
Mock the vulnerability scan by inserting custom packages and feeds and changing the host system.
"""
# Mock system
vd.modify_system(agent_id=mock_agent, os_name=request.param['os_name'], os_major=request.param['os_major'],
os_minor=request.param['os_minor'], name=vd.MOCKED_AGENT_NAME,
os_platform=request.param['os_platform'], version=request.param['version'])
# Insert a vulnerability in table VULNERABILITIES
vd.insert_vulnerability(cveid='CWE-000', operation='less than', operation_value='1.0.0',
package='test', target=request.param['target'])
# Add custom vulnerabilities and feeds
for vulnerability in nvd_vulnerabilities['vulnerabilities_nvd']:
vd.insert_package(**vulnerability['package'], source=vulnerability['package']['name'],
format=request.param['format'], agent=mock_agent)
def test_vulnerabilities_report(get_configuration, configure_environment, restart_modulesd, check_cve_db,
mock_vulnerability_scan):
'''
description:
Check if inserted vulnerable packages are reported by vulnerability detector using the NVD feed.
tier:
0
min_version:
4.1
parameters:
- get_configuration (fixture), Get configurations from the module.
- configure_environment (fixture), Configure a custom environment for testing.
- restart_modulesd (fixture), Restart the wazuh-modulesd daemon.
- check_cve_db (fixture), Check if the CVE database exists and its tables are created
- mock_vulnerability_scan (fixture), Mock the vulnerability scan inserting custom packages, feeds and changing the host system.
use_cases:
Several vulnerable packages with their respective vulnerabilities are inserted into a simulated agent
to generate the corresponding alerts from NVD feed.
expected_output:
- "Agent '000' is vulnerable to '{cve}'. Condition: '{patch} patch is not installed.'" (if agent OS is Windows).
- "The '{package}' package .* from agent .* is vulnerable to '{cve}'" (for no Windows agents).
- "The NVD found a total of '{vulnerabilities_number}' potential vulnerabilities for agent .*"
tags:
- nvd
- cve
'''
vulnerabilities_number = mock_vulnerability_scan["vulnerabilities_number"]
if mock_vulnerability_scan['format'] == 'pkg' and mock_vulnerability_scan['version'] == 'Wazuh v4.0':
version = mock_vulnerability_scan['version']
wazuh_log_monitor.start(
timeout=SCAN_TIMEOUT,
update_position=False,
callback=vd.make_vuln_callback(fr"Agent .* has an unsupported Wazuh version: '{version}'"),
error_message="The expected event 'Agent .* has an unsupported Wazuh version' not found"
)
return
# Check the vulnerabilities of inserted packages
try:
for item in nvd_vulnerabilities['vulnerabilities_nvd']:
vd.check_vulnerability_scan_event(wazuh_log_monitor, item['package']['name'], item['cve']['cveid'])
except TimeoutError:
check_time_travel(time_travel=True, interval=timedelta(seconds=300))
for item in nvd_vulnerabilities['vulnerabilities_nvd']:
vd.check_vulnerability_scan_event(wazuh_log_monitor, item['package']['name'], item['cve']['cveid'])
# Check that the number of NVD vulnerabilities matches the expected value
if mock_vulnerability_scan["format"] != "win":
vd.check_detected_vulnerabilities_number(wazuh_log_monitor=wazuh_log_monitor,
expected_vulnerabilities_number=vulnerabilities_number,
feed_source='NVD', timeout=vd.VULN_DETECTOR_SCAN_TIMEOUT)
vd.check_if_modulesd_is_running()
test_scan_nvd_feed.json
{
"copyright": "Copyright (C) 2015-2021, Wazuh Inc.\nCreated by Wazuh, Inc. <info@wazuh.com>.\nThis program is free software; you can redistribute it and/or modify it under the terms of GPLv2",
"type": "Integration",
"description": "These tests will mock RedHat, Canonical, Debian, and Windows systems, and insert custom vulnerabilities and vulnerable packages to check if Vulnerability Detector generates the vulnerability alerts from NVD feed.",
"tiers": [
0
],
"component": "Server",
"platform": [
"Linux, RHEL5",
"Linux, RHEL6",
"Linux, RHEL7",
"Linux, RHEL8",
"Linux, Amazon Linux 1",
"Linux, Amazon Linux 2",
"Linux, Debian BUSTER",
"Linux, Debian STRETCH",
"Linux, Debian WHEEZY",
"Linux, Ubuntu BIONIC",
"Linux, Ubuntu XENIAL",
"Linux, Ubuntu TRUSTY",
"Linux, Arch Linux"
],
"checks": [
"There are as many NVD alerts as vulnerable packages.",
"There are 0 NVD vulnerability alerts for RedHat provider.",
"The alerts are produced by the NVD provider."
],
"references": [
"https://documentation.wazuh.com/current/user-manual/capabilities/vulnerability-detection/index.html"
],
"tags": [
"linux",
"modulesd",
"vulnerability_detector"
],
"name": "test_scan_nvd_feed.py",
"id": 16,
"group_id": 2,
"tests": [
{
"description": "Check if inserted vulnerable packages are reported by vulnerability detector using the NVD feed.",
"tier": 0,
"min_version": 4.1,
"parameters": [
"get_configuration (fixture), Get configurations from the module.",
"configure_environment (fixture), Configure a custom environment for testing.",
"restart_modulesd (fixture), Restart the wazuh-modulesd daemon.",
"check_cve_db (fixture), Check if the CVE database exists and its tables are created",
"mock_vulnerability_scan (fixture), Mock the vulnerability scan inserting custom packages, feeds and changing the host system."
],
"use_cases": "Several vulnerable packages with their respective vulnerabilities are inserted into a simulated agent to generate the corresponding alerts from NVD feed.",
"expected_output": [
"r\"Agent '000' is vulnerable to '{cve}'. Condition, '{patch} patch is not installed.'\" (if agent OS is Windows).",
"r\"The '{package}' package .* from agent .* is vulnerable to '{cve}'\" (for no Windows agents).",
"r\"The NVD found a total of '{vulnerabilities_number}' potential vulnerabilities for agent .*\""
],
"tags": [
"nvd",
"cve"
],
"name": "test_vulnerabilities_report",
"test_cases": [
"scan_nvd_configuration-RHEL8",
"scan_nvd_configuration-RHEL7",
"scan_nvd_configuration-RHEL6",
"scan_nvd_configuration-RHEL5",
"scan_nvd_configuration-BIONIC",
"scan_nvd_configuration-XENIAL",
"scan_nvd_configuration-TRUSTY",
"scan_nvd_configuration-BUSTER",
"scan_nvd_configuration-STRETCH"
]
}
]
}
test_scan_nvd_feed.yaml
checks:
- There are as many NVD alerts as vulnerable packages.
- There are 0 NVD vulnerability alerts for RedHat provider.
- The alerts are produced by the NVD provider.
component: Server
copyright: 'Copyright (C) 2015-2021, Wazuh Inc.
Created by Wazuh, Inc. <info@wazuh.com>.
This program is free software; you can redistribute it and/or modify it under the
terms of GPLv2'
description: These tests will mock RedHat, Canonical, Debian, and Windows systems,
and insert custom vulnerabilities and vulnerable packages to check if Vulnerability
Detector generates the vulnerability alerts from NVD feed.
group_id: 2
id: 16
name: test_scan_nvd_feed.py
platform:
- Linux, RHEL5
- Linux, RHEL6
- Linux, RHEL7
- Linux, RHEL8
- Linux, Amazon Linux 1
- Linux, Amazon Linux 2
- Linux, Debian BUSTER
- Linux, Debian STRETCH
- Linux, Debian WHEEZY
- Linux, Ubuntu BIONIC
- Linux, Ubuntu XENIAL
- Linux, Ubuntu TRUSTY
- Linux, Arch Linux
references:
- https://documentation.wazuh.com/current/user-manual/capabilities/vulnerability-detection/index.html
tags:
- linux
- modulesd
- vulnerability_detector
tests:
- description: Check if inserted vulnerable packages are reported by vulnerability
detector using the NVD feed.
expected_output:
- r"Agent '000' is vulnerable to '{cve}'. Condition, '{patch} patch is not installed.'"
(if agent OS is Windows).
- r"The '{package}' package .* from agent .* is vulnerable to '{cve}'" (for no Windows
agents).
- r"The NVD found a total of '{vulnerabilities_number}' potential vulnerabilities
for agent .*"
min_version: 4.1
name: test_vulnerabilities_report
parameters:
- get_configuration (fixture), Get configurations from the module.
- configure_environment (fixture), Configure a custom environment for testing.
- restart_modulesd (fixture), Restart the wazuh-modulesd daemon.
- check_cve_db (fixture), Check if the CVE database exists and its tables are created
- mock_vulnerability_scan (fixture), Mock the vulnerability scan inserting custom
packages, feeds and changing the host system.
tags:
- nvd
- cve
test_cases:
- scan_nvd_configuration-RHEL8
- scan_nvd_configuration-RHEL7
- scan_nvd_configuration-RHEL6
- scan_nvd_configuration-RHEL5
- scan_nvd_configuration-BIONIC
- scan_nvd_configuration-XENIAL
- scan_nvd_configuration-TRUSTY
- scan_nvd_configuration-BUSTER
- scan_nvd_configuration-STRETCH
tier: 0
use_cases: Several vulnerable packages with their respective vulnerabilities are
inserted into a simulated agent to generate the corresponding alerts from NVD
feed.
tiers:
- 0
type: Integration
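The step from docstring to JSON shown above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the actual QA Docs parser: `SAMPLE_MODULE`, `parse_block` and `document_module` are hypothetical names, the sample module is heavily trimmed, and the tiny parser only understands the `key:` / `- item` / wrapped-scalar layout used in these docstrings (a real implementation would more likely hand the docstring to a YAML library). It also does not generate the `id`, `group_id` or `test_cases` fields.

```python
import ast
import json

# Hypothetical, trimmed test module; real modules carry many more fields.
SAMPLE_MODULE = '''
"""
type:
    Integration
tiers:
    - 0
tags:
    - linux
    - modulesd
"""


def test_vulnerabilities_report():
    """
    description:
        Check if inserted vulnerable packages are reported
        by vulnerability detector using the NVD feed.
    tags:
        - nvd
        - cve
    """
'''


def parse_block(docstring):
    """Parse the restricted 'key:' / '- item' / scalar layout into a dict.

    Every value is kept as a string; a real YAML parser would also convert types.
    """
    result = {}
    key = None
    for raw_line in docstring.splitlines():
        line = raw_line.strip()
        if not line:
            continue
        if line.endswith(':') and not line.startswith('- '):
            key = line[:-1]                  # a new field starts
            result[key] = None
        elif key is None:
            continue                         # text before the first field is ignored
        elif line.startswith('- '):
            if not isinstance(result[key], list):
                result[key] = []
            result[key].append(line[2:])     # list item
        elif isinstance(result[key], list):
            result[key][-1] += ' ' + line    # wrapped list item
        elif result[key] is None:
            result[key] = line               # first line of a scalar
        else:
            result[key] += ' ' + line        # wrapped scalar
    return result


def document_module(source, name):
    """Collect the module block and one test block per test_* function."""
    tree = ast.parse(source)
    doc = parse_block(ast.get_docstring(tree) or '')
    doc['name'] = name
    doc['tests'] = []
    for node in tree.body:
        if isinstance(node, ast.FunctionDef) and node.name.startswith('test_'):
            test_doc = parse_block(ast.get_docstring(node) or '')
            test_doc['name'] = node.name
            doc['tests'].append(test_doc)
    return doc


doc = document_module(SAMPLE_MODULE, 'test_scan_nvd_feed.py')
print(json.dumps(doc, indent=2))
```

Because the module is only parsed with `ast` and never imported, the documentation can be extracted without installing `wazuh_testing` or running any fixture.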
Show documentation
'''
copyright:
Copyright (C) 2015-2021, Wazuh Inc.
Created by Wazuh, Inc. <info@wazuh.com>.
This program is free software; you can redistribute it and/or modify it under the terms of GPLv2
type:
Integration
description:
These tests will mock RedHat systems and insert custom vulnerabilities and vulnerable packages
to check if Vulnerability Detector generates the alerts from the RedHat provider feed.
tiers:
- 0
component:
Server
platform:
- Linux, RHEL5
- Linux, RHEL6
- Linux, RHEL7
- Linux, RHEL8
- Linux, Amazon Linux 1
- Linux, Amazon Linux 2
- Linux, Debian BUSTER
- Linux, Debian STRETCH
- Linux, Debian WHEEZY
- Linux, Ubuntu BIONIC
- Linux, Ubuntu XENIAL
- Linux, Ubuntu TRUSTY
- Linux, Arch Linux
checks:
- A report of the corresponding vulnerabilities is generated in the ossec.log file.
references:
- https://documentation.wazuh.com/current/user-manual/capabilities/vulnerability-detection/index.html
tags:
- linux
- vulnerability_detector
'''
import os
import pytest
from wazuh_testing.tools import LOG_FILE_PATH
from wazuh_testing.tools.configuration import load_wazuh_configurations
from wazuh_testing.tools.monitoring import FileMonitor
from wazuh_testing.tools import file
from wazuh_testing import vulnerability_detector as vd
# Marks
pytestmark = pytest.mark.tier(level=0)
# Variables
current_test_path = os.path.dirname(os.path.realpath(__file__))
test_data_path = os.path.join(current_test_path, 'data')
configurations_path = os.path.join(test_data_path, 'wazuh_redhat_inventory.yaml')
redhat_vulnerabilities_data_path = os.path.join(test_data_path, 'redhat_vulnerabilities.json')
wazuh_log_monitor = FileMonitor(LOG_FILE_PATH)
SCAN_TIMEOUT = 40
# Set configuration
parameters = [{'NVD_JSON_PATH': os.path.join(test_data_path, vd.REAL_NVD_FEED)}]
ids = ['redhat_scan_configuration']
# Read JSON data template
redhat_vulnerabilities = file.read_json_file(redhat_vulnerabilities_data_path)
redhat_data_ids = [system['target'] for system in redhat_vulnerabilities]
# Configuration data
configurations = load_wazuh_configurations(configurations_path, __name__, params=parameters)
# Fixtures
@pytest.fixture(scope='module', params=configurations, ids=ids)
def get_configuration(request):
"""Get configurations from the module."""
return request.param
@pytest.fixture(scope='module', params=redhat_vulnerabilities, ids=redhat_data_ids)
@vd.mock_cve_db
def mock_vulnerability_scan(request, mock_agent):
"""
Mock the vulnerability scan by inserting custom packages and feeds and changing the host system.
"""
# Mock system
vd.modify_system(agent_id=mock_agent, os_name=request.param['os_name'], os_major=request.param['os_major'],
os_minor=request.param['os_minor'], name=vd.MOCKED_AGENT_NAME)
# Add custom vulnerabilities and feeds
for vulnerability in request.param['vulnerabilities']:
vd.insert_package(**vulnerability['package'], agent=mock_agent, source=vulnerability['package']['name'])
vd.insert_vulnerability(**vulnerability['cve'], package=vulnerability['package']['name'],
target=request.param['target'])
def test_redhat_vulnerabilities_report(get_configuration, configure_environment, restart_modulesd, check_cve_db,
mock_vulnerability_scan):
'''
description:
Checks if Vulnerability Detector is able to detect mock vulnerabilities on a Linux/RedHat system.
tier:
0
min_version:
4.1
parameters:
- get_configuration (fixture), Get configurations from the module.
- configure_environment (fixture), Configure a custom environment for testing.
- restart_modulesd (fixture), Restart the wazuh-modulesd daemon.
- check_cve_db (fixture), Check if the CVE database exists and its tables are created
- mock_vulnerability_scan (fixture), Mock the vulnerability scan inserting custom packages, feeds and changing the host system.
use_cases:
Several vulnerable packages with their respective vulnerabilities are inserted into a simulated agent to generate the corresponding alerts.
expected_output:
- "The {feed_source} found a total of '{expected_vulnerabilities_number}' potential vulnerabilities for agent .*"
- "The '{package}' package .* from agent .* is vulnerable to '{cve}'"
tags:
- cve
- rhel
- oval
'''
vulnerabilities_number = len(mock_vulnerability_scan['vulnerabilities'])
# Check that the number of OVAL vulnerabilities matches the expected value
vd.check_detected_vulnerabilities_number(wazuh_log_monitor=wazuh_log_monitor,
expected_vulnerabilities_number=vulnerabilities_number,
feed_source='OVAL', timeout=vd.VULN_DETECTOR_SCAN_TIMEOUT)
# Check the vulnerabilities of packages inserted
for item in mock_vulnerability_scan['vulnerabilities']:
vd.check_vulnerability_scan_event(wazuh_log_monitor=wazuh_log_monitor, package=item['package']['name'],
cve=item['cve']['cveid'])
vd.check_if_modulesd_is_running()
test_redhat_inventory_redhat_feed
{
"copyright": "Copyright (C) 2015-2021, Wazuh Inc.\nCreated by Wazuh, Inc. <info@wazuh.com>.\nThis program is free software; you can redistribute it and/or modify it under the terms of GPLv2",
"type": "Integration",
"description": "These tests will mock RedHat, Canonical, Debian, and Windows systems, and insert custom vulnerabilities and vulnerable packages to check if Vulnerability Detector generates the vulnerability alerts from NVD feed.",
"tiers": [
0
],
"component": "Server",
"platform": [
"Linux, RHEL5",
"Linux, RHEL6",
"Linux, RHEL7",
"Linux, RHEL8",
"Linux, Amazon Linux 1",
"Linux, Amazon Linux 2",
"Linux, Debian BUSTER",
"Linux, Debian STRETCH",
"Linux, Debian WHEEZY",
"Linux, Ubuntu BIONIC",
"Linux, Ubuntu XENIAL",
"Linux, Ubuntu TRUSTY",
"Linux, Arch Linux"
],
"checks": [
"There are as many NVD alerts as vulnerable packages.",
"There are 0 NVD vulnerability alerts for RedHat provider.",
"The alerts are produced by the NVD provider."
],
"references": [
"https://documentation.wazuh.com/current/user-manual/capabilities/vulnerability-detection/index.html"
],
"tags": [
"linux",
"modulesd",
"vulnerability_detector"
],
"name": "test_scan_nvd_feed.py",
"id": 16,
"group_id": 2,
"tests": [
{
"description": "Check if inserted vulnerable packages are reported by vulnerability detector using the NVD feed.",
"tier": 0,
"min_version": 4.1,
"parameters": [
"get_configuration (fixture), Get configurations from the module.",
"configure_environment (fixture), Configure a custom environment for testing.",
"restart_modulesd (fixture), Restart the wazuh-modulesd daemon.",
"check_cve_db (fixture), Check if the CVE database exists and its tables are created",
"mock_vulnerability_scan (fixture), Mock the vulnerability scan inserting custom packages, feeds and changing the host system."
],
"use_cases": "Several vulnerable packages with their respective vulnerabilities are inserted into a simulated agent to generate the corresponding alerts from NVD feed.",
"expected_output": [
"r\"Agent '000' is vulnerable to '{cve}'. Condition, '{patch} patch is not installed.'\" (if agent OS is Windows).",
"r\"The '{package}' package .* from agent .* is vulnerable to '{cve}'\" (for no Windows agents).",
"r\"The NVD found a total of '{vulnerabilities_number}' potential vulnerabilities for agent .*\""
],
"tags": [
"nvd",
"cve"
],
"name": "test_vulnerabilities_report",
"test_cases": [
"scan_nvd_configuration-RHEL8",
"scan_nvd_configuration-RHEL7",
"scan_nvd_configuration-RHEL6",
"scan_nvd_configuration-RHEL5",
"scan_nvd_configuration-BIONIC",
"scan_nvd_configuration-XENIAL",
"scan_nvd_configuration-TRUSTY",
"scan_nvd_configuration-BUSTER",
"scan_nvd_configuration-STRETCH"
]
}
]
}
test_redhat_inventory_redhat_feed
checks:
- There are as many NVD alerts as vulnerable packages.
- There are 0 NVD vulnerability alerts for RedHat provider.
- The alerts are produced by the NVD provider.
component: Server
copyright: 'Copyright (C) 2015-2021, Wazuh Inc.
Created by Wazuh, Inc. <info@wazuh.com>.
This program is free software; you can redistribute it and/or modify it under the
terms of GPLv2'
description: These tests will mock RedHat, Canonical, Debian, and Windows systems,
and insert custom vulnerabilities and vulnerable packages to check if Vulnerability
Detector generates the vulnerability alerts from NVD feed.
group_id: 2
id: 16
name: test_scan_nvd_feed.py
platform:
- Linux, RHEL5
- Linux, RHEL6
- Linux, RHEL7
- Linux, RHEL8
- Linux, Amazon Linux 1
- Linux, Amazon Linux 2
- Linux, Debian BUSTER
- Linux, Debian STRETCH
- Linux, Debian WHEEZY
- Linux, Ubuntu BIONIC
- Linux, Ubuntu XENIAL
- Linux, Ubuntu TRUSTY
- Linux, Arch Linux
references:
- https://documentation.wazuh.com/current/user-manual/capabilities/vulnerability-detection/index.html
tags:
- linux
- modulesd
- vulnerability_detector
tests:
- description: Check if inserted vulnerable packages are reported by vulnerability
detector using the NVD feed.
expected_output:
- r"Agent '000' is vulnerable to '{cve}'. Condition, '{patch} patch is not installed.'"
(if agent OS is Windows).
- r"The '{package}' package .* from agent .* is vulnerable to '{cve}'" (for no Windows
agents).
- r"The NVD found a total of '{vulnerabilities_number}' potential vulnerabilities
for agent .*"
min_version: 4.1
name: test_vulnerabilities_report
parameters:
- get_configuration (fixture), Get configurations from the module.
- configure_environment (fixture), Configure a custom environment for testing.
- restart_modulesd (fixture), Restart the wazuh-modulesd daemon.
- check_cve_db (fixture), Check if the CVE database exists and its tables are created
- mock_vulnerability_scan (fixture), Mock the vulnerability scan inserting custom
packages, feeds and changing the host system.
tags:
- nvd
- cve
test_cases:
- scan_nvd_configuration-RHEL8
- scan_nvd_configuration-RHEL7
- scan_nvd_configuration-RHEL6
- scan_nvd_configuration-RHEL5
- scan_nvd_configuration-BIONIC
- scan_nvd_configuration-XENIAL
- scan_nvd_configuration-TRUSTY
- scan_nvd_configuration-BUSTER
- scan_nvd_configuration-STRETCH
tier: 0
use_cases: Several vulnerable packages with their respective vulnerabilities are
inserted into a simulated agent to generate the corresponding alerts from NVD
feed.
tiers:
- 0
type: Integration
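In this proposal each `parameters` entry is a single string of the form `name (type), brief`, whereas the earlier schema stored `brief` and `type` as separate keys under the parameter name. If the structured form is still wanted in the output, the string can be split again. A minimal sketch, assuming the `name (type), brief` pattern is followed strictly (`PARAM_RE` and `parse_parameter` are hypothetical names, not part of the tool):

```python
import re

# name, then the kind in parentheses, then a comma and the free-text brief.
PARAM_RE = re.compile(r'^(?P<name>\w+) \((?P<type>[^)]+)\), (?P<brief>.+)$')


def parse_parameter(entry):
    """Turn 'name (type), brief' into {name: {'brief': ..., 'type': ...}}."""
    match = PARAM_RE.match(entry.strip())
    if match is None:
        raise ValueError(f'Unexpected parameter format: {entry!r}')
    return {match['name']: {'brief': match['brief'], 'type': match['type']}}


entries = [
    'get_configuration (fixture), Get configurations from the module.',
    'restart_modulesd (fixture), Restart the wazuh-modulesd daemon.',
]
for entry in entries:
    print(parse_parameter(entry))
```

Raising on a mismatch, rather than silently skipping the entry, would make a badly formatted docstring visible when the documentation is generated.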
Show documentation
'''
copyright:
Copyright (C) 2015-2021, Wazuh Inc.
Created by Wazuh, Inc. <info@wazuh.com>.
This program is free software; you can redistribute it and/or modify it under the terms of GPLv2
type:
Integration
description:
The tests will modify the value of ignore_time tag in ossec.conf, set different times and check the result in ossec.log.
tiers:
- 0
component:
Server
platform:
- Linux, RHEL5
- Linux, RHEL6
- Linux, RHEL7
- Linux, RHEL8
- Linux, Amazon Linux 1
- Linux, Amazon Linux 2
- Linux, Debian BUSTER
- Linux, Debian STRETCH
- Linux, Debian WHEEZY
- Linux, Ubuntu BIONIC
- Linux, Ubuntu XENIAL
- Linux, Ubuntu TRUSTY
- Linux, Arch Linux
checks:
- Vulnerabilities alerts are not generated before ignore_time time set.
- Vulnerabilities alerts are generated after ignore_time time set.
references:
- https://documentation.wazuh.com/current/user-manual/capabilities/vulnerability-detection/index.html
tags:
- linux
- modulesd
- wazuh-db
- vulnerability_detector
'''
import os
from datetime import timedelta
import pytest
import wazuh_testing.vulnerability_detector as vd
from wazuh_testing.fim import check_time_travel
from wazuh_testing.tools import LOG_FILE_PATH
from wazuh_testing.tools.configuration import load_wazuh_configurations
from wazuh_testing.tools.monitoring import FileMonitor
from wazuh_testing.tools.services import control_service
from wazuh_testing.tools.time import time_to_seconds
# Marks
pytestmark = pytest.mark.tier(level=0)
# variables
test_path = os.path.dirname(os.path.realpath(__file__))
test_data_path = os.path.join(test_path, 'data')
nvd_feed_path = os.path.join(os.path.dirname(test_path), 'test_scan_results', 'data', vd.REAL_NVD_FEED)
configurations_path = os.path.join(test_data_path, 'wazuh_ignore_time.yaml')
wazuh_log_monitor = FileMonitor(LOG_FILE_PATH)
callback_string_vulnerability = f"'{vd.DEFAULT_PACKAGE_NAME}'.+is vulnerable to '{vd.DEFAULT_VULNERABILITY_ID}'"
parameters = [{'IGNORE_TIME': '3600s', 'INTERVAL': '5s', 'NVD_FEED_PATH': nvd_feed_path},
{'IGNORE_TIME': '60m', 'INTERVAL': '5s', 'NVD_FEED_PATH': nvd_feed_path},
{'IGNORE_TIME': '1h', 'INTERVAL': '5s', 'NVD_FEED_PATH': nvd_feed_path}]
metadata = [{'ignore_time': '3600s', 'timeout': 30, 'jumps': 2, 'interval': '5s'},
{'ignore_time': '60m', 'timeout': 30, 'jumps': 2, 'interval': '5s'},
{'ignore_time': '1h', 'timeout': 30, 'jumps': 2, 'interval': '5s'}]
# Configuration data
configurations = load_wazuh_configurations(configurations_path, __name__, params=parameters, metadata=metadata)
# fixtures
@pytest.fixture(scope='module', params=configurations)
def get_configuration(request):
"""Get configurations from the module."""
return request.param
@pytest.fixture(scope='module')
def prepare_agent(mock_agent):
control_service('stop', daemon='wazuh-db')
vd.clean_vd_tables(mock_agent)
vd.insert_package(agent=mock_agent, vendor="Red Hat, Inc.")
vd.insert_vulnerability()
control_service('start', daemon='wazuh-db')
yield mock_agent
def test_ignore_time(get_configuration, configure_environment, restart_modulesd, prepare_agent,
custom_callback_vulnerability=vd.make_vuln_callback(callback_string_vulnerability)):
'''
description:
Check if an alert is not fired during the ignore time interval.
tier:
0
min_version:
4.1
parameters:
- get_configuration (fixture), Get configurations from the module.
- configure_environment (fixture), Configure a custom environment for testing.
- restart_modulesd (fixture), Restart the wazuh-modulesd daemon.
- prepare_agent (fixture), Creates a mock agent with a vulnerability for testing purposes.
- custom_callback_vulnerability (lambda), Create a callback function from a text pattern.
use_cases:
Different time intervals are used in which alerts are to be ignored.
expected_output:
- "'{vd.DEFAULT_PACKAGE_NAME}'.+is vulnerable to '{vd.DEFAULT_VULNERABILITY_ID}'"
tags:
- time_travel
'''
control_service('stop', daemon='wazuh-modulesd')
control_service('stop', daemon='wazuh-db')
vd.update_last_scan(agent=prepare_agent)
control_service('start', daemon='wazuh-db')
control_service('start', daemon='wazuh-modulesd')
ignore_time = get_configuration['metadata']['ignore_time']
jumps = get_configuration['metadata']['jumps']
seconds_to_travel = time_to_seconds(ignore_time) / jumps
# Check for initial alert
wazuh_log_monitor.start(timeout=get_configuration['metadata']['timeout'],
callback=custom_callback_vulnerability,
error_message='Alert did not appear at the start of the test')
# Check if alert does not appear during ignore time
for _ in range(1, jumps):
check_time_travel(time_travel=True, interval=timedelta(seconds=seconds_to_travel))
with pytest.raises(TimeoutError):
wazuh_log_monitor.start(timeout=get_configuration['metadata']['timeout'],
callback=custom_callback_vulnerability)
raise AttributeError('Alert appeared before ignore_time was finished')
# Travel to the time set in ignore time
check_time_travel(time_travel=True, interval=timedelta(seconds=seconds_to_travel))
# Check for final alert
wazuh_log_monitor.start(timeout=get_configuration['metadata']['timeout'],
callback=custom_callback_vulnerability,
error_message='Alert did not appear at the end of the test')
test_general_settings_ignore_time.json
{
"copyright": "Copyright (C) 2015-2021, Wazuh Inc.\nCreated by Wazuh, Inc. <info@wazuh.com>.\nThis program is free software; you can redistribute it and/or modify it under the terms of GPLv2",
"type": "Integration",
"description": "The tests will modify the value of ignore_time tag in ossec.conf, set different times and check the result in ossec.log.",
"tiers": [
0
],
"component": "Server",
"platform": [
"Linux, RHEL5",
"Linux, RHEL6",
"Linux, RHEL7",
"Linux, RHEL8",
"Linux, Amazon Linux 1",
"Linux, Amazon Linux 2",
"Linux, Debian BUSTER",
"Linux, Debian STRETCH",
"Linux, Debian WHEEZY",
"Linux, Ubuntu BIONIC",
"Linux, Ubuntu XENIAL",
"Linux, Ubuntu TRUSTY",
"Linux, Arch Linux"
],
"checks": [
"Vulnerabilities alerts are not generated before ignore_time time set.",
"Vulnerabilities alerts are generated after ignore_time time set."
],
"references": [
"https://documentation.wazuh.com/current/user-manual/capabilities/vulnerability-detection/index.html"
],
"tags": [
"linux",
"modulesd",
"wazuh-db",
"vulnerability_detector"
],
"name": "test_general_settings_ignore_time.py",
"id": 5,
"group_id": 2,
"tests": [
{
"description": "Check if an alert is not fired during the ignore time interval.",
"tier": 0,
"min_version": 4.1,
"parameters": [
"get_configuration (fixture), Get configurations from the module.",
"configure_environment (fixture), Configure a custom environment for testing.",
"restart_modulesd (fixture), Restart the wazuh-modulesd daemon.",
"prepare_agent (fixture), Creates a mock agent with a vulnerability for testing purposes.",
"custom_callback_vulnerability (lambda), Create a callback function from a text pattern."
],
"use_cases": "Different time intervals are used in which alerts are to be ignored.",
"expected_output": [
"'{vd.DEFAULT_PACKAGE_NAME}'.+is vulnerable to '{vd.DEFAULT_VULNERABILITY_ID}'"
],
"tags": [
"time_travel"
],
"name": "test_ignore_time",
"test_cases": [
"get_configuration0",
"get_configuration1",
"get_configuration2"
]
}
]
}
test_general_settings_ignore_time.yaml
checks:
- Vulnerabilities alerts are not generated before ignore_time time set.
- Vulnerabilities alerts are generated after ignore_time time set.
component: Server
copyright: 'Copyright (C) 2015-2021, Wazuh Inc.
Created by Wazuh, Inc. <info@wazuh.com>.
This program is free software; you can redistribute it and/or modify it under the
terms of GPLv2'
description: The tests will modify the value of ignore_time tag in ossec.conf, set
different times and check the result in ossec.log.
group_id: 2
id: 5
name: test_general_settings_ignore_time.py
platform:
- Linux, RHEL5
- Linux, RHEL6
- Linux, RHEL7
- Linux, RHEL8
- Linux, Amazon Linux 1
- Linux, Amazon Linux 2
- Linux, Debian BUSTER
- Linux, Debian STRETCH
- Linux, Debian WHEEZY
- Linux, Ubuntu BIONIC
- Linux, Ubuntu XENIAL
- Linux, Ubuntu TRUSTY
- Linux, Arch Linux
references:
- https://documentation.wazuh.com/current/user-manual/capabilities/vulnerability-detection/index.html
tags:
- linux
- modulesd
- wazuh-db
- vulnerability_detector
tests:
- description: Check if an alert is not fired during the ignore time interval.
expected_output:
- '''{vd.DEFAULT_PACKAGE_NAME}''.+is vulnerable to ''{vd.DEFAULT_VULNERABILITY_ID}'''
min_version: 4.1
name: test_ignore_time
parameters:
- get_configuration (fixture), Get configurations from the module.
- configure_environment (fixture), Configure a custom environment for testing.
- restart_modulesd (fixture), Restart the wazuh-modulesd daemon.
- prepare_agent (fixture), Creates a mock agent with a vulnerability for testing
purposes.
- custom_callback_vulnerability (lambda), Create a callback function from a text
pattern.
tags:
- time_travel
test_cases:
- get_configuration0
- get_configuration1
- get_configuration2
tier: 0
use_cases: Different time intervals are used in which alerts are to be ignored.
tiers:
- 0
type: Integration
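Since the JSON and YAML outputs above carry the same fields, a generated file could be sanity-checked before it is indexed. The following is only a minimal sketch: the field names are copied from the example output, but treating all of them as required, and the helper name `missing_fields`, are assumptions made for illustration and not part of the proposed tool.

```python
import json

# Field names taken from the example output above. Treating every one of them
# as required is an assumption of this sketch, not a rule stated in the proposal.
MODULE_FIELDS = {"copyright", "type", "description", "tiers", "component", "platform",
                 "checks", "references", "tags", "name", "id", "group_id", "tests"}
TEST_FIELDS = {"description", "tier", "min_version", "parameters", "use_cases",
               "expected_output", "tags", "name", "test_cases"}


def missing_fields(module_doc):
    """Return a dict mapping each block name to the sorted list of fields it lacks."""
    problems = {}
    missing = MODULE_FIELDS - set(module_doc)
    if missing:
        problems["module"] = sorted(missing)
    for test in module_doc.get("tests", []):
        missing = TEST_FIELDS - set(test)
        if missing:
            problems[test.get("name", "<unnamed test>")] = sorted(missing)
    return problems


# A deliberately incomplete document, to show what the report looks like.
sample = json.loads("""
{
  "description": "...",
  "tiers": [0],
  "name": "test_general_settings_ignore_time.py",
  "tests": [{"name": "test_ignore_time", "tier": 0, "min_version": 4.1}]
}
""")
report = missing_fields(sample)
print(json.dumps(report, indent=2))
```

The same function works on the YAML output once it has been loaded into a dictionary, because both formats are dumps of the same structure.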
Show documentation
'''
copyright:
Copyright (C) 2015-2021, Wazuh Inc.
Created by Wazuh, Inc. <[email protected]>.
This program is free software; you can redistribute it and/or modify it under the terms of GPLv2
type:
Integration
description:
These tests will check if, during enrollment, the agent re-establishes communication with the manager
under different situations that interrupt it.
The objective is to check that, with different states in the clients.key file, the agent
successfully enrolls after losing connection with remoted.
tiers:
- 0
component:
Agent
platform:
- Linux, RHEL5
- Linux, RHEL6
- Linux, RHEL7
- Linux, RHEL8
- Linux, Amazon Linux 1
- Linux, Amazon Linux 2
- Linux, Debian BUSTER
- Linux, Debian STRETCH
- Linux, Debian WHEEZY
- Linux, Ubuntu BIONIC
- Linux, Ubuntu XENIAL
- Linux, Ubuntu TRUSTY
- Linux, Arch Linux
- Windows, 7
- Windows, 8
- Windows, 10
- Windows, Server 2003
- Windows, Server 2012
- Windows, Server 2016
checks:
- The agent has keys and loses communication with remoted.
- The agent does not have keys and loses communication with remoted when enrollment has been started.
- The agent does not have `client.keys` file and loses communication with remoted when enrollment has been started.
- The agent has keys, loses communication with remoted, and performs multiple enrollment requests.
- The agent does not have keys, remoted is unavailable for several seconds and multiple connection requests
are performed before a new enrollment is made.
references:
- https://documentation.wazuh.com/current/user-manual/reference/daemons/ossec-agentd.html
tags:
- linux
- agentd
'''
import os
import platform
from time import sleep
import pytest
from datetime import datetime, timedelta
from wazuh_testing.agent import CLIENT_KEYS_PATH, SERVER_CERT_PATH, SERVER_KEY_PATH
from wazuh_testing.tools import WAZUH_PATH, LOG_FILE_PATH
from wazuh_testing.tools.authd_sim import AuthdSimulator
from wazuh_testing.tools.configuration import load_wazuh_configurations
from wazuh_testing.tools.file import truncate_file
from wazuh_testing.tools.monitoring import QueueMonitor, FileMonitor
from wazuh_testing.tools.remoted_sim import RemotedSimulator
from wazuh_testing.tools.services import control_service
# Marks
pytestmark = [pytest.mark.linux, pytest.mark.win32, pytest.mark.tier(level=0), pytest.mark.agent]
test_data_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'data')
configurations_path = os.path.join(test_data_path, 'wazuh_conf.yaml')
params = [
{
'SERVER_ADDRESS': '127.0.0.1',
'REMOTED_PORT': 1514,
'PROTOCOL': 'tcp',
},
{
'SERVER_ADDRESS': '127.0.0.1',
'REMOTED_PORT': 1514,
'PROTOCOL': 'udp',
}
]
metadata = [
{'PROTOCOL': 'tcp'},
{'PROTOCOL': 'udp'}
]
config_ids = ['tcp', 'udp']
configurations = load_wazuh_configurations(configurations_path, __name__, params=params, metadata=metadata)
log_monitor_paths = []
receiver_sockets_params = []
monitored_sockets_params = []
receiver_sockets, monitored_sockets, log_monitors = None, None, None # Set in the fixtures
authd_server = AuthdSimulator(params[0]['SERVER_ADDRESS'], key_path=SERVER_KEY_PATH, cert_path=SERVER_CERT_PATH)
remoted_server = None
def teardown():
global remoted_server
if remoted_server is not None:
remoted_server.stop()
def set_debug_mode():
    """Set debug2 for agentd in local internal options file."""
    # platform.system() reports 'Windows' on Windows hosts ('win32' is a sys.platform value).
    if platform.system() == 'Windows':
        local_int_conf_path = os.path.join(WAZUH_PATH, 'local_internal_options.conf')
        debug_line = 'windows.debug=2\nagent.recv_timeout=5\n'
    else:
        local_int_conf_path = os.path.join(WAZUH_PATH, 'etc', 'local_internal_options.conf')
        debug_line = 'agent.debug=2\nagent.recv_timeout=5\n'
    with open(local_int_conf_path, 'r') as local_file_read:
        # debug_line spans two lines, so it is compared against the whole file content;
        # a line-by-line comparison never matches and the options are appended on every run.
        if debug_line in local_file_read.read():
            return
    with open(local_int_conf_path, 'a') as local_file_write:
        local_file_write.write('\n' + debug_line)
set_debug_mode()
# fixtures
@pytest.fixture(scope="module", params=configurations, ids=config_ids)
def get_configuration(request):
"""Get configurations from the module"""
return request.param
@pytest.fixture(scope="function")
def configure_authd_server(request):
"""Initialize a simulated authd connection."""
authd_server.start()
global monitored_sockets
monitored_sockets = QueueMonitor(authd_server.queue)
authd_server.clear()
yield
authd_server.shutdown()
def start_authd():
"""Enable authd to accept connections and perform enrollments."""
authd_server.set_mode("ACCEPT")
authd_server.clear()
def stop_authd():
    """Stop authd from accepting connections and performing enrollments."""
    authd_server.set_mode("REJECT")
def set_authd_id():
"""Set agent id to 101 in the authd simulated connection."""
authd_server.agent_id = 101
def clean_keys():
"""Clear the agent's client.keys file."""
truncate_file(CLIENT_KEYS_PATH)
sleep(1)
def delete_keys():
"""Remove the agent's client.keys file."""
os.remove(CLIENT_KEYS_PATH)
sleep(1)
def set_keys():
    """Write the agent's enrollment details to the client.keys file."""
    with open(CLIENT_KEYS_PATH, 'w+') as f:
        f.write("100 ubuntu-agent any TopSecret")
    sleep(1)
def wait_notify(line):
"""Callback function to wait for agent checkins to the manager."""
if 'Sending keep alive:' in line:
return line
return None
def wait_enrollment(line):
"""Callback function to wait for enrollment."""
if 'Valid key received' in line:
return line
return None
def wait_enrollment_try(line):
"""Callback function to wait for enrollment attempt."""
if 'Requesting a key' in line:
return line
return None
def search_error_messages():
    """Retrieve the first line of the log file that contains an error.

    Returns:
        str: Line where the error is found, or None if no errors are found.
    """
    with open(LOG_FILE_PATH, 'r') as log_file:
        lines = log_file.readlines()
        for line in lines:
            if "ERROR:" in line:
                return line
    return None
# Tests
"""
This test covers the scenario of the agent starting with keys,
losing communication with Remoted, and sending a new enrollment to Authd.
"""
def test_agentd_reconection_enrollment_with_keys(configure_authd_server, configure_environment, get_configuration):
'''
description:
Check how the agent behaves when losing communication with remoted and a new enrollment is sent to authd.
In this case, the agent starts with keys.
tier:
0
min_version:
4.1
parameters:
- configure_authd_server (fixture), Initialize a simulated authd connection.
- configure_environment (fixture), Configure a custom environment for testing.
- get_configuration (fixture), Get configurations from the module.
use_cases:
Requests are made using TCP and UDP protocols together with a client.keys file
that includes the previously generated agent keys.
expected_output:
- "Sending keep alive:"
- "Valid key received"
- "Sending keep alive:"
tags:
- remoted_simulator
'''
global remoted_server
remoted_server = RemotedSimulator(protocol=get_configuration['metadata']['PROTOCOL'], mode='CONTROLLED_ACK',
client_keys=CLIENT_KEYS_PATH)
# Stop target Agent
control_service('stop')
# Prepare test
start_authd()
set_authd_id()
set_keys()
# Clean logs
truncate_file(LOG_FILE_PATH)
# Start target Agent
control_service('start')
# Start listening to logs
truncate_file(LOG_FILE_PATH)
log_monitor = FileMonitor(LOG_FILE_PATH)
# Listen on the enrollment server
authd_server.clear()
# Wait until Agent is notifying Manager
log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!")
# Start rejecting Agent
remoted_server.set_mode('REJECT')
# Listen on the enrollment server
authd_server.clear()
# Wait until the agent requests a new key from the enrollment server
log_monitor.start(timeout=180, callback=wait_enrollment,
error_message="Agent never enrolled after rejecting connection!")
# Start responding to Agent
remoted_server.set_mode('CONTROLLED_ACK')
# Wait until Agent is notifying Manager
log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!")
assert "aes" in remoted_server.last_message_ctx, "Incorrect Secure Message"
"""
This test covers the scenario of Agent starting without client.keys file
and an enrollment is sent to Authd to start communicating with Remoted
"""
def test_agentd_reconection_enrollment_no_keys_file(configure_authd_server, configure_environment, get_configuration):
'''
description:
Check how the agent behaves when losing communication with remoted and a new enrollment is sent to authd.
In this case, the agent starts without the client.keys file.
tier:
0
min_version:
4.1
parameters:
- configure_authd_server (fixture), Initialize a simulated authd connection.
- configure_environment (fixture), Configure a custom environment for testing.
- get_configuration (fixture), Get configurations from the module.
use_cases:
Requests are made using TCP and UDP protocols.
expected_output:
- "Valid key received"
- "Sending keep alive:"
- "Valid key received"
- "Sending keep alive:"
tags:
- remoted_simulator
'''
global remoted_server
remoted_server = RemotedSimulator(protocol=get_configuration['metadata']['PROTOCOL'], mode='CONTROLLED_ACK',
client_keys=CLIENT_KEYS_PATH)
# Stop target Agent
control_service('stop')
# Clean logs
truncate_file(LOG_FILE_PATH)
# Prepare test
start_authd()
set_authd_id()
delete_keys()
# Start target Agent
control_service('start')
# Start listening to logs
log_monitor = FileMonitor(LOG_FILE_PATH)
# Listen on the enrollment server
authd_server.clear()
# Wait until Agent asks keys for the first time
log_monitor.start(timeout=50, callback=wait_enrollment,
error_message="Agent never enrolled for the first time.")
# Wait until Agent is notifying Manager
log_monitor.start(timeout=50, callback=wait_notify, error_message="Notify message from agent was never sent!")
# Start rejecting Agent
remoted_server.set_mode('REJECT')
# Listen on the enrollment server
authd_server.clear()
# Wait until the agent requests a new key from the enrollment server
log_monitor.start(timeout=180, callback=wait_enrollment,
error_message="Agent never enrolled after rejecting connection!")
# Start responding to Agent
remoted_server.set_mode('CONTROLLED_ACK')
# Wait until Agent is notifying Manager
log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!")
assert "aes" in remoted_server.last_message_ctx, "Incorrect Secure Message"
"""
This test covers the scenario of Agent starting without keys in client.keys file
and an enrollment is sent to Authd to start communicating with Remoted
"""
def test_agentd_reconection_enrollment_no_keys(configure_authd_server, configure_environment, get_configuration):
'''
description:
Check how the agent behaves when losing communication with remoted and a new enrollment is sent to authd.
In this case, the agent has its client.keys file empty.
tier:
0
min_version:
4.1
parameters:
- configure_authd_server (fixture), Initialize a simulated authd connection.
- configure_environment (fixture), Configure a custom environment for testing.
- get_configuration (fixture), Get configurations from the module.
use_cases:
Requests are made using TCP and UDP protocols together with an empty client.keys file.
expected_output:
- "Valid key received"
- "Sending keep alive:"
- "Valid key received"
- "Sending keep alive:"
tags:
- remoted_simulator
'''
global remoted_server
remoted_server = RemotedSimulator(protocol=get_configuration['metadata']['PROTOCOL'], mode='CONTROLLED_ACK',
client_keys=CLIENT_KEYS_PATH)
# Stop target Agent
control_service('stop')
# Clean logs
truncate_file(LOG_FILE_PATH)
# Prepare test
start_authd()
set_authd_id()
clean_keys()
# Start target Agent
control_service('start')
# Start listening to logs
log_monitor = FileMonitor(LOG_FILE_PATH)
# Listen on the enrollment server
authd_server.clear()
# Wait until Agent asks keys for the first time
log_monitor.start(timeout=120, callback=wait_enrollment,
error_message="Agent never enrolled for the first time!")
# Wait until Agent is notifying Manager
log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!")
assert "aes" in remoted_server.last_message_ctx, "Incorrect Secure Message"
# Start rejecting Agent
remoted_server.set_mode('REJECT')
# Listen on the enrollment server
authd_server.clear()
# Wait until the agent requests a new key from the enrollment server
log_monitor.start(timeout=180, callback=wait_enrollment,
error_message="Agent never enrolled after rejecting connection!")
# Start responding to Agent
remoted_server.set_mode('CONTROLLED_ACK')
# Wait until Agent is notifying Manager
log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!")
assert "aes" in remoted_server.last_message_ctx, "Incorrect Secure Message"
"""
This test covers the scenario of the agent starting without keys,
where multiple retries are required until the new key is obtained to start communicating with Remoted
"""
def test_agentd_initial_enrollment_retries(configure_authd_server, configure_environment, get_configuration):
'''
description:
Check how the agent behaves when losing communication with remoted and a new enrollment is sent to authd.
For this, the agent starts without keys and performs multiple enrollment requests
to authd before getting the new key to communicate with remoted.
tier:
0
min_version:
4.1
parameters:
- configure_authd_server (fixture), Initialize a simulated authd connection.
- configure_environment (fixture), Configure a custom environment for testing.
- get_configuration (fixture), Get configurations from the module.
use_cases:
Requests are made using TCP and UDP protocols together with an empty client.keys file.
expected_output:
- "Requesting a key" (four times)
- "Valid key received"
- "Sending keep alive:"
tags:
- remoted_simulator
'''
global remoted_server
remoted_server = RemotedSimulator(protocol=get_configuration['metadata']['PROTOCOL'], mode='CONTROLLED_ACK',
client_keys=CLIENT_KEYS_PATH)
# Stop target Agent
control_service('stop')
# Clean logs
truncate_file(LOG_FILE_PATH)
# Prepare test
stop_authd()
set_authd_id()
clean_keys()
# Start whole Agent service to check other daemons status after initialization
control_service('start')
# Start listening to logs
log_monitor = FileMonitor(LOG_FILE_PATH)
start_time = datetime.now()
# Check for unsuccessful enrollment retries in Agentd initialization
retries = 0
while retries < 4:
retries += 1
log_monitor.start(timeout=retries * 5 + 20, callback=wait_enrollment_try,
error_message="Enrollment retry was not sent!")
stop_time = datetime.now()
expected_time = start_time + timedelta(seconds=retries * 5 - 2)
# Check if delay was applied
assert stop_time > expected_time, "Retries too quick"
# Enable authd
authd_server.clear()
authd_server.set_mode("ACCEPT")
# Wait for a successful enrollment
log_monitor.start(timeout=70, callback=wait_enrollment, error_message="No successful enrollment after retries!")
# Wait until Agent is notifying Manager
log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!")
# Check if no Wazuh module stopped due to Agentd Initialization
with open(LOG_FILE_PATH) as log_file:
log_lines = log_file.read().splitlines()
for line in log_lines:
if "Unable to access queue:" in line:
raise AssertionError("A Wazuh module stopped because of Agentd initialization!")
"""
This test covers the scenario of the agent starting with keys while Remoted is unreachable for some seconds,
so multiple connection retries are required prior to requesting a new enrollment
"""
def test_agentd_connection_retries_pre_enrollment(configure_authd_server, configure_environment, get_configuration):
'''
description:
Check how the agent behaves when losing communication with remoted and a new enrollment is sent to authd.
For this, the agent starts with keys but Remoted is not available for several seconds,
then the agent performs multiple connection retries before requesting a new enrollment.
tier:
0
min_version:
4.1
parameters:
- configure_authd_server (fixture), Initialize a simulated authd connection.
- configure_environment (fixture), Configure a custom environment for testing.
- get_configuration (fixture), Get configurations from the module.
use_cases:
Requests are made using TCP and UDP protocols together with a client.keys file
that includes the previously generated agent keys.
expected_output:
- "Sending keep alive:"
tags:
- remoted_simulator
'''
global remoted_server
REMOTED_KEYS_SYNC_TIME = 10
# Start Remoted mock
remoted_server = RemotedSimulator(protocol=get_configuration['metadata']['PROTOCOL'], client_keys=CLIENT_KEYS_PATH)
# Stop target Agent
control_service('stop')
# Clean logs
truncate_file(LOG_FILE_PATH)
# Prepare test
stop_authd()
set_keys()
# Start listening to logs
log_monitor = FileMonitor(LOG_FILE_PATH)
# Start whole Agent service to check other daemons status after initialization
control_service('start')
# Simulate the time Remoted needs to synchronize keys by waiting before it starts responding
remoted_server.set_mode('CONTROLLED_ACK')
sleep(REMOTED_KEYS_SYNC_TIME)
# Check Agentd is finally communicating
log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!")
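The tests above all follow the same pattern: a callback returns the log line when it contains the expected text and `None` otherwise, and consecutive waits check that the messages listed under `expected_output` appear in order. The sketch below imitates that pattern with plain Python so it can be run without a Wazuh installation. `first_match` and `seen_in_order` are hypothetical stand-ins, not `FileMonitor` methods, the sample log lines are made up around the substrings used by the callbacks, and the sketch assumes each wait continues reading where the previous one stopped.

```python
import re


def make_callback(pattern):
    """Build a callback that returns the line when the pattern is found, else None."""
    regex = re.compile(pattern)
    return lambda line: line if regex.search(line) else None


def first_match(lines, callback):
    """Stand-in for a log monitor wait: return the first non-None callback result."""
    for line in lines:
        result = callback(line)
        if result is not None:
            return result
    return None


def seen_in_order(lines, patterns):
    """Check that every pattern shows up, each one after the previous match."""
    remaining = iter(lines)  # shared iterator, so each wait resumes after the last match
    for pattern in patterns:
        if first_match(remaining, make_callback(re.escape(pattern))) is None:
            return False
    return True


# Invented sample lines; only the quoted substrings come from the tests above.
log = [
    "wazuh-agentd: INFO: Requesting a key from server: 127.0.0.1",
    "wazuh-agentd: INFO: Valid key received",
    "wazuh-agentd: DEBUG: Sending keep alive: #!-Linux",
]

print(seen_in_order(log, ["Valid key received", "Sending keep alive:"]))
print(seen_in_order(log, ["Sending keep alive:", "Valid key received"]))
```

Sharing one iterator across the waits is what makes the order matter: once a line has been consumed by an earlier wait, a later wait cannot match it again.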
Show documentation
'''
copyright:
Copyright (C) 2015-2021, Wazuh Inc.
Created by Wazuh, Inc. <[email protected]>.
This program is free software; you can redistribute it and/or modify it under the terms of GPLv2
type:
Integration
description:
Check that manager-agent communication through remoted socket works as expected.
tiers:
- 0
component:
Server
platform:
- Linux, RHEL5
- Linux, RHEL6
- Linux, RHEL7
- Linux, RHEL8
- Linux, Amazon Linux 1
- Linux, Amazon Linux 2
- Linux, Debian BUSTER
- Linux, Debian STRETCH
- Linux, Debian WHEEZY
- Linux, Ubuntu BIONIC
- Linux, Ubuntu XENIAL
- Linux, Ubuntu TRUSTY
- Linux, Arch Linux
checks:
- The getconfig request.
- The getstate request.
- The getconfig request for a disconnected agent.
references:
- https://documentation.wazuh.com/current/user-manual/reference/daemons/ossec-remoted.html
tags:
- linux
- remoted
'''
import os
import time
import pytest
import wazuh_testing.tools.agent_simulator as ag
from wazuh_testing.tools.configuration import load_wazuh_configurations
from wazuh_testing.tools.sockets import send_request
# Marks
pytestmark = pytest.mark.tier(level=0)
# Configuration
test_data_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'data')
configurations_path = os.path.join(test_data_path, 'wazuh_request_agent_info.yaml')
parameters = [
{'PROTOCOL': 'udp,tcp'},
{'PROTOCOL': 'tcp'},
{'PROTOCOL': 'udp'},
]
metadata = [
{'PROTOCOL': 'udp,tcp'},
{'PROTOCOL': 'tcp'},
{'PROTOCOL': 'udp'},
]
# test cases
test_case = {
'disconnected': ('agent getconfig disconnected',
'Cannot send request'),
'get_config': ('agent getconfig client',
'{"client":{"config-profile":"centos8","notify_time":10,"time-reconnect":60}}'),
'get_state': ('logcollector getstate',
'{"error":0,"data":{"global":{"start":"2021-02-26, 06:41:26","end":"2021-02-26 08:49:19"}}}')
}
configurations = load_wazuh_configurations(configurations_path, __name__, params=parameters, metadata=metadata)
config_ids = [x['PROTOCOL'] for x in parameters]
# Utils
manager_address = "localhost"
# fixtures
@pytest.fixture(scope="module", params=configurations, ids=config_ids)
def get_configuration(request):
"""Get configurations from the module."""
return request.param
@pytest.mark.parametrize("command_request,expected_answer", test_case.values(), ids=list(test_case.keys()))
def test_request(get_configuration, configure_environment, remove_shared_files,
restart_remoted, command_request, expected_answer):
'''
description:
Writes (config/state) requests in $DIR/queue/ossec/request and check if remoted forwards it to the agent,
collects the response, and writes it in the socket or returns an error message if the queried
agent is disconnected.
tier:
0
min_version:
4.2
parameters:
- get_configuration (fixture), Get configurations from the module.
- configure_environment (fixture), Configure a custom environment for testing.
- remove_shared_files (fixture), Temporary removes txt files from default agent group shared files.
- restart_remoted (fixture), Restart the wazuh-remoted daemon.
- command_request (String), Use case being evaluated.
- expected_answer (String), The expected response from the agent to the manager.
use_cases:
Several requests that are made by the manager to the agent.
expected_output:
- "Cannot send request"
- "{"client":{"config-profile":"centos8","notify_time":10,"time-reconnect":60}}"
- "{"error":0,"data":{"global":{"start":"2021-02-26, 06:41:26","end":"2021-02-26 08:49:19"}}}"
tags:
- agent_simulator
'''
cfg = get_configuration['metadata']
protocols = cfg['PROTOCOL'].split(',')
agents = [ag.Agent(manager_address, "aes", os="debian8", version="4.2.0") for _ in range(len(protocols))]
for agent, protocol in zip(agents, protocols):
if "disconnected" not in command_request:
sender, injector = ag.connect(agent, manager_address, protocol)
msg_request = f'{agent.id} {command_request}'
response = send_request(msg_request)
assert expected_answer in response, "Remoted unexpected answer"
if "disconnected" not in command_request:
injector.stop_receive()
test_request_agent_info.json
{
"copyright": "Copyright (C) 2015-2021, Wazuh Inc.\nCreated by Wazuh, Inc. <[email protected]>.\nThis program is free software; you can redistribute it and/or modify it under the terms of GPLv2",
"type": "Integration",
"description": "Check that manager-agent communication through remoted socket works as expected.",
"tiers": [
0
],
"component": "Server",
"platform": [
"Linux, RHEL5",
"Linux, RHEL6",
"Linux, RHEL7",
"Linux, RHEL8",
"Linux, Amazon Linux 1",
"Linux, Amazon Linux 2",
"Linux, Debian BUSTER",
"Linux, Debian STRETCH",
"Linux, Debian WHEEZY",
"Linux, Ubuntu BIONIC",
"Linux, Ubuntu XENIAL",
"Linux, Ubuntu TRUSTY",
"Linux, Arch Linux"
],
"checks": [
"The getconfig request.",
"The getstate request.",
"The getconfig request for a disconnected agent."
],
"references": [
"https://documentation.wazuh.com/current/user-manual/reference/daemons/ossec-remoted.html"
],
"tags": [
"linux",
"remoted"
],
"name": "test_request_agent_info.py",
"id": 67,
"group_id": 47,
"tests": [
{
"description": "Writes (config/state) requests in $DIR/queue/ossec/request and check if remoted forwards it to the agent, collects the response, and writes it in the socket or returns an error message if the queried agent is disconnected.",
"tier": 0,
"min_version": 4.2,
"parameters": [
"get_configuration (fixture), Get configurations from the module.",
"configure_environment (fixture), Configure a custom environment for testing.",
"remove_shared_files (fixture), Temporary removes txt files from default agent group shared files.",
"restart_remoted (fixture), Restart the wazuh-remoted daemon.",
"command_request (String), Use case being evaluated.",
"expected_answer (String), The expected response from the agent to the manager."
],
"use_cases": "Several requests that are made by the manager to the agent.",
"expected_output": [
"Cannot send request",
"r\"{\"client\":{\"config-profile\":\"centos8\",\"notify_time\":10,\"time-reconnect\":60}}\"",
"r\"{\"error\":0,\"data\":{\"global\":{\"start\":\"2021-02-26, 06:41:26\",\"end\":\"2021-02-26 08:49:19\"}}}\""
],
"tags": [
"agent_simulator"
],
"name": "test_request",
"test_cases": [
"udp,tcp-disconnected",
"udp,tcp-get_config",
"udp,tcp-get_state",
"tcp-disconnected",
"tcp-get_config",
"tcp-get_state",
"udp-disconnected",
"udp-get_config",
"udp-get_state"
]
}
]
}
test_request_agent_info.yaml
checks:
- The getconfig request.
- The getstate request.
- The getconfig request for a disconnected agent.
component: Server
copyright: 'Copyright (C) 2015-2021, Wazuh Inc.

  Created by Wazuh, Inc. <[email protected]>.

  This program is free software; you can redistribute it and/or modify it under the
  terms of GPLv2'
description: Check that manager-agent communication through remoted socket works as
  expected.
group_id: 47
id: 67
name: test_request_agent_info.py
platform:
- Linux, RHEL5
- Linux, RHEL6
- Linux, RHEL7
- Linux, RHEL8
- Linux, Amazon Linux 1
- Linux, Amazon Linux 2
- Linux, Debian BUSTER
- Linux, Debian STRETCH
- Linux, Debian WHEEZY
- Linux, Ubuntu BIONIC
- Linux, Ubuntu XENIAL
- Linux, Ubuntu TRUSTY
- Linux, Arch Linux
references:
- https://documentation.wazuh.com/current/user-manual/reference/daemons/ossec-remoted.html
tags:
- linux
- remoted
tests:
- description: Writes (config/state) requests in $DIR/queue/ossec/request and check
    if remoted forwards it to the agent, collects the response, and writes it in the
    socket or returns an error message if the queried agent is disconnected.
  expected_output:
  - Cannot send request
  - r"{"client":{"config-profile":"centos8","notify_time":10,"time-reconnect":60}}"
  - r"{"error":0,"data":{"global":{"start":"2021-02-26, 06:41:26","end":"2021-02-26
    08:49:19"}}}"
  min_version: 4.2
  name: test_request
  parameters:
  - get_configuration (fixture), Get configurations from the module.
  - configure_environment (fixture), Configure a custom environment for testing.
  - remove_shared_files (fixture), Temporary removes txt files from default agent
    group shared files.
  - restart_remoted (fixture), Restart the wazuh-remoted daemon.
  - command_request (String), Use case being evaluated.
  - expected_answer (String), The expected response from the agent to the manager.
  tags:
  - agent_simulator
  test_cases:
  - udp,tcp-disconnected
  - udp,tcp-get_config
  - udp,tcp-get_state
  - tcp-disconnected
  - tcp-get_config
  - tcp-get_state
  - udp-disconnected
  - udp-get_config
  - udp-get_state
  tier: 0
  use_cases: Several requests that are made by the manager to the agent.
tiers:
- 0
type: Integration
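The nine entries under `test_cases` are the combinations of the three fixture ids (`config_ids`, built from `parameters`) and the three keys of the `test_case` dictionary in the module. As a minimal sketch, the list above can be reproduced with `itertools.product`. This only rebuilds the names shown in the output; it is not a claim about how pytest or the documentation tool generates or orders them internally.

```python
from itertools import product

# Values copied from the test module above.
config_ids = ["udp,tcp", "tcp", "udp"]                    # ids of the get_configuration fixture
case_ids = ["disconnected", "get_config", "get_state"]   # keys of the test_case dictionary

# Each documented test case is "<fixture id>-<parametrize id>".
test_cases = [f"{config}-{case}" for config, case in product(config_ids, case_ids)]

print("\n".join(test_cases))
```

This also explains why the first module, which has no `parametrize` ids of its own, lists only `get_configuration0` to `get_configuration2`.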
|
2021-08-26
QA Docs schema 2.0 proposal 3
Module block
Test block
test_vulnerability_detector/test_scan_results/test_scan_nvd_feed.py
'''
copyright:
    Copyright (C) 2015-2021, Wazuh Inc.
    Created by Wazuh, Inc. <[email protected]>.
    This program is free software; you can redistribute it and/or modify it under the terms of GPLv2
type:
    Integration
description: These tests will mock RedHat, Canonical, Debian, and Windows systems, and insert custom vulnerabilities and
    vulnerable packages to check if Vulnerability Detector generates the vulnerability alerts from NVD feed.
tiers:
    - 0
component:
    - Manager
path:
    tests/integration/test_vulnerability_detector/test_scan_results/
daemons:
    - wazuh-modulesd
os_support:
    Linux:
        RedHat:
            - RHEL5
            - RHEL6
            - RHEL7
            - RHEL8
        Ubuntu:
            - BIONIC
            - XENIAL
            - TRUSTY
        Debian:
            - JESSIE
            - BUSTER
            - STRETCH
            - WHEEZY
coverage:
    100
tags:
    - NVD
'''
import os
from datetime import timedelta
from shutil import copy
import pytest
import wazuh_testing.vulnerability_detector as vd
from wazuh_testing.fim import check_time_travel
from wazuh_testing.tools import LOG_FILE_PATH
from wazuh_testing.tools import file
from wazuh_testing.tools.configuration import load_wazuh_configurations
from wazuh_testing.tools.monitoring import FileMonitor
# Marks
pytestmark = pytest.mark.tier(level=0)
# Variables
current_test_path = os.path.dirname(os.path.realpath(__file__))
test_data_path = os.path.join(current_test_path, 'data')
configurations_path = os.path.join(test_data_path, 'wazuh_nvd_configuration.yaml')
vulnerabilities_data_path = os.path.join(test_data_path, vd.VULNERABILITIES)
custom_cpe_helper_data_path = os.path.join(test_data_path, vd.CUSTOM_CPE_HELPER)
custom_msu_data_path = os.path.join(test_data_path, vd.CUSTOM_MSU)
wazuh_log_monitor = FileMonitor(LOG_FILE_PATH)
SCAN_TIMEOUT = 40
copy(custom_cpe_helper_data_path, vd.CPE_HELPER_PATH)
# Set configuration
parameters = [{'NVD_JSON_PATH': os.path.join(test_data_path, vd.CUSTOM_NVD_FEED),
'MSU_JSON_PATH': custom_msu_data_path}]
ids = ['scan_nvd_configuration']
# Read JSON data template
nvd_vulnerabilities = file.read_json_file(vulnerabilities_data_path)
system_data = [
{"target": "WINDOWS10", "os_name": "Microsoft Windows Server 2016 Datacenter Evaluation",
"os_major": "10", "os_minor": "0", "name": "windows", "format": "win",
"vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']),
"os_platform": "windows", "version": "Wazuh v4.1"},
{"target": "RHEL8", "os_name": "CentOS Linux", "os_major": "8", "os_minor": "1", "name": "centos8",
"format": "rpm", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']),
"os_platform": "centos", "version": "Wazuh v4.1"},
{"target": "RHEL7", "os_name": "CentOS Linux", "os_major": "7", "os_minor": "8", "name": "centos7",
"format": "rpm", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']),
"os_platform": "centos", "version": "Wazuh v4.1"},
{"target": "RHEL6", "os_name": "CentOS Linux", "os_major": "6", "os_minor": "10", "name": "centos6",
"format": "rpm", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']),
"os_platform": "centos", "version": "Wazuh v4.1"},
{"target": "RHEL5", "os_name": "CentOS Linux", "os_major": "5", "os_minor": "11", "name": "centos5",
"format": "rpm", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']),
"os_platform": "centos", "version": "Wazuh v4.1"},
{"target": "BIONIC", "os_name": "Ubuntu", "os_major": "18", "os_minor": "04", "name": "Ubuntu-bionic",
"format": "deb", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']),
"os_platform": "ubuntu", "version": "Wazuh v4.1"},
{"target": "XENIAL", "os_name": "Ubuntu", "os_major": "16", "os_minor": "04", "name": "Ubuntu-xenial",
"format": "deb", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']),
"os_platform": "ubuntu", "version": "Wazuh v4.1"},
{"target": "TRUSTY", "os_name": "Ubuntu", "os_major": "14", "os_minor": "04", "name": "Ubuntu-trusty",
"format": "deb", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']),
"os_platform": "ubuntu", "version": "Wazuh v4.1"},
{"target": "BUSTER", "os_name": "Debian GNU/Linux", "os_major": "10", "os_minor": "", "name": "debian10",
"format": "deb", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']),
"os_platform": "debian", "version": "Wazuh v4.1"},
{"target": "STRETCH", "os_name": "Debian GNU/Linux", "os_major": "9", "os_minor": "", "name": "debian9",
"format": "deb", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']),
"os_platform": "debian", "version": "Wazuh v4.1"},
{"target": "MAC", "os_name": "Mac OS X", "os_major": "10", "os_minor": "15", "name": "macos-catalina",
"format": "pkg", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']),
"os_platform": "darwin", "version": "Wazuh v4.1"},
{"target": "MAC", "os_name": "Mac OS X", "os_major": "10", "os_minor": "15", "name": "macos-catalina",
"format": "pkg", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']),
"os_platform": "darwin", "version": "Wazuh v4.0"},
{"target": "MAC", "os_name": "Mac OS X Server", "os_major": "5", "os_minor": "10", "name": "macos-server",
"format": "pkg", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']),
"os_platform": "darwin", "version": "Wazuh v4.1"}
]
system_data_ids = [system['target'] for system in system_data]
# Configuration data
configurations = load_wazuh_configurations(configurations_path, __name__, params=parameters)
# Fixtures
@pytest.fixture(scope='module', params=configurations, ids=ids)
def get_configuration(request):
"""
description:
Get configurations from the module.
parameters:
- request
"""
return request.param
@pytest.fixture(scope='module', params=system_data, ids=system_data_ids)
@vd.mock_cve_db
def mock_vulnerability_scan(request, mock_agent):
"""
description:
Mocks the vulnerability scan inserting custom hotfixes, feeds and changing the host system
parameters:
- request:
type: dictionary
brief: containing the data to mock the system and the agent
- mock_agent:
type: callable
brief: fixture used to mock the agent
"""
# Mock system
vd.modify_system(agent_id=mock_agent, os_name=request.param['os_name'], os_major=request.param['os_major'],
os_minor=request.param['os_minor'], name=vd.MOCKED_AGENT_NAME,
os_platform=request.param['os_platform'], version=request.param['version'])
# Insert a vulnerability in table VULNERABILITIES
vd.insert_vulnerability(cveid='CWE-000', operation='less than', operation_value='1.0.0',
package='test', target=request.param['target'])
# Add custom vulnerabilities and feeds
for vulnerability in nvd_vulnerabilities['vulnerabilities_nvd']:
vd.insert_package(**vulnerability['package'], source=vulnerability['package']['name'],
format=request.param['format'], agent=mock_agent)
def test_vulnerabilities_report(get_configuration, configure_environment, restart_modulesd, check_cve_db,
mock_vulnerability_scan):
"""
description:
Check if inserted vulnerable packages are reported by vulnerability detector
wazuh_min_version:
4.1.0
parameters:
- configure_environment:
type: fixture
brief: Configure a custom environment for testing.
- get_configuration:
type: fixture
brief: Get configurations from the module.
- mock_vulnerability_scan:
type: callable
brief: Decorator used in any function that needs to mock cve.db
- restart_modulesd:
type: callable
brief: Restart modulesd daemon
- check_cve_db:
type: fixture
brief: Check if the CVE database exists and its tables are created
assertions:
- A report of the corresponding vulnerabilities is generated in the `ossec.log`.
test_input:
Several vulnerable packages with their respective vulnerabilities are inserted into a simulated agent to generate the corresponding alerts from NVD feed.
logging:
- ossec.log:
- r"Agent .* has an unsupported Wazuh version, {version} for agent .*"
- r"The {package} package .* from agent .* is vulnerable to {cve}"
- r"The {feed_source} found a total of {expected_vulnerabilities_number} potential vulnerabilities for agent .*"
"""
    vulnerabilities_number = mock_vulnerability_scan["vulnerabilities_number"]
    if mock_vulnerability_scan['format'] == 'pkg' and mock_vulnerability_scan['version'] == 'Wazuh v4.0':
        version = mock_vulnerability_scan['version']
        wazuh_log_monitor.start(
            timeout=SCAN_TIMEOUT,
            update_position=False,
            callback=vd.make_vuln_callback(fr"Agent .* has an unsupported Wazuh version: '{version}'"),
            error_message="The expected event 'Agent .* has an unsupported Wazuh version' not found"
        )
        return
    # Check the vulnerabilities of inserted packages
    try:
        for item in nvd_vulnerabilities['vulnerabilities_nvd']:
            vd.check_vulnerability_scan_event(wazuh_log_monitor, item['package']['name'], item['cve']['cveid'])
    except TimeoutError:
        check_time_travel(time_travel=True, interval=timedelta(seconds=300))
        for item in nvd_vulnerabilities['vulnerabilities_nvd']:
            vd.check_vulnerability_scan_event(wazuh_log_monitor, item['package']['name'], item['cve']['cveid'])
    # Check that the number of NVD vulnerabilities is the expected one
    if mock_vulnerability_scan["format"] != "win":
        vd.check_detected_vulnerabilities_number(wazuh_log_monitor=wazuh_log_monitor,
                                                 expected_vulnerabilities_number=vulnerabilities_number,
                                                 feed_source='NVD', timeout=vd.VULN_DETECTOR_SCAN_TIMEOUT)
    vd.check_if_modulesd_is_running()
test_scan_nvd_feed.json
{
"copyright": "Copyright (C) 2015-2021, Wazuh Inc.\nCreated by Wazuh, Inc. <[email protected]>.\nThis program is free software; you can redistribute it and/or modify it under the terms of GPLv2",
"type": "Integration",
"description": "These tests will mock RedHat, Canonical, Debian, and Windows systems, and insert custom vulnerabilities and vulnerable packages to check if Vulnerability Detector generates the vulnerability alerts from NVD feed.",
"tiers": [
0
],
"component": [
"Manager"
],
"path": "tests/integration/test_vulnerability_detector/test_scan_results/",
"daemons": [
"wazuh-modulesd"
],
"os_support": [
"Linux, RHEL5",
"Linux, RHEL6",
"Linux, RHEL7",
"Linux, RHEL8",
"Linux, Amazon Linux 1",
"Linux, Amazon Linux 2",
"Linux, Debian BUSTER",
"Linux, Debian STRETCH",
"Linux, Debian WHEEZY",
"Linux, Ubuntu BIONIC",
"Linux, Ubuntu XENIAL",
"Linux, Ubuntu TRUSTY",
"Linux, Arch Linux"
],
"coverage": 100,
"tags": [
"NVD"
],
"name": "test_scan_nvd_feed.py",
"id": 16,
"group_id": 2,
"tests": [
{
"description": "Check if inserted vulnerable packages are reported by vulnerability detector",
"wazuh_min_version": "4.1.0",
"parameters": [
{
"configure_environment": {
"type": "fixture",
"brief": "Configure a custom environment for testing."
}
},
{
"get_configuration": {
"type": "fixture",
"brief": "Get configurations from the module."
}
},
{
"mock_vulnerability_scan": {
"type": "callable",
"brief": "Decorator used in any function that needs to mock cve.db"
}
},
{
"restart_modulesd": {
"type": "callable",
"brief": "Restart modulesd daemon"
}
},
{
"check_cve_db": {
"type": "fixture",
"brief": "Check if the CVE database exists and its tables are created"
}
}
],
"assertions": [
"A report of the corresponding vulnerabilities is generated in the `ossec.log`."
],
"test_input": "Several vulnerable packages with their respective vulnerabilities are inserted into a simulated agent to generate the corresponding alerts from NVD feed.",
"logging": [
{
"ossec.log": [
"r\"Agent .* has an unsupported Wazuh version, {version} for agent .*\"",
"r\"The {package} package .* from agent .* is vulnerable to {cve}\"",
"r\"The {feed_source} found a total of {expected_vulnerabilities_number} potential vulnerabilities for agent .*\""
]
}
],
"name": "test_vulnerabilities_report",
"test_cases": [
"scan_nvd_configuration-RHEL8",
"scan_nvd_configuration-RHEL7",
"scan_nvd_configuration-RHEL6",
"scan_nvd_configuration-RHEL5",
"scan_nvd_configuration-BIONIC",
"scan_nvd_configuration-XENIAL",
"scan_nvd_configuration-TRUSTY",
"scan_nvd_configuration-BUSTER",
"scan_nvd_configuration-STRETCH"
]
}
]
}
test_scan_nvd_feed.yaml
component:
- Manager
copyright: 'Copyright (C) 2015-2021, Wazuh Inc.
Created by Wazuh, Inc. <[email protected]>.
This program is free software; you can redistribute it and/or modify it under the
terms of GPLv2'
coverage: 100
daemons:
- wazuh-modulesd
description: These tests will mock RedHat, Canonical, Debian, and Windows systems,
and insert custom vulnerabilities and vulnerable packages to check if Vulnerability
Detector generates the vulnerability alerts from NVD feed.
group_id: 2
id: 16
name: test_scan_nvd_feed.py
os_support:
- Linux, RHEL5
- Linux, RHEL6
- Linux, RHEL7
- Linux, RHEL8
- Linux, Amazon Linux 1
- Linux, Amazon Linux 2
- Linux, Debian BUSTER
- Linux, Debian STRETCH
- Linux, Debian WHEEZY
- Linux, Ubuntu BIONIC
- Linux, Ubuntu XENIAL
- Linux, Ubuntu TRUSTY
- Linux, Arch Linux
path: tests/integration/test_vulnerability_detector/test_scan_results/
tags:
- NVD
tests:
- assertions:
- A report of the corresponding vulnerabilities is generated in the `ossec.log`.
description: Check if inserted vulnerable packages are reported by vulnerability
detector
logging:
- ossec.log:
- r"Agent .* has an unsupported Wazuh version, {version} for agent .*"
- r"The {package} package .* from agent .* is vulnerable to {cve}"
- r"The {feed_source} found a total of {expected_vulnerabilities_number} potential
vulnerabilities for agent .*"
name: test_vulnerabilities_report
parameters:
- configure_environment:
brief: Configure a custom environment for testing.
type: fixture
- get_configuration:
brief: Get configurations from the module.
type: fixture
- mock_vulnerability_scan:
brief: Decorator used in any function that needs to mock cve.db
type: callable
- restart_modulesd:
brief: Restart modulesd daemon
type: callable
- check_cve_db:
brief: Check if the CVE database exists and its tables are created
type: fixture
test_cases:
- scan_nvd_configuration-RHEL8
- scan_nvd_configuration-RHEL7
- scan_nvd_configuration-RHEL6
- scan_nvd_configuration-RHEL5
- scan_nvd_configuration-BIONIC
- scan_nvd_configuration-XENIAL
- scan_nvd_configuration-TRUSTY
- scan_nvd_configuration-BUSTER
- scan_nvd_configuration-STRETCH
test_input: Several vulnerable packages with their respective vulnerabilities are
inserted into a simulated agent to generate the corresponding alerts from NVD
feed.
wazuh_min_version: 4.1.0
tiers:
- 0
type: Integration
test_vulnerability_detector/test_scan_results/test_redhat_inventory_redhat_feed.py
'''
copyright:
Copyright (C) 2015-2021, Wazuh Inc.
Created by Wazuh, Inc. <[email protected]>.
This program is free software; you can redistribute it and/or modify it under the terms of GPLv2
type:
Integration
description: These tests will mock RedHat systems and insert custom vulnerabilities and vulnerable packages to check
if Vulnerability Detector generates the alerts from the RedHat provider feed.
tiers:
- 0
component:
- Manager
path:
tests/integration/test_vulnerability_detector/test_scan_results/
daemons:
- wazuh-modulesd
os_support:
Linux:
RedHat:
- RHEL5
- RHEL6
- RHEL7
- RHEL8
coverage:
100
tags:
- NVD
'''
import os
import pytest
from wazuh_testing.tools import LOG_FILE_PATH
from wazuh_testing.tools.configuration import load_wazuh_configurations
from wazuh_testing.tools.monitoring import FileMonitor
from wazuh_testing.tools import file
from wazuh_testing import vulnerability_detector as vd
# Marks
pytestmark = pytest.mark.tier(level=0)
# Variables
current_test_path = os.path.dirname(os.path.realpath(__file__))
test_data_path = os.path.join(current_test_path, 'data')
configurations_path = os.path.join(test_data_path, 'wazuh_redhat_inventory.yaml')
redhat_vulnerabilities_data_path = os.path.join(test_data_path, 'redhat_vulnerabilities.json')
wazuh_log_monitor = FileMonitor(LOG_FILE_PATH)
SCAN_TIMEOUT = 40
# Set configuration
parameters = [{'NVD_JSON_PATH': os.path.join(test_data_path, vd.REAL_NVD_FEED)}]
ids = ['redhat_scan_configuration']
# Read JSON data template
redhat_vulnerabilities = file.read_json_file(redhat_vulnerabilities_data_path)
redhat_data_ids = [system['target'] for system in redhat_vulnerabilities]
# Configuration data
configurations = load_wazuh_configurations(configurations_path, __name__, params=parameters)
# Fixtures
@pytest.fixture(scope='module', params=configurations, ids=ids)
def get_configuration(request):
"""
description:
Get configurations from the module.
parameters:
- request
"""
return request.param
@pytest.fixture(scope='module', params=redhat_vulnerabilities, ids=redhat_data_ids)
@vd.mock_cve_db
def mock_vulnerability_scan(request, mock_agent):
"""
description:
Mocks the vulnerability scan inserting custom hotfixes, feeds and changing the host system
parameters:
- request:
type: dictionary
brief: containing the data to mock the system and the agent
- mock_agent:
type: callable
brief: fixture used to mock the agent
"""
# Mock system
vd.modify_system(agent_id=mock_agent, os_name=request.param['os_name'], os_major=request.param['os_major'],
os_minor=request.param['os_minor'], name=vd.MOCKED_AGENT_NAME)
# Add custom vulnerabilities and feeds
for vulnerability in request.param['vulnerabilities']:
vd.insert_package(**vulnerability['package'], agent=mock_agent, source=vulnerability['package']['name'])
vd.insert_vulnerability(**vulnerability['cve'], package=vulnerability['package']['name'],
target=request.param['target'])
def test_redhat_vulnerabilities_report(get_configuration, configure_environment, restart_modulesd, check_cve_db,
mock_vulnerability_scan):
"""
description:
Check if inserted vulnerable packages are reported by vulnerability detector
wazuh_min_version:
4.1.0
parameters:
- configure_environment:
type: fixture
brief: Configure a custom environment for testing.
- get_configuration:
type: fixture
brief: Get configurations from the module.
- mock_vulnerability_scan:
type: callable
brief: Decorator used in any function that needs to mock cve.db
- restart_modulesd:
type: callable
brief: Restart modulesd daemon
- check_cve_db:
type: fixture
brief: Check if the CVE database exists and its tables are created
assertions:
- A report of the corresponding vulnerabilities is generated in the `ossec.log`.
test_input:
Custom vulnerability config and vulnerable package
logging:
- ossec.log:
- r"The {feed_source} found a total of '{expected_vulnerabilities_number}' potential vulnerabilities "
"for agent .*"
- r"The '{package}' package .* from agent .* is vulnerable to '{cve}'"
"""
    vulnerabilities_number = len(mock_vulnerability_scan['vulnerabilities'])
    # Check that the number of OVAL vulnerabilities is the expected one
    vd.check_detected_vulnerabilities_number(wazuh_log_monitor=wazuh_log_monitor,
                                             expected_vulnerabilities_number=vulnerabilities_number,
                                             feed_source='OVAL', timeout=vd.VULN_DETECTOR_SCAN_TIMEOUT)
    # Check the vulnerabilities of packages inserted
    for item in mock_vulnerability_scan['vulnerabilities']:
        vd.check_vulnerability_scan_event(wazuh_log_monitor=wazuh_log_monitor, package=item['package']['name'],
                                          cve=item['cve']['cveid'])
    vd.check_if_modulesd_is_running()
test_redhat_inventory_redhat_feed.json
{
"copyright": "Copyright (C) 2015-2021, Wazuh Inc.\nCreated by Wazuh, Inc. <[email protected]>.\nThis program is free software; you can redistribute it and/or modify it under the terms of GPLv2",
"type": "Integration",
"description": "These tests will mock RedHat systems and insert custom vulnerabilities and vulnerable packages to check if Vulnerability Detector generates the alerts from the RedHat provider feed.",
"tiers": [
0
],
"component": [
"Manager"
],
"path": "tests/integration/test_vulnerability_detector/test_scan_results/",
"daemons": [
"wazuh-modulesd"
],
"os_support": [
"Linux, RHEL5",
"Linux, RHEL6",
"Linux, RHEL7",
"Linux, RHEL8",
"Linux, Amazon Linux 1",
"Linux, Amazon Linux 2",
"Linux, Debian BUSTER",
"Linux, Debian STRETCH",
"Linux, Debian WHEEZY",
"Linux, Ubuntu BIONIC",
"Linux, Ubuntu XENIAL",
"Linux, Ubuntu TRUSTY",
"Linux, Arch Linux"
],
"coverage": 100,
"tags": [
"NVD"
],
"name": "test_redhat_inventory_redhat_feed.py",
"id": 9,
"group_id": 2,
"tests": [
{
"description": "Check if inserted vulnerable packages are reported by vulnerability detector",
"wazuh_min_version": "4.1.0",
"parameters": [
{
"configure_environment": {
"type": "fixture",
"brief": "Configure a custom environment for testing."
}
},
{
"get_configuration": {
"type": "fixture",
"brief": "Get configurations from the module."
}
},
{
"mock_vulnerability_scan": {
"type": "callable",
"brief": "Decorator used in any function that needs to mock cve.db"
}
},
{
"restart_modulesd": {
"type": "callable",
"brief": "Restart modulesd daemon"
}
},
{
"check_cve_db": {
"type": "fixture",
"brief": "Check if the CVE database exists and its tables are created"
}
}
],
"assertions": [
"A report of the corresponding vulnerabilities is generated in the `ossec.log`."
],
"test_input": "Custom vulnerability config and vulnerable package",
"logging": [
{
"ossec.log": [
"r\"The {feed_source} found a total of '{expected_vulnerabilities_number}' potential vulnerabilities \" \"for agent .*\"",
"r\"The '{package}' package .* from agent .* is vulnerable to '{cve}'\""
]
}
],
"name": "test_redhat_vulnerabilities_report",
"test_cases": [
"redhat_scan_configuration-RHEL8",
"redhat_scan_configuration-RHEL7",
"redhat_scan_configuration-RHEL6",
"redhat_scan_configuration-RHEL5"
]
}
]
}
test_redhat_inventory_redhat_feed.yaml
component:
- Manager
copyright: 'Copyright (C) 2015-2021, Wazuh Inc.
Created by Wazuh, Inc. <[email protected]>.
This program is free software; you can redistribute it and/or modify it under the
terms of GPLv2'
coverage: 100
daemons:
- wazuh-modulesd
description: These tests will mock RedHat systems and insert custom vulnerabilities
and vulnerable packages to check if Vulnerability Detector generates the alerts
from the RedHat provider feed.
group_id: 2
id: 9
name: test_redhat_inventory_redhat_feed.py
os_support:
- Linux, RHEL5
- Linux, RHEL6
- Linux, RHEL7
- Linux, RHEL8
- Linux, Amazon Linux 1
- Linux, Amazon Linux 2
- Linux, Debian BUSTER
- Linux, Debian STRETCH
- Linux, Debian WHEEZY
- Linux, Ubuntu BIONIC
- Linux, Ubuntu XENIAL
- Linux, Ubuntu TRUSTY
- Linux, Arch Linux
path: tests/integration/test_vulnerability_detector/test_scan_results/
tags:
- NVD
tests:
- assertions:
- A report of the corresponding vulnerabilities is generated in the `ossec.log`.
description: Check if inserted vulnerable packages are reported by vulnerability
detector
logging:
- ossec.log:
- r"The {feed_source} found a total of '{expected_vulnerabilities_number}' potential
vulnerabilities " "for agent .*"
- r"The '{package}' package .* from agent .* is vulnerable to '{cve}'"
name: test_redhat_vulnerabilities_report
parameters:
- configure_environment:
brief: Configure a custom environment for testing.
type: fixture
- get_configuration:
brief: Get configurations from the module.
type: fixture
- mock_vulnerability_scan:
brief: Decorator used in any function that needs to mock cve.db
type: callable
- restart_modulesd:
brief: Restart modulesd daemon
type: callable
- check_cve_db:
brief: Check if the CVE database exists and its tables are created
type: fixture
test_cases:
- redhat_scan_configuration-RHEL8
- redhat_scan_configuration-RHEL7
- redhat_scan_configuration-RHEL6
- redhat_scan_configuration-RHEL5
test_input: Custom vulnerability config and vulnerable package
wazuh_min_version: 4.1.0
tiers:
- 0
type: Integration
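The `test_cases` entries in the generated output above follow the pattern `<configuration id>-<system id>`, which matches the `ids` passed to the two module-scoped fixtures (`get_configuration` and `mock_vulnerability_scan`). The snippet below is only a minimal sketch of that naming pattern, assuming the IDs are the Cartesian product of both ID lists joined with `-`. `combine_ids` is a hypothetical helper, not part of pytest or `wazuh_testing`, and pytest's real ID generation is more involved than this.

```python
from itertools import product


def combine_ids(*id_lists):
    """Join one ID from each list with '-' for every combination (hypothetical helper)."""
    return ["-".join(parts) for parts in product(*id_lists)]


# Values copied from the RedHat feed test shown above.
config_ids = ["redhat_scan_configuration"]
system_ids = ["RHEL8", "RHEL7", "RHEL6", "RHEL5"]

test_cases = combine_ids(config_ids, system_ids)
for case in test_cases:
    print(case)
```

With a single configuration ID, the list has one entry per mocked system, which is why the RedHat feed test documents four test cases.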
tests/integration/test_vulnerability_detector/test_general_settings/test_general_settings_ignore_time.py
'''
copyright:
Copyright (C) 2015-2021, Wazuh Inc.
Created by Wazuh, Inc. <[email protected]>.
This program is free software; you can redistribute it and/or modify it under the terms of GPLv2
type:
Integration
description: The tests will modify the value of `ignore_time` tag in `ossec.conf`, set different times
and check the result in `ossec.log`.
tiers:
- 0
component:
- Manager
path:
tests/integration/test_vulnerability_detector/test_general_settings/
daemons:
- wazuh-modulesd
os_support:
- RedHat:
- RHEL5
- RHEL6
- RHEL7
- RHEL8
- Ubuntu:
- BIONIC
- XENIAL
- TRUSTY
- Debian:
- JESSIE
- BUSTER
- STRETCH
- WHEEZY
coverage:
100
tags:
- NVD
'''
import os
from datetime import timedelta
import pytest
import wazuh_testing.vulnerability_detector as vd
from wazuh_testing.fim import check_time_travel
from wazuh_testing.tools import LOG_FILE_PATH
from wazuh_testing.tools.configuration import load_wazuh_configurations
from wazuh_testing.tools.monitoring import FileMonitor
from wazuh_testing.tools.services import control_service
from wazuh_testing.tools.time import time_to_seconds
# Marks
pytestmark = pytest.mark.tier(level=0)
# variables
test_path = os.path.dirname(os.path.realpath(__file__))
test_data_path = os.path.join(test_path, 'data')
nvd_feed_path = os.path.join(os.path.dirname(test_path), 'test_scan_results', 'data', vd.REAL_NVD_FEED)
configurations_path = os.path.join(test_data_path, 'wazuh_ignore_time.yaml')
wazuh_log_monitor = FileMonitor(LOG_FILE_PATH)
callback_string_vulnerability = f"'{vd.DEFAULT_PACKAGE_NAME}'.+is vulnerable to '{vd.DEFAULT_VULNERABILITY_ID}'"
parameters = [{'IGNORE_TIME': '3600s', 'INTERVAL': '5s', 'NVD_FEED_PATH': nvd_feed_path},
{'IGNORE_TIME': '60m', 'INTERVAL': '5s', 'NVD_FEED_PATH': nvd_feed_path},
{'IGNORE_TIME': '1h', 'INTERVAL': '5s', 'NVD_FEED_PATH': nvd_feed_path}]
metadata = [{'ignore_time': '3600s', 'timeout': 30, 'jumps': 2, 'interval': '5s'},
{'ignore_time': '60m', 'timeout': 30, 'jumps': 2, 'interval': '5s'},
{'ignore_time': '1h', 'timeout': 30, 'jumps': 2, 'interval': '5s'}]
# Configuration data
configurations = load_wazuh_configurations(configurations_path, __name__, params=parameters, metadata=metadata)
# fixtures
@pytest.fixture(scope='module', params=configurations)
def get_configuration(request):
"""
description:
Get configurations from the module.
parameters:
- request
"""
return request.param
@pytest.fixture(scope='module')
def prepare_agent(mock_agent):
control_service('stop', daemon='wazuh-db')
vd.clean_vd_tables(mock_agent)
vd.insert_package(agent=mock_agent, vendor="Red Hat, Inc.")
vd.insert_vulnerability()
control_service('start', daemon='wazuh-db')
yield mock_agent
def test_ignore_time(get_configuration, configure_environment, restart_modulesd, prepare_agent,
custom_callback_vulnerability=vd.make_vuln_callback(callback_string_vulnerability)):
"""
description:
Check if an alert is not fired during the ignore time interval
wazuh_min_version:
4.1.0
parameters:
- configure_environment:
type: fixture
brief: Configure a custom environment for testing.
- get_configuration:
type: fixture
brief: Get configurations from the module.
- restart_modulesd:
type: callable
brief: Restart modulesd daemon
- prepare_agent:
type: fixture
brief: setup and start an agent
- custom_callback_vulnerability:
type: str
brief: custom callback created
assertions:
- Vulnerabilities alerts are not generated before `ignore_time` time set.
- Vulnerabilities alerts are generated after `ignore_time` time set.
test_input:
Custom vulnerability config and vulnerable package
logging:
- ossec.log:
- r"'{vd.DEFAULT_PACKAGE_NAME}'.+is vulnerable to '{vd.DEFAULT_VULNERABILITY_ID}'"
"""
    control_service('stop', daemon='wazuh-modulesd')
    control_service('stop', daemon='wazuh-db')
    vd.update_last_scan(agent=prepare_agent)
    control_service('start', daemon='wazuh-db')
    control_service('start', daemon='wazuh-modulesd')
    ignore_time = get_configuration['metadata']['ignore_time']
    jumps = get_configuration['metadata']['jumps']
    seconds_to_travel = time_to_seconds(ignore_time) / jumps
    # Check for initial alert
    wazuh_log_monitor.start(timeout=get_configuration['metadata']['timeout'],
                            callback=custom_callback_vulnerability,
                            error_message='Alert did not appear at the start of the test')
    # Check if alert does not appear during ignore time
    for _ in range(1, jumps):
        check_time_travel(time_travel=True, interval=timedelta(seconds=seconds_to_travel))
        with pytest.raises(TimeoutError):
            wazuh_log_monitor.start(timeout=get_configuration['metadata']['timeout'],
                                    callback=custom_callback_vulnerability)
            raise AttributeError('Alert appeared before ignore_time was finished')
    # Travel to the time set in ignore time
    check_time_travel(time_travel=True, interval=timedelta(seconds=seconds_to_travel))
    # Check for final alert
    wazuh_log_monitor.start(timeout=get_configuration['metadata']['timeout'],
                            callback=custom_callback_vulnerability,
                            error_message='Alert did not appear at the end of the test')
test_general_settings_ignore_time.json
{
"copyright": "Copyright (C) 2015-2021, Wazuh Inc.\nCreated by Wazuh, Inc. <[email protected]>.\nThis program is free software; you can redistribute it and/or modify it under the terms of GPLv2",
"type": "Integration",
"description": "The tests will modify the value of `ignore_time` tag in `ossec.conf`, set different times and check the result in `ossec.log`.",
"tiers": [
0
],
"component": [
"Manager"
],
"path": "tests/integration/test_vulnerability_detector/test_general_settings/",
"daemons": [
"wazuh-modulesd"
],
"os_support": [
"Linux, RHEL5",
"Linux, RHEL6",
"Linux, RHEL7",
"Linux, RHEL8",
"Linux, Amazon Linux 1",
"Linux, Amazon Linux 2",
"Linux, Debian BUSTER",
"Linux, Debian STRETCH",
"Linux, Debian WHEEZY",
"Linux, Ubuntu BIONIC",
"Linux, Ubuntu XENIAL",
"Linux, Ubuntu TRUSTY",
"Linux, Arch Linux"
],
"coverage": 100,
"tags": [
"NVD"
],
"name": "test_general_settings_ignore_time.py",
"id": 5,
"group_id": 2,
"tests": [
{
"description": "Check if an alert is not fired during the ignore time interval",
"wazuh_min_version": "4.1.0",
"parameters": [
{
"configure_environment": {
"type": "fixture",
"brief": "Configure a custom environment for testing."
}
},
{
"get_configuration": {
"type": "fixture",
"brief": "Get configurations from the module."
}
},
{
"restart_modulesd": {
"type": "callable",
"brief": "Restart modulesd daemon"
}
},
{
"prepare_agent": {
"type": "fixture",
"brief": "setup and start an agent"
}
},
{
"custom_callback_vulnerability": {
"type": "str",
"brief": "custom callback created"
}
}
],
"assertions": [
"Vulnerabilities alerts are not generated before `ignore_time` time set.",
"Vulnerabilities alerts are generated after `ignore_time` time set."
],
"test_input": "Custom vulnerability config and vulnerable package",
"logging": [
{
"ossec.log": [
"r\"'{vd.DEFAULT_PACKAGE_NAME}'.+is vulnerable to '{vd.DEFAULT_VULNERABILITY_ID}'\""
]
}
],
"name": "test_ignore_time",
"test_cases": [
"get_configuration0",
"get_configuration1",
"get_configuration2"
]
}
]
}
test_general_settings_ignore_time.yaml
component:
- Manager
copyright: 'Copyright (C) 2015-2021, Wazuh Inc.
Created by Wazuh, Inc. <[email protected]>.
This program is free software; you can redistribute it and/or modify it under the
terms of GPLv2'
coverage: 100
daemons:
- wazuh-modulesd
description: The tests will modify the value of `ignore_time` tag in `ossec.conf`,
set different times and check the result in `ossec.log`.
group_id: 2
id: 5
name: test_general_settings_ignore_time.py
os_support:
- Linux, RHEL5
- Linux, RHEL6
- Linux, RHEL7
- Linux, RHEL8
- Linux, Amazon Linux 1
- Linux, Amazon Linux 2
- Linux, Debian BUSTER
- Linux, Debian STRETCH
- Linux, Debian WHEEZY
- Linux, Ubuntu BIONIC
- Linux, Ubuntu XENIAL
- Linux, Ubuntu TRUSTY
- Linux, Arch Linux
path: tests/integration/test_vulnerability_detector/test_general_settings/
tags:
- NVD
tests:
- assertions:
- Vulnerabilities alerts are not generated before `ignore_time` time set.
- Vulnerabilities alerts are generated after `ignore_time` time set.
description: Check if an alert is not fired during the ignore time interval
logging:
- ossec.log:
- r"'{vd.DEFAULT_PACKAGE_NAME}'.+is vulnerable to '{vd.DEFAULT_VULNERABILITY_ID}'"
name: test_ignore_time
parameters:
- configure_environment:
brief: Configure a custom environment for testing.
type: fixture
- get_configuration:
brief: Get configurations from the module.
type: fixture
- restart_modulesd:
brief: Restart modulesd daemon
type: callable
- prepare_agent:
brief: setup and start an agent
type: fixture
- custom_callback_vulnerability:
brief: custom callback created
type: str
test_cases:
- get_configuration0
- get_configuration1
- get_configuration2
test_input: Custom vulnerability config and vulnerable package
wazuh_min_version: 4.1.0
tiers:
- 0
type: Integration
tests/integration/test_agentd/test_agentd_reconnection.py
'''
copyright:
Copyright (C) 2015-2021, Wazuh Inc.
Created by Wazuh, Inc. <[email protected]>.
This program is free software; you can redistribute it and/or modify it under the terms of GPLv2
type:
Integration
description:
These tests will check if, during enrollment, the agent re-establishes communication with the manager
under different situations that interrupt it.
The objective is to check that, with different states in the clients.key file, the agent
successfully enrolls after losing connection with remoted.
tiers:
- 0
component:
Agent
path:
tests/integration/test_agentd/
daemons:
- wazuh-agentd
os_support:
- Linux, RHEL5
- Linux, RHEL6
- Linux, RHEL7
- Linux, RHEL8
- Linux, Amazon Linux 1
- Linux, Amazon Linux 2
- Linux, Debian BUSTER
- Linux, Debian STRETCH
- Linux, Debian WHEEZY
- Linux, Ubuntu BIONIC
- Linux, Ubuntu XENIAL
- Linux, Ubuntu TRUSTY
- Linux, Arch Linux
- Windows, 7
- Windows, 8
- Windows, 10
- Windows, Server 2003
- Windows, Server 2012
- Windows, Server 2016
coverage:
33
tags:
- linux
- agentd
'''
import os
import platform
from time import sleep
import pytest
from datetime import datetime, timedelta
from wazuh_testing.agent import CLIENT_KEYS_PATH, SERVER_CERT_PATH, SERVER_KEY_PATH
from wazuh_testing.tools import WAZUH_PATH, LOG_FILE_PATH
from wazuh_testing.tools.authd_sim import AuthdSimulator
from wazuh_testing.tools.configuration import load_wazuh_configurations
from wazuh_testing.tools.file import truncate_file
from wazuh_testing.tools.monitoring import QueueMonitor, FileMonitor
from wazuh_testing.tools.remoted_sim import RemotedSimulator
from wazuh_testing.tools.services import control_service
# Marks
pytestmark = [pytest.mark.linux, pytest.mark.win32, pytest.mark.tier(level=0), pytest.mark.agent]
test_data_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'data')
configurations_path = os.path.join(test_data_path, 'wazuh_conf.yaml')
params = [
{
'SERVER_ADDRESS': '127.0.0.1',
'REMOTED_PORT': 1514,
'PROTOCOL': 'tcp',
},
{
'SERVER_ADDRESS': '127.0.0.1',
'REMOTED_PORT': 1514,
'PROTOCOL': 'udp',
}
]
metadata = [
{'PROTOCOL': 'tcp'},
{'PROTOCOL': 'udp'}
]
config_ids = ['tcp', 'udp']
configurations = load_wazuh_configurations(configurations_path, __name__, params=params, metadata=metadata)
log_monitor_paths = []
receiver_sockets_params = []
monitored_sockets_params = []
receiver_sockets, monitored_sockets, log_monitors = None, None, None # Set in the fixtures
authd_server = AuthdSimulator(params[0]['SERVER_ADDRESS'], key_path=SERVER_KEY_PATH, cert_path=SERVER_CERT_PATH)
remoted_server = None
def teardown():
    global remoted_server
    if remoted_server is not None:
        remoted_server.stop()


def set_debug_mode():
    """Set debug2 for agentd in local internal options file."""
    if platform.system() == 'win32' or platform.system() == 'Windows':
        local_int_conf_path = os.path.join(WAZUH_PATH, 'local_internal_options.conf')
        debug_line = 'windows.debug=2\nagent.recv_timeout=5\n'
    else:
        local_int_conf_path = os.path.join(WAZUH_PATH, 'etc', 'local_internal_options.conf')
        debug_line = 'agent.debug=2\nagent.recv_timeout=5\n'
    with open(local_int_conf_path, 'r') as local_file_read:
        lines = local_file_read.readlines()
        for line in lines:
            if line == debug_line:
                return
    with open(local_int_conf_path, 'a') as local_file_write:
        local_file_write.write('\n' + debug_line)


set_debug_mode()


# fixtures
@pytest.fixture(scope="module", params=configurations, ids=config_ids)
def get_configuration(request):
    """Get configurations from the module"""
    return request.param


@pytest.fixture(scope="function")
def configure_authd_server(request):
    """Initialize a simulated authd connection."""
    authd_server.start()
    global monitored_sockets
    monitored_sockets = QueueMonitor(authd_server.queue)
    authd_server.clear()
    yield
    authd_server.shutdown()
def start_authd():
"""Enable authd to accept connections and perform enrollments."""
authd_server.set_mode("ACCEPT")
authd_server.clear()
def stop_authd():
"""Disable authd to accept connections and perform enrollments."""
authd_server.set_mode("REJECT")
def set_authd_id():
"""Set agent id to 101 in the authd simulated connection."""
authd_server.agent_id = 101
def clean_keys():
"""Clear the agent's client.keys file."""
truncate_file(CLIENT_KEYS_PATH)
sleep(1)
def delete_keys():
"""Remove the agent's client.keys file."""
os.remove(CLIENT_KEYS_PATH)
sleep(1)
def set_keys():
"""Write to client.keys file the agent's enrollment details."""
with open(CLIENT_KEYS_PATH, 'w+') as f:
f.write("100 ubuntu-agent any TopSecret")
sleep(1)
def wait_notify(line):
"""Callback function to wait for agent checkins to the manager."""
if 'Sending keep alive:' in line:
return line
return None
def wait_enrollment(line):
"""Callback function to wait for enrollment."""
if 'Valid key received' in line:
return line
return None
def wait_enrollment_try(line):
"""Callback function to wait for enrollment attempt."""
if 'Requesting a key' in line:
return line
return None
def search_error_messages():
    """Retrieve the line of the log file where the first error is found.

    Returns:
        str: String where the error is found or None if errors are not found.
    """
    with open(LOG_FILE_PATH, 'r') as log_file:
        lines = log_file.readlines()
        for line in lines:
            if "ERROR:" in line:
                return line
    return None
# Tests
"""
This test covers the scenario of the Agent starting with keys:
when it loses communication with Remoted, a new enrollment is sent to Authd.
"""
def test_agentd_reconection_enrollment_with_keys(configure_authd_server, configure_environment, get_configuration):
'''
description:
Check how the agent behaves when losing communication with remoted and a new enrollment is sent to authd.
In this case, the agent starts with keys.
wazuh_min_version:
4.1
parameters:
- configure_authd_server (fixture), Initialize a simulated authd connection.
- configure_environment (fixture), Configure a custom environment for testing.
- get_configuration (fixture), Get configurations from the module.
assertions:
- The agent has keys and loses communication with remoted.
test_input:
Requests are made using TCP and UDP protocols together with a client.keys file
that includes the previously generated agent keys.
logging:
- ossec.log, "Valid key received"
- ossec.log, "Sending keep alive:"
- ossec.log, "Valid key received"
- ossec.log, "Sending keep alive:"
tags:
- remoted_simulator
'''
global remoted_server
remoted_server = RemotedSimulator(protocol=get_configuration['metadata']['PROTOCOL'], mode='CONTROLLED_ACK',
client_keys=CLIENT_KEYS_PATH)
# Stop target Agent
control_service('stop')
# Prepare test
start_authd()
set_authd_id()
set_keys()
# Clean logs
truncate_file(LOG_FILE_PATH)
# Start target Agent
control_service('start')
# Start hearing logs
truncate_file(LOG_FILE_PATH)
log_monitor = FileMonitor(LOG_FILE_PATH)
# hearing on enrollment server
authd_server.clear()
# Wait until Agent is notifying Manager
log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!")
# Start rejecting Agent
remoted_server.set_mode('REJECT')
# hearing on enrollment server
authd_server.clear()
# Wait until Agent asks a new key to enrollment
log_monitor.start(timeout=180, callback=wait_enrollment,
error_message="Agent never enrolled after rejecting connection!")
# Start responding to Agent
remoted_server.set_mode('CONTROLLED_ACK')
# Wait until Agent is notifying Manager
log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!")
assert "aes" in remoted_server.last_message_ctx, "Incorrect Secure Message"
"""
This test covers the scenario of Agent starting without client.keys file
and an enrollment is sent to Authd to start communicating with Remoted
"""
def test_agentd_reconection_enrollment_no_keys_file(configure_authd_server, configure_environment, get_configuration):
'''
description:
Check how the agent behaves when losing communication with remoted and a new enrollment is sent to authd.
In this case, the agent starts without the client.keys file.
wazuh_min_version:
4.1
parameters:
- configure_authd_server (fixture), Initialize a simulated authd connection.
- configure_environment (fixture), Configure a custom environment for testing.
- get_configuration (fixture), Get configurations from the module.
assertions:
- The agent does not have the `client.keys` file and loses communication with remoted when enrollment has been started.
test_input:
Requests are made using TCP and UDP protocols.
logging:
- ossec.log, "Valid key received"
- ossec.log, "Sending keep alive:"
- ossec.log, "Valid key received"
- ossec.log, "Sending keep alive:"
tags:
- remoted_simulator
'''
global remoted_server
remoted_server = RemotedSimulator(protocol=get_configuration['metadata']['PROTOCOL'], mode='CONTROLLED_ACK',
client_keys=CLIENT_KEYS_PATH)
# Stop target Agent
control_service('stop')
# Clean logs
truncate_file(LOG_FILE_PATH)
# Prepare test
start_authd()
set_authd_id()
delete_keys()
# Start target Agent
control_service('start')
# start hearing logs
log_monitor = FileMonitor(LOG_FILE_PATH)
# hearing on enrollment server
authd_server.clear()
# Wait until Agent asks keys for the first time
log_monitor.start(timeout=50, callback=wait_enrollment,
error_message="Agent never enrolled for the first time.")
# Wait until Agent is notifying Manager
log_monitor.start(timeout=50, callback=wait_notify, error_message="Notify message from agent was never sent!")
# Start rejecting Agent
remoted_server.set_mode('REJECT')
# hearing on enrollment server
authd_server.clear()
# Wait until Agent asks a new key to enrollment
log_monitor.start(timeout=180, callback=wait_enrollment,
error_message="Agent never enrolled after rejecting connection!")
# Start responding to Agent
remoted_server.set_mode('CONTROLLED_ACK')
# Wait until Agent is notifying Manager
log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!")
assert "aes" in remoted_server.last_message_ctx, "Incorrect Secure Message"
"""
This test covers the scenario of Agent starting without keys in client.keys file
and an enrollment is sent to Authd to start communicating with Remoted
"""
def test_agentd_reconection_enrollment_no_keys(configure_authd_server, configure_environment, get_configuration):
'''
description:
Check how the agent behaves when losing communication with remoted and a new enrollment is sent to authd.
In this case, the agent has its client.keys file empty.
wazuh_min_version:
4.1
parameters:
- configure_authd_server (fixture), Initialize a simulated authd connection.
- configure_environment (fixture), Configure a custom environment for testing.
- get_configuration (fixture), Get configurations from the module.
assertions:
- The agent has an empty `client.keys` file and loses communication
with remoted when enrollment has been started.
test_input:
Requests are made using TCP and UDP protocols together with an empty client.keys file.
logging:
- ossec.log, "Valid key received"
- ossec.log, "Sending keep alive:"
- ossec.log, "Valid key received"
- ossec.log, "Sending keep alive:"
tags:
- remoted_simulator
'''
global remoted_server
remoted_server = RemotedSimulator(protocol=get_configuration['metadata']['PROTOCOL'], mode='CONTROLLED_ACK',
client_keys=CLIENT_KEYS_PATH)
# Stop target Agent
control_service('stop')
# Clean logs
truncate_file(LOG_FILE_PATH)
# Prepare test
start_authd()
set_authd_id()
clean_keys()
# Start target Agent
control_service('start')
# start hearing logs
log_monitor = FileMonitor(LOG_FILE_PATH)
# hearing on enrollment server
authd_server.clear()
# Wait until Agent asks keys for the first time
log_monitor.start(timeout=120, callback=wait_enrollment,
error_message="Agent never enrolled for the first time rejecting connection!")
# Wait until Agent is notifying Manager
log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!")
assert "aes" in remoted_server.last_message_ctx, "Incorrect Secure Message"
# Start rejecting Agent
remoted_server.set_mode('REJECT')
# hearing on enrollment server
authd_server.clear()
# Wait until Agent asks a new key to enrollment
log_monitor.start(timeout=180, callback=wait_enrollment,
error_message="Agent never enrolled after rejecting connection!")
# Start responding to Agent
remoted_server.set_mode('CONTROLLED_ACK')
# Wait until Agent is notifying Manager
log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!")
assert "aes" in remoted_server.last_message_ctx, "Incorrect Secure Message"
"""
This test covers the scenario of the Agent starting without keys,
where multiple retries are required until the new key is obtained to start communicating with Remoted
"""
def test_agentd_initial_enrollment_retries(configure_authd_server, configure_environment, get_configuration):
'''
description:
Check how the agent behaves when losing communication with remoted and a new enrollment is sent to authd.
For this, the agent starts without keys and performs multiple enrollment requests
to authd before getting the new key to communicate with remoted.
wazuh_min_version:
4.1
parameters:
- configure_authd_server (fixture), Initialize a simulated authd connection.
- configure_environment (fixture), Configure a custom environment for testing.
- get_configuration (fixture), Get configurations from the module.
assertions:
- The agent starts without keys and performs multiple enrollment requests before obtaining a new key.
test_input:
Requests are made using TCP and UDP protocols together with an empty client.keys file.
logging:
- ossec.log, "Sending keep alive:"
- ossec.log, "Requesting a key" (four times)
- ossec.log, "Valid key received"
- ossec.log, "Sending keep alive:"
tags:
- remoted_simulator
'''
global remoted_server
remoted_server = RemotedSimulator(protocol=get_configuration['metadata']['PROTOCOL'], mode='CONTROLLED_ACK',
client_keys=CLIENT_KEYS_PATH)
# Stop target Agent
control_service('stop')
# Clean logs
truncate_file(LOG_FILE_PATH)
# Prepare test
stop_authd()
set_authd_id()
clean_keys()
# Start whole Agent service to check other daemons status after initialization
control_service('start')
# Start hearing logs
log_monitor = FileMonitor(LOG_FILE_PATH)
start_time = datetime.now()
# Check for unsuccessful enrollment retries in Agentd initialization
retries = 0
while retries < 4:
retries += 1
log_monitor.start(timeout=retries * 5 + 20, callback=wait_enrollment_try,
error_message="Enrollment retry was not sent!")
stop_time = datetime.now()
expected_time = start_time + timedelta(seconds=retries * 5 - 2)
# Check if delay was applied
assert stop_time > expected_time, "Retries too quick"
# Enable authd
authd_server.clear()
authd_server.set_mode("ACCEPT")
# Wait for successful enrollment
log_monitor.start(timeout=70, callback=wait_enrollment, error_message="No successful enrollment after retries!")
# Wait until Agent is notifying Manager
log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!")
# Check if no Wazuh module stopped due to Agentd Initialization
with open(LOG_FILE_PATH) as log_file:
log_lines = log_file.read().splitlines()
for line in log_lines:
if "Unable to access queue:" in line:
raise AssertionError("A Wazuh module stopped because of Agentd initialization!")
"""
This test covers the scenario of the Agent starting with keys while Remoted is not reachable for some seconds,
so multiple connection retries are required prior to requesting a new enrollment
"""
def test_agentd_connection_retries_pre_enrollment(configure_authd_server, configure_environment, get_configuration):
'''
description:
Check how the agent behaves when losing communication with remoted and a new enrollment is sent to authd.
For this, the agent starts with keys but Remoted is not available for several seconds,
then the agent performs multiple connection retries before requesting a new enrollment.
wazuh_min_version:
4.1
parameters:
- configure_authd_server (fixture), Initialize a simulated authd connection.
- configure_environment (fixture), Configure a custom environment for testing.
- get_configuration (fixture), Get configurations from the module.
assertions:
- The agent has keys, remoted is unavailable for several seconds and multiple connection
requests are performed before a new enrollment is made.
test_input:
Requests are made using TCP and UDP protocols together with a client.keys file
that includes the previously generated agent keys.
logging:
- ossec.log, "Sending keep alive:"
tags:
- remoted_simulator
'''
global remoted_server
REMOTED_KEYS_SYNC_TIME = 10
# Start Remoted mock
remoted_server = RemotedSimulator(protocol=get_configuration['metadata']['PROTOCOL'], client_keys=CLIENT_KEYS_PATH)
# Stop target Agent
control_service('stop')
# Clean logs
truncate_file(LOG_FILE_PATH)
# Prepare test
stop_authd()
set_keys()
# Start hearing logs
log_monitor = FileMonitor(LOG_FILE_PATH)
# Start whole Agent service to check other daemons status after initialization
control_service('start')
# Simulate time of Remoted to synchronize keys by waiting previous to start responding
remoted_server.set_mode('CONTROLLED_ACK')
sleep(REMOTED_KEYS_SYNC_TIME)
# Check Agentd is finally communicating
log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!")

test_remoted/test_agent_communication/test_request_agent_info.py
'''
copyright:
Copyright (C) 2015-2021, Wazuh Inc.
Created by Wazuh, Inc. <[email protected]>.
This program is free software; you can redistribute it and/or modify it under the terms of GPLv2
type:
Integration
description:
Check that manager-agent communication through remoted socket works as expected.
tiers:
- 0
component:
Manager
path:
tests/integration/test_remoted/test_agent_communication/
daemons:
- wazuh-remoted
- wazuh-agentd
os_support:
- Linux, RHEL5
- Linux, RHEL6
- Linux, RHEL7
- Linux, RHEL8
- Linux, Amazon Linux 1
- Linux, Amazon Linux 2
- Linux, Debian BUSTER
- Linux, Debian STRETCH
- Linux, Debian WHEEZY
- Linux, Ubuntu BIONIC
- Linux, Ubuntu XENIAL
- Linux, Ubuntu TRUSTY
- Linux, Arch Linux
coverage:
33
tags:
- linux
- remoted
'''
import os
import time
import pytest
import wazuh_testing.tools.agent_simulator as ag
from wazuh_testing.tools.configuration import load_wazuh_configurations
from wazuh_testing.tools.sockets import send_request
# Marks
pytestmark = pytest.mark.tier(level=0)
# Configuration
test_data_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'data')
configurations_path = os.path.join(test_data_path, 'wazuh_request_agent_info.yaml')
parameters = [
{'PROTOCOL': 'udp,tcp'},
{'PROTOCOL': 'tcp'},
{'PROTOCOL': 'udp'},
]
metadata = [
{'PROTOCOL': 'udp,tcp'},
{'PROTOCOL': 'tcp'},
{'PROTOCOL': 'udp'},
]
# test cases
test_case = {
'disconnected': ('agent getconfig disconnected',
'Cannot send request'),
'get_config': ('agent getconfig client',
'{"client":{"config-profile":"centos8","notify_time":10,"time-reconnect":60}}'),
'get_state': ('logcollector getstate',
'{"error":0,"data":{"global":{"start":"2021-02-26, 06:41:26","end":"2021-02-26 08:49:19"}}}')
}
configurations = load_wazuh_configurations(configurations_path, __name__, params=parameters, metadata=metadata)
config_ids = [x['PROTOCOL'] for x in parameters]
# Utils
manager_address = "localhost"
# fixtures
@pytest.fixture(scope="module", params=configurations, ids=config_ids)
def get_configuration(request):
"""Get configurations from the module."""
return request.param
@pytest.mark.parametrize("command_request,expected_answer", test_case.values(), ids=list(test_case.keys()))
def test_request(get_configuration, configure_environment, remove_shared_files,
restart_remoted, command_request, expected_answer):
'''
description:
Writes (config/state) requests in $DIR/queue/ossec/request and check if remoted forwards it to the agent,
collects the response, and writes it in the socket or returns an error message if the queried
agent is disconnected.
wazuh_min_version:
4.2
parameters:
- get_configuration (fixture), Get configurations from the module.
- configure_environment (fixture), Configure a custom environment for testing.
- remove_shared_files (fixture), Temporarily removes txt files from default agent group shared files.
- restart_remoted (fixture), Restart the wazuh-remoted daemon.
- command_request (String), Use case being evaluated.
- expected_answer (String), The expected response from the agent to the manager.
assertions:
- The getconfig request.
- The getstate request.
- The getconfig request for a disconnected agent.
test_input:
Several requests that are made by the manager to the agent.
logging:
- ossec.log:
- "Cannot send request"
- r"{"client":{"config-profile":"centos8","notify_time":10,"time-reconnect":60}}"
- r"{"error":0,"data":{"global":{"start":"2021-02-26, 06:41:26","end":"2021-02-26 08:49:19"}}}"
tags:
- agent_simulator
'''
    cfg = get_configuration['metadata']
    protocols = cfg['PROTOCOL'].split(',')
    agents = [ag.Agent(manager_address, "aes", os="debian8", version="4.2.0") for _ in range(len(protocols))]
    for agent, protocol in zip(agents, protocols):
        if "disconnected" not in command_request:
            sender, injector = ag.connect(agent, manager_address, protocol)
        msg_request = f'{agent.id} {command_request}'
        response = send_request(msg_request)
        assert expected_answer in response, "Remoted unexpected answer"
        if "disconnected" not in command_request:
            injector.stop_receive()

test_request_agent_info.json
{
"copyright": "Copyright (C) 2015-2021, Wazuh Inc.\nCreated by Wazuh, Inc. <[email protected]>.\nThis program is free software; you can redistribute it and/or modify it under the terms of GPLv2",
"type": "Integration",
"description": "Check that manager-agent communication through remoted socket works as expected.",
"tiers": [
0
],
"component": "Manager",
"path": "tests/integration/test_remoted/test_agent_communication/",
"daemons": [
"wazuh-remoted",
"wazuh-agentd"
],
"os_support": [
"Linux, RHEL5",
"Linux, RHEL6",
"Linux, RHEL7",
"Linux, RHEL8",
"Linux, Amazon Linux 1",
"Linux, Amazon Linux 2",
"Linux, Debian BUSTER",
"Linux, Debian STRETCH",
"Linux, Debian WHEEZY",
"Linux, Ubuntu BIONIC",
"Linux, Ubuntu XENIAL",
"Linux, Ubuntu TRUSTY",
"Linux, Arch Linux"
],
"coverage": 33,
"tags": [
"linux",
"remoted"
],
"name": "test_request_agent_info.py",
"id": 67,
"group_id": 47,
"tests": [
{
"description": "Writes (config/state) requests in $DIR/queue/ossec/request and check if remoted forwards it to the agent, collects the response, and writes it in the socket or returns an error message if the queried agent is disconnected.",
"wazuh_min_version": 4.2,
"parameters": [
"get_configuration (fixture), Get configurations from the module.",
"configure_environment (fixture), Configure a custom environment for testing.",
"remove_shared_files (fixture), Temporarily removes txt files from default agent group shared files.",
"restart_remoted (fixture), Restart the wazuh-remoted daemon.",
"command_request (String), Use case being evaluated.",
"expected_answer (String), The expected response from the agent to the manager."
],
"assertions": [
"The getconfig request.",
"The getstate request.",
"The getconfig request for a disconnected agent."
],
"test_input": "Several requests that are made by the manager to the agent.",
"logging": [
"ossec.log, \"Cannot send request\"",
"ossec.log, \"{\"client\":{\"config-profile\":\"centos8\",\"notify_time\":10,\"time-reconnect\":60}}\"",
"ossec.log, \"{\"error\":0,\"data\":{\"global\":{\"start\":\"2021-02-26, 06:41:26\",\"end\":\"2021-02-26 08:49:19\"}}}\""
],
"tags": [
"agent_simulator"
],
"name": "test_request",
"test_cases": [
"udp,tcp-disconnected",
"udp,tcp-get_config",
"udp,tcp-get_state",
"tcp-disconnected",
"tcp-get_config",
"tcp-get_state",
"udp-disconnected",
"udp-get_config",
"udp-get_state"
]
}
]
}

test_request_agent_info.yaml
component: Manager
copyright: 'Copyright (C) 2015-2021, Wazuh Inc.
Created by Wazuh, Inc. <[email protected]>.
This program is free software; you can redistribute it and/or modify it under the
terms of GPLv2'
coverage: 33
daemons:
- wazuh-remoted
- wazuh-agentd
description: Check that manager-agent communication through remoted socket works as
expected.
group_id: 47
id: 67
name: test_request_agent_info.py
os_support:
- Linux, RHEL5
- Linux, RHEL6
- Linux, RHEL7
- Linux, RHEL8
- Linux, Amazon Linux 1
- Linux, Amazon Linux 2
- Linux, Debian BUSTER
- Linux, Debian STRETCH
- Linux, Debian WHEEZY
- Linux, Ubuntu BIONIC
- Linux, Ubuntu XENIAL
- Linux, Ubuntu TRUSTY
- Linux, Arch Linux
path: tests/integration/test_remoted/test_agent_communication/
tags:
- linux
- remoted
tests:
- assertions:
- The getconfig request.
- The getstate request.
- The getconfig request for a disconnected agent.
description: Writes (config/state) requests in $DIR/queue/ossec/request and check
if remoted forwards it to the agent, collects the response, and writes it in the
socket or returns an error message if the queried agent is disconnected.
logging:
- ossec.log, "Cannot send request"
- ossec.log, "{"client":{"config-profile":"centos8","notify_time":10,"time-reconnect":60}}"
- ossec.log, "{"error":0,"data":{"global":{"start":"2021-02-26, 06:41:26","end":"2021-02-26
08:49:19"}}}"
name: test_request
parameters:
- get_configuration (fixture), Get configurations from the module.
- configure_environment (fixture), Configure a custom environment for testing.
- remove_shared_files (fixture), Temporarily removes txt files from default agent
group shared files.
- restart_remoted (fixture), Restart the wazuh-remoted daemon.
- command_request (String), Use case being evaluated.
- expected_answer (String), The expected response from the agent to the manager.
tags:
- agent_simulator
test_cases:
- udp,tcp-disconnected
- udp,tcp-get_config
- udp,tcp-get_state
- tcp-disconnected
- tcp-get_config
- tcp-get_state
- udp-disconnected
- udp-get_config
- udp-get_state
test_input: Several requests that are made by the manager to the agent.
wazuh_min_version: 4.2
tiers:
- 0
type: Integration
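The `test_cases` entries in the generated output above look like the module fixture ids (`config_ids`) joined with the `parametrize` ids (the keys of `test_case`). A minimal sketch of that combination follows, assuming the ids are simply paired in fixture-major order; `build_test_case_ids` is a hypothetical helper written for illustration and is not part of pytest or DocGenerator.

```python
from itertools import product

# Ids taken from the sample module: one per PROTOCOL configuration
# and one per key of the test_case dictionary.
config_ids = ['udp,tcp', 'tcp', 'udp']
case_ids = ['disconnected', 'get_config', 'get_state']


def build_test_case_ids(fixture_ids, param_ids):
    """Join every fixture id with every parametrize id (hypothetical helper)."""
    return [f"{fixture_id}-{param_id}" for fixture_id, param_id in product(fixture_ids, param_ids)]


test_case_ids = build_test_case_ids(config_ids, case_ids)
for case_id in test_case_ids:
    print(case_id)
```

With the sample ids this yields nine entries, starting with `udp,tcp-disconnected` and ending with `udp-get_state`, which is the same list shown under `test_cases`.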
2021-08-27
A draft has been created in the QA Wiki (https:/wazuh/wazuh-qa/wiki/Documenting-tests-using-the-DocGenerator-tool) explaining how to document tests using DocGenerator. This document will be updated as soon as we have the final QA-Docs schema.
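As a rough illustration of what a documentation tool has to do with these blocks, the sketch below collects the module docstring and the docstring of every `test_*` function using only the standard `ast` module. It is an assumption-laden sketch, not DocGenerator's actual implementation: `extract_doc_blocks` and `SAMPLE` are hypothetical names, and the returned text would still need to be parsed as YAML.

```python
import ast


def extract_doc_blocks(source):
    """Return the module docstring and the docstrings of all test_* functions."""
    tree = ast.parse(source)
    blocks = {"module": ast.get_docstring(tree), "tests": {}}
    for node in ast.walk(tree):
        # Only functions named test_* are treated as documented tests.
        if isinstance(node, ast.FunctionDef) and node.name.startswith("test_"):
            blocks["tests"][node.name] = ast.get_docstring(node)
    return blocks


# Tiny stand-in for a documented test module (hypothetical content).
SAMPLE = '''\
"""
type:
    integration
"""

def helper():
    """Not a test."""

def test_request():
    """
    description:
        Check something.
    """
'''

blocks = extract_doc_blocks(SAMPLE)
print(blocks["module"])
print(sorted(blocks["tests"]))
```

`ast.get_docstring` also strips the common indentation, so the extracted text keeps the relative nesting that the YAML-style fields rely on.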
2021-09-07
QA Docs schema 2.0 proposal 4
Module block
Test block
Sample documentation
test_vulnerability_detector/test_scan_results/test_scan_nvd_feed.py
'''
copyright:
Copyright (C) 2015-2021, Wazuh Inc.
Created by Wazuh, Inc. <[email protected]>.
This program is free software; you can redistribute it and/or modify it under the terms of GPLv2
type:
integration
brief:
These tests will mock RedHat, Canonical, Debian, and Windows systems, and insert custom vulnerabilities and
vulnerable packages to check if Vulnerability Detector generates the vulnerability alerts from NVD feed.
tier:
0
modules:
- vulnerability_detector
components:
- manager
path:
tests/integration/test_vulnerability_detector/test_scan_results/test_scan_nvd_feed.py
daemons:
- wazuh-modulesd
- wazuh-db
os_platform:
- linux
os_version:
- Amazon Linux 1
- Amazon Linux 2
- Arch Linux
- CentOS 6
- CentOS 7
- CentOS 8
- Debian Buster
- Debian Stretch
- Debian Jessie
- Debian Wheezy
- Red Hat 6
- Red Hat 7
- Red Hat 8
- Ubuntu Bionic
- Ubuntu Trusty
- Ubuntu Xenial
- Windows 7
- Windows 8
- Windows 10
- Windows Server 2003
- Windows Server 2012
- Windows Server 2016
references:
- https://documentation.wazuh.com/current/user-manual/capabilities/vulnerability-detection/index.html
pytest_args:
-
tags:
- nvd
'''
import os
from datetime import timedelta
from shutil import copy
import pytest
import wazuh_testing.vulnerability_detector as vd
from wazuh_testing.fim import check_time_travel
from wazuh_testing.tools import LOG_FILE_PATH
from wazuh_testing.tools import file
from wazuh_testing.tools.configuration import load_wazuh_configurations
from wazuh_testing.tools.monitoring import FileMonitor
# Marks
pytestmark = pytest.mark.tier(level=0)
# Variables
current_test_path = os.path.dirname(os.path.realpath(__file__))
test_data_path = os.path.join(current_test_path, 'data')
configurations_path = os.path.join(test_data_path, 'wazuh_nvd_configuration.yaml')
vulnerabilities_data_path = os.path.join(test_data_path, vd.VULNERABILITIES)
custom_cpe_helper_data_path = os.path.join(test_data_path, vd.CUSTOM_CPE_HELPER)
custom_msu_data_path = os.path.join(test_data_path, vd.CUSTOM_MSU)
wazuh_log_monitor = FileMonitor(LOG_FILE_PATH)
SCAN_TIMEOUT = 40
copy(custom_cpe_helper_data_path, vd.CPE_HELPER_PATH)
# Set configuration
parameters = [{'NVD_JSON_PATH': os.path.join(test_data_path, vd.CUSTOM_NVD_FEED),
'MSU_JSON_PATH': custom_msu_data_path}]
ids = ['scan_nvd_configuration']
# Read JSON data template
nvd_vulnerabilities = file.read_json_file(vulnerabilities_data_path)
system_data = [
{"target": "WINDOWS10", "os_name": "Microsoft Windows Server 2016 Datacenter Evaluation",
"os_major": "10", "os_minor": "0", "name": "windows", "format": "win",
"vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']),
"os_platform": "windows", "version": "Wazuh v4.1"},
{"target": "RHEL8", "os_name": "CentOS Linux", "os_major": "8", "os_minor": "1", "name": "centos8",
"format": "rpm", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']),
"os_platform": "centos", "version": "Wazuh v4.1"},
{"target": "RHEL7", "os_name": "CentOS Linux", "os_major": "7", "os_minor": "8", "name": "centos7",
"format": "rpm", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']),
"os_platform": "centos", "version": "Wazuh v4.1"},
{"target": "RHEL6", "os_name": "CentOS Linux", "os_major": "6", "os_minor": "10", "name": "centos6",
"format": "rpm", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']),
"os_platform": "centos", "version": "Wazuh v4.1"},
{"target": "RHEL5", "os_name": "CentOS Linux", "os_major": "5", "os_minor": "11", "name": "centos5",
"format": "rpm", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']),
"os_platform": "centos", "version": "Wazuh v4.1"},
{"target": "BIONIC", "os_name": "Ubuntu", "os_major": "18", "os_minor": "04", "name": "Ubuntu-bionic",
"format": "deb", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']),
"os_platform": "ubuntu", "version": "Wazuh v4.1"},
{"target": "XENIAL", "os_name": "Ubuntu", "os_major": "16", "os_minor": "04", "name": "Ubuntu-xenial",
"format": "deb", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']),
"os_platform": "ubuntu", "version": "Wazuh v4.1"},
{"target": "TRUSTY", "os_name": "Ubuntu", "os_major": "14", "os_minor": "04", "name": "Ubuntu-trusty",
"format": "deb", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']),
"os_platform": "ubuntu", "version": "Wazuh v4.1"},
{"target": "BUSTER", "os_name": "Debian GNU/Linux", "os_major": "10", "os_minor": "", "name": "debian10",
"format": "deb", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']),
"os_platform": "debian", "version": "Wazuh v4.1"},
{"target": "STRETCH", "os_name": "Debian GNU/Linux", "os_major": "9", "os_minor": "", "name": "debian9",
"format": "deb", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']),
"os_platform": "debian", "version": "Wazuh v4.1"},
{"target": "MAC", "os_name": "Mac OS X", "os_major": "10", "os_minor": "15", "name": "macos-catalina",
"format": "pkg", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']),
"os_platform": "darwin", "version": "Wazuh v4.1"},
{"target": "MAC", "os_name": "Mac OS X", "os_major": "10", "os_minor": "15", "name": "macos-catalina",
"format": "pkg", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']),
"os_platform": "darwin", "version": "Wazuh v4.0"},
{"target": "MAC", "os_name": "Mac OS X Server", "os_major": "5", "os_minor": "10", "name": "macos-server",
"format": "pkg", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']),
"os_platform": "darwin", "version": "Wazuh v4.1"}
]
system_data_ids = [system['target'] for system in system_data]
# Configuration data
configurations = load_wazuh_configurations(configurations_path, __name__, params=parameters)
# Fixtures
@pytest.fixture(scope='module', params=configurations, ids=ids)
def get_configuration(request):
"""Get configurations from the module."""
return request.param
@pytest.fixture(scope='module', params=system_data, ids=system_data_ids)
@vd.mock_cve_db
def mock_vulnerability_scan(request, mock_agent):
"""
Mock the vulnerability scan by inserting custom packages and feeds and changing the host system.
"""
# Mock system
vd.modify_system(agent_id=mock_agent, os_name=request.param['os_name'], os_major=request.param['os_major'],
os_minor=request.param['os_minor'], name=vd.MOCKED_AGENT_NAME,
os_platform=request.param['os_platform'], version=request.param['version'])
# Insert a vulnerability in table VULNERABILITIES
vd.insert_vulnerability(cveid='CWE-000', operation='less than', operation_value='1.0.0',
package='test', target=request.param['target'])
# Add custom vulnerabilities and feeds
for vulnerability in nvd_vulnerabilities['vulnerabilities_nvd']:
vd.insert_package(**vulnerability['package'], source=vulnerability['package']['name'],
format=request.param['format'], agent=mock_agent)
def test_vulnerabilities_report(get_configuration, configure_environment, restart_modulesd, check_cve_db,
mock_vulnerability_scan):
'''
description:
Check if inserted vulnerable packages are reported by vulnerability detector using the NVD feed.
wazuh_min_version:
4.2
parameters:
- get_configuration:
type: fixture
brief: Get configurations from the module.
- configure_environment:
type: fixture
brief: Configure a custom environment for testing.
- restart_modulesd:
type: fixture
brief: Restart modulesd daemon.
- check_cve_db:
type: fixture
brief: Check if the CVE database exists and its tables are created.
- mock_vulnerability_scan:
type: fixture
brief: Decorator used in any function that needs to mock cve.db.
assertions:
- Verify that the NVD vulnerabilities of the inserted packages are detected.
- Verify that the number of NVD vulnerabilities detected is as expected.
input_description:
Several vulnerable packages with their respective vulnerabilities are inserted into
a simulated agent to generate the corresponding alerts from NVD feed.
expected_output:
- r"Agent .* has an unsupported Wazuh version, {version} for agent .*"
- r"The {package} package .* from agent .* is vulnerable to {cve}"
- r"The {feed_source} found a total of {expected_vulnerabilities_number} potential vulnerabilities for agent .*"
tags:
-
'''
vulnerabilities_number = mock_vulnerability_scan["vulnerabilities_number"]
if mock_vulnerability_scan['format'] == 'pkg' and mock_vulnerability_scan['version'] == 'Wazuh v4.0':
version = mock_vulnerability_scan['version']
wazuh_log_monitor.start(
timeout=SCAN_TIMEOUT,
update_position=False,
callback=vd.make_vuln_callback(fr"Agent .* has an unsupported Wazuh version: '{version}'"),
error_message="The expected event 'Agent .* has an unsupported Wazuh version' not found"
)
return
# Check the vulnerabilities of inserted packages
try:
for item in nvd_vulnerabilities['vulnerabilities_nvd']:
vd.check_vulnerability_scan_event(wazuh_log_monitor, item['package']['name'], item['cve']['cveid'])
except TimeoutError:
check_time_travel(time_travel=True, interval=timedelta(seconds=300))
for item in nvd_vulnerabilities['vulnerabilities_nvd']:
vd.check_vulnerability_scan_event(wazuh_log_monitor, item['package']['name'], item['cve']['cveid'])
# Check that the number of NVD vulnerabilities is the expected
if mock_vulnerability_scan["format"] != "win":
vd.check_detected_vulnerabilities_number(wazuh_log_monitor=wazuh_log_monitor,
expected_vulnerabilities_number=vulnerabilities_number,
feed_source='NVD', timeout=vd.VULN_DETECTOR_SCAN_TIMEOUT)
vd.check_if_modulesd_is_running()

test_scan_nvd_feed.yaml
brief: These tests will mock RedHat, Canonical, Debian, and Windows systems, and insert
custom vulnerabilities and vulnerable packages to check if Vulnerability Detector
generates the vulnerability alerts from NVD feed.
components:
- manager
copyright: 'Copyright (C) 2015-2021, Wazuh Inc.
Created by Wazuh, Inc. <[email protected]>.
This program is free software; you can redistribute it and/or modify it under the
terms of GPLv2'
daemons:
- wazuh-modulesd
- wazuh-db
group_id: 4
id: 12
modules:
- vulnerability_detector
name: test_scan_nvd_feed.py
os_platform:
- linux
os_version:
- Amazon Linux 1
- Amazon Linux 2
- Arch Linux
- CentOS 6
- CentOS 7
- CentOS 8
- Debian Buster
- Debian Stretch
- Debian Jessie
- Debian Wheezy
- Red Hat 6
- Red Hat 7
- Red Hat 8
- Ubuntu Bionic
- Ubuntu Trusty
- Ubuntu Xenial
- Windows 7
- Windows 8
- Windows 10
- Windows Server 2003
- Windows Server 2012
- Windows Server 2016
path: tests/integration/test_vulnerability_detector/test_scan_results/test_scan_nvd_feed.py
pytest_args:
- null
references:
- https://documentation.wazuh.com/current/user-manual/capabilities/vulnerability-detection/index.html
tags:
- nvd
tests:
- assertions:
- Verify that the NVD vulnerabilities of the inserted packages are detected.
- Verify that the number of NVD vulnerabilities detected is as expected.
description: Check if inserted vulnerable packages are reported by vulnerability
detector using the NVD feed.
expected_output:
- r"Agent .* has an unsupported Wazuh version, {version} for agent .*"
- r"The {package} package .* from agent .* is vulnerable to {cve}"
- r"The {feed_source} found a total of {expected_vulnerabilities_number} potential
vulnerabilities for agent .*"
input_description: Several vulnerable packages with their respective vulnerabilities
are inserted into a simulated agent to generate the corresponding alerts from
NVD feed.
inputs:
- scan_nvd_configuration-WINDOWS10
- scan_nvd_configuration-RHEL8
- scan_nvd_configuration-RHEL7
- scan_nvd_configuration-RHEL6
- scan_nvd_configuration-RHEL5
- scan_nvd_configuration-BIONIC
- scan_nvd_configuration-XENIAL
- scan_nvd_configuration-TRUSTY
- scan_nvd_configuration-BUSTER
- scan_nvd_configuration-STRETCH
- scan_nvd_configuration-MAC0
- scan_nvd_configuration-MAC1
- scan_nvd_configuration-MAC2
name: test_vulnerabilities_report
parameters:
- get_configuration:
brief: Get configurations from the module.
type: fixture
- configure_environment:
brief: Configure a custom environment for testing.
type: fixture
- restart_modulesd:
brief: Restart modulesd daemon.
type: fixture
- check_cve_db:
brief: Check if the CVE database exists and its tables are created.
type: fixture
- mock_vulnerability_scan:
brief: Decorator used in any function that needs to mock cve.db.
type: fixture
tags:
- null
wazuh_min_version: 4.2
tier: 0
type: integration
test_scan_nvd_feed.json
{
"copyright": "Copyright (C) 2015-2021, Wazuh Inc.\nCreated by Wazuh, Inc. <[email protected]>.\nThis program is free software; you can redistribute it and/or modify it under the terms of GPLv2",
"type": "integration",
"brief": "These tests will mock RedHat, Canonical, Debian, and Windows systems, and insert custom vulnerabilities and vulnerable packages to check if Vulnerability Detector generates the vulnerability alerts from NVD feed.",
"tier": 0,
"modules": [
"vulnerability_detector"
],
"components": [
"manager"
],
"path": "tests/integration/test_vulnerability_detector/test_scan_results/test_scan_nvd_feed.py",
"daemons": [
"wazuh-modulesd",
"wazuh-db"
],
"os_platform": [
"linux"
],
"os_version": [
"Amazon Linux 1",
"Amazon Linux 2",
"Arch Linux",
"CentOS 6",
"CentOS 7",
"CentOS 8",
"Debian Buster",
"Debian Stretch",
"Debian Jessie",
"Debian Wheezy",
"Red Hat 6",
"Red Hat 7",
"Red Hat 8",
"Ubuntu Bionic",
"Ubuntu Trusty",
"Ubuntu Xenial",
"Windows 7",
"Windows 8",
"Windows 10",
"Windows Server 2003",
"Windows Server 2012",
"Windows Server 2016"
],
"references": [
"https://documentation.wazuh.com/current/user-manual/capabilities/vulnerability-detection/index.html"
],
"pytest_args": [
null
],
"tags": [
"nvd"
],
"name": "test_scan_nvd_feed.py",
"id": 12,
"group_id": 4,
"tests": [
{
"description": "Check if inserted vulnerable packages are reported by vulnerability detector using the NVD feed.",
"wazuh_min_version": 4.2,
"parameters": [
{
"get_configuration": {
"type": "fixture",
"brief": "Get configurations from the module."
}
},
{
"configure_environment": {
"type": "fixture",
"brief": "Configure a custom environment for testing."
}
},
{
"restart_modulesd": {
"type": "fixture",
"brief": "Restart modulesd daemon."
}
},
{
"check_cve_db": {
"type": "fixture",
"brief": "Check if the CVE database exists and its tables are created."
}
},
{
"mock_vulnerability_scan": {
"type": "fixture",
"brief": "Decorator used in any function that needs to mock cve.db."
}
}
],
"assertions": [
"Verify that the NVD vulnerabilities of the inserted packages are detected.",
"Verify that the number of NVD vulnerabilities detected is as expected."
],
"input_description": "Several vulnerable packages with their respective vulnerabilities are inserted into a simulated agent to generate the corresponding alerts from NVD feed.",
"expected_output": [
"r\"Agent .* has an unsupported Wazuh version, {version} for agent .*\"",
"r\"The {package} package .* from agent .* is vulnerable to {cve}\"",
"r\"The {feed_source} found a total of {expected_vulnerabilities_number} potential vulnerabilities for agent .*\""
],
"tags": [
null
],
"name": "test_vulnerabilities_report",
"inputs": [
"scan_nvd_configuration-WINDOWS10",
"scan_nvd_configuration-RHEL8",
"scan_nvd_configuration-RHEL7",
"scan_nvd_configuration-RHEL6",
"scan_nvd_configuration-RHEL5",
"scan_nvd_configuration-BIONIC",
"scan_nvd_configuration-XENIAL",
"scan_nvd_configuration-TRUSTY",
"scan_nvd_configuration-BUSTER",
"scan_nvd_configuration-STRETCH",
"scan_nvd_configuration-MAC0",
"scan_nvd_configuration-MAC1",
"scan_nvd_configuration-MAC2"
]
}
]
}
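The generated JSON above can be sanity-checked with a short script. The sketch below is not part of the documentation tool. It assumes, based only on the keys visible in the examples in this comment, that every module block and every test block must carry those keys. `MODULE_KEYS`, `TEST_KEYS`, and `missing_keys` are hypothetical names introduced for illustration.

```python
import json

# Keys observed in the generated examples in this comment (assumed to be required).
MODULE_KEYS = {
    "copyright", "type", "brief", "tier", "modules", "components", "path",
    "daemons", "os_platform", "os_version", "references", "pytest_args",
    "tags", "name", "id", "group_id", "tests",
}
TEST_KEYS = {
    "description", "wazuh_min_version", "parameters", "assertions",
    "input_description", "expected_output", "tags", "name", "inputs",
}


def missing_keys(doc):
    """Return a sorted list of 'location: key' strings for every absent key."""
    problems = [f"module: {key}" for key in MODULE_KEYS - set(doc)]
    for index, test in enumerate(doc.get("tests", [])):
        label = test.get("name", f"tests[{index}]")
        problems += [f"{label}: {key}" for key in TEST_KEYS - set(test)]
    return sorted(problems)


# Trimmed-down stand-in for a generated file, with most keys left out on purpose.
sample = json.loads("""
{
  "type": "integration",
  "tier": 0,
  "name": "test_scan_nvd_feed.py",
  "tests": [{"name": "test_vulnerabilities_report", "description": "..."}]
}
""")

for problem in missing_keys(sample):
    print(problem)
```

Running a check like this over every generated file would flag blocks where a field was forgotten in the docstring, before the output reaches the search tool.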
test_vulnerability_detector/test_scan_results/test_redhat_inventory_redhat_feed.py
'''
copyright:
Copyright (C) 2015-2021, Wazuh Inc.
Created by Wazuh, Inc. <[email protected]>.
This program is free software; you can redistribute it and/or modify it under the terms of GPLv2
type:
integration
brief:
These tests will mock RedHat systems and insert custom vulnerabilities and vulnerable packages to check
if Vulnerability Detector generates the alerts from the RedHat provider feed.
tier:
0
modules:
- vulnerability_detector
components:
- manager
path:
tests/integration/test_vulnerability_detector/test_scan_results/test_redhat_inventory_redhat_feed.py
daemons:
- wazuh-modulesd
- wazuh-db
os_platform:
- linux
os_version:
- Amazon Linux 1
- Amazon Linux 2
- Arch Linux
- CentOS 6
- CentOS 7
- CentOS 8
- Debian Buster
- Debian Stretch
- Debian Jessie
- Debian Wheezy
- Red Hat 6
- Red Hat 7
- Red Hat 8
- Ubuntu Bionic
- Ubuntu Trusty
- Ubuntu Xenial
- Windows 7
- Windows 8
- Windows 10
- Windows Server 2003
- Windows Server 2012
- Windows Server 2016
references:
- https://documentation.wazuh.com/current/user-manual/capabilities/vulnerability-detection/index.html
pytest_args:
-
tags:
- oval
'''
import os
import pytest
from wazuh_testing.tools import LOG_FILE_PATH
from wazuh_testing.tools.configuration import load_wazuh_configurations
from wazuh_testing.tools.monitoring import FileMonitor
from wazuh_testing.tools import file
from wazuh_testing import vulnerability_detector as vd
# Marks
pytestmark = pytest.mark.tier(level=0)
# Variables
current_test_path = os.path.dirname(os.path.realpath(__file__))
test_data_path = os.path.join(current_test_path, 'data')
configurations_path = os.path.join(test_data_path, 'wazuh_redhat_inventory.yaml')
redhat_vulnerabilities_data_path = os.path.join(test_data_path, 'redhat_vulnerabilities.json')
wazuh_log_monitor = FileMonitor(LOG_FILE_PATH)
SCAN_TIMEOUT = 40
# Set configuration
parameters = [{'NVD_JSON_PATH': os.path.join(test_data_path, vd.REAL_NVD_FEED)}]
ids = ['redhat_scan_configuration']
# Read JSON data template
redhat_vulnerabilities = file.read_json_file(redhat_vulnerabilities_data_path)
redhat_data_ids = [system['target'] for system in redhat_vulnerabilities]
# Configuration data
configurations = load_wazuh_configurations(configurations_path, __name__, params=parameters)
# Fixtures
@pytest.fixture(scope='module', params=configurations, ids=ids)
def get_configuration(request):
"""Get configurations from the module."""
return request.param
@pytest.fixture(scope='module', params=redhat_vulnerabilities, ids=redhat_data_ids)
@vd.mock_cve_db
def mock_vulnerability_scan(request, mock_agent):
"""
Mock the vulnerability scan by inserting custom packages and feeds and changing the host system.
"""
# Mock system
vd.modify_system(agent_id=mock_agent, os_name=request.param['os_name'], os_major=request.param['os_major'],
os_minor=request.param['os_minor'], name=vd.MOCKED_AGENT_NAME)
# Add custom vulnerabilities and feeds
for vulnerability in request.param['vulnerabilities']:
vd.insert_package(**vulnerability['package'], agent=mock_agent, source=vulnerability['package']['name'])
vd.insert_vulnerability(**vulnerability['cve'], package=vulnerability['package']['name'],
target=request.param['target'])
def test_redhat_vulnerabilities_report(get_configuration, configure_environment, restart_modulesd, check_cve_db,
mock_vulnerability_scan):
'''
description:
Check if inserted vulnerable packages are reported by vulnerability detector using the redhat feed.
wazuh_min_version:
4.2
parameters:
- get_configuration:
type: fixture
brief: Get configurations from the module.
- configure_environment:
type: fixture
brief: Configure a custom environment for testing.
- restart_modulesd:
type: fixture
brief: Restart the `wazuh-modulesd` daemon.
- check_cve_db:
type: fixture
brief: Check if the CVE database exists and its tables are created.
- mock_vulnerability_scan:
type: fixture
brief: Decorator used in any function that needs to mock cve.db.
assertions:
- Verify that the number of OVAL vulnerabilities detected is as expected.
- Verify that the redhat feed vulnerabilities of the inserted packages are detected.
input_description:
Several vulnerable packages with their respective vulnerabilities are inserted into
a simulated agent to generate the corresponding alerts from redhat OVAL feed.
expected_output:
- r"The {feed_source} found a total of '{expected_vulnerabilities_number}'
potential vulnerabilities for agent .*"
- r"The '{package}' package .* from agent .* is vulnerable to '{cve}'"
tags:
-
'''
vulnerabilities_number = len(mock_vulnerability_scan['vulnerabilities'])
# Check that the number of OVAL vulnerabilities is the expected
vd.check_detected_vulnerabilities_number(wazuh_log_monitor=wazuh_log_monitor,
expected_vulnerabilities_number=vulnerabilities_number,
feed_source='OVAL', timeout=vd.VULN_DETECTOR_SCAN_TIMEOUT)
# Check the vulnerabilities of packages inserted
for item in mock_vulnerability_scan['vulnerabilities']:
vd.check_vulnerability_scan_event(wazuh_log_monitor=wazuh_log_monitor, package=item['package']['name'],
cve=item['cve']['cveid'])
vd.check_if_modulesd_is_running()
test_redhat_inventory_redhat_feed.yaml
brief: These tests will mock RedHat systems and insert custom vulnerabilities and
vulnerable packages to check if Vulnerability Detector generates the alerts from
the RedHat provider feed.
components:
- manager
copyright: 'Copyright (C) 2015-2021, Wazuh Inc.
Created by Wazuh, Inc. <[email protected]>.
This program is free software; you can redistribute it and/or modify it under the
terms of GPLv2'
daemons:
- wazuh-modulesd
- wazuh-db
group_id: 4
id: 10
modules:
- vulnerability_detector
name: test_redhat_inventory_redhat_feed.py
os_platform:
- linux
os_version:
- Amazon Linux 1
- Amazon Linux 2
- Arch Linux
- CentOS 6
- CentOS 7
- CentOS 8
- Debian Buster
- Debian Stretch
- Debian Jessie
- Debian Wheezy
- Red Hat 6
- Red Hat 7
- Red Hat 8
- Ubuntu Bionic
- Ubuntu Trusty
- Ubuntu Xenial
- Windows 7
- Windows 8
- Windows 10
- Windows Server 2003
- Windows Server 2012
- Windows Server 2016
path: tests/integration/test_vulnerability_detector/test_scan_results/test_redhat_inventory_redhat_feed.py
pytest_args:
- null
references:
- https://documentation.wazuh.com/current/user-manual/capabilities/vulnerability-detection/index.html
tags:
- oval
tests:
- assertions:
- Verify that the number of OVAL vulnerabilities detected is as expected.
- Verify that the redhat feed vulnerabilities of the inserted packages are detected.
description: Check if inserted vulnerable packages are reported by vulnerability
detector using the redhat feed.
expected_output:
- r"The {feed_source} found a total of '{expected_vulnerabilities_number}' potential
vulnerabilities for agent .*"
- r"The '{package}' package .* from agent .* is vulnerable to '{cve}'"
input_description: Several vulnerable packages with their respective vulnerabilities
are inserted into a simulated agent to generate the corresponding alerts from
redhat OVAL feed.
inputs:
- redhat_scan_configuration-RHEL8
- redhat_scan_configuration-RHEL7
- redhat_scan_configuration-RHEL6
- redhat_scan_configuration-RHEL5
name: test_redhat_vulnerabilities_report
parameters:
- get_configuration:
brief: Get configurations from the module.
type: fixture
- configure_environment:
brief: Configure a custom environment for testing.
type: fixture
- restart_modulesd:
brief: Restart the `wazuh-modulesd` daemon.
type: fixture
- check_cve_db:
brief: Check if the CVE database exists and its tables are created.
type: fixture
- mock_vulnerability_scan:
brief: Decorator used in any function that needs to mock cve.db.
type: fixture
tags:
- null
wazuh_min_version: 4.2
tier: 0
type: integration
test_redhat_inventory_redhat_feed.json
{
"copyright": "Copyright (C) 2015-2021, Wazuh Inc.\nCreated by Wazuh, Inc. <[email protected]>.\nThis program is free software; you can redistribute it and/or modify it under the terms of GPLv2",
"type": "integration",
"brief": "These tests will mock RedHat systems and insert custom vulnerabilities and vulnerable packages to check if Vulnerability Detector generates the alerts from the RedHat provider feed.",
"tier": 0,
"modules": [
"vulnerability_detector"
],
"components": [
"manager"
],
"path": "tests/integration/test_vulnerability_detector/test_scan_results/test_redhat_inventory_redhat_feed.py",
"daemons": [
"wazuh-modulesd",
"wazuh-db"
],
"os_platform": [
"linux"
],
"os_version": [
"Amazon Linux 1",
"Amazon Linux 2",
"Arch Linux",
"CentOS 6",
"CentOS 7",
"CentOS 8",
"Debian Buster",
"Debian Stretch",
"Debian Jessie",
"Debian Wheezy",
"Red Hat 6",
"Red Hat 7",
"Red Hat 8",
"Ubuntu Bionic",
"Ubuntu Trusty",
"Ubuntu Xenial",
"Windows 7",
"Windows 8",
"Windows 10",
"Windows Server 2003",
"Windows Server 2012",
"Windows Server 2016"
],
"references": [
"https://documentation.wazuh.com/current/user-manual/capabilities/vulnerability-detection/index.html"
],
"pytest_args": [
null
],
"tags": [
"oval"
],
"name": "test_redhat_inventory_redhat_feed.py",
"id": 10,
"group_id": 4,
"tests": [
{
"description": "Check if inserted vulnerable packages are reported by vulnerability detector using the redhat feed.",
"wazuh_min_version": 4.2,
"parameters": [
{
"get_configuration": {
"type": "fixture",
"brief": "Get configurations from the module."
}
},
{
"configure_environment": {
"type": "fixture",
"brief": "Configure a custom environment for testing."
}
},
{
"restart_modulesd": {
"type": "fixture",
"brief": "Restart the `wazuh-modulesd` daemon."
}
},
{
"check_cve_db": {
"type": "fixture",
"brief": "Check if the CVE database exists and its tables are created."
}
},
{
"mock_vulnerability_scan": {
"type": "fixture",
"brief": "Decorator used in any function that needs to mock cve.db."
}
}
],
"assertions": [
"Verify that the number of OVAL vulnerabilities detected is as expected.",
"Verify that the redhat feed vulnerabilities of the inserted packages are detected."
],
"input_description": "Several vulnerable packages with their respective vulnerabilities are inserted into a simulated agent to generate the corresponding alerts from redhat OVAL feed.",
"expected_output": [
"r\"The {feed_source} found a total of '{expected_vulnerabilities_number}' potential vulnerabilities for agent .*\"",
"r\"The '{package}' package .* from agent .* is vulnerable to '{cve}'\""
],
"tags": [
null
],
"name": "test_redhat_vulnerabilities_report",
"inputs": [
"redhat_scan_configuration-RHEL8",
"redhat_scan_configuration-RHEL7",
"redhat_scan_configuration-RHEL6",
"redhat_scan_configuration-RHEL5"
]
}
]
}
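Each example in this comment pairs a Python test file with the YAML/JSON generated from its docstrings. How the tool actually reads those docstrings is not shown here; the sketch below is one possible approach using only the standard-library `ast` module. `SOURCE` and `collect_doc_blocks` are hypothetical, and the embedded file is a stand-in, not a real Wazuh test.

```python
import ast

# Stand-in for a test file documented with the schema; not a real Wazuh test.
SOURCE = '''
"""
type: integration
tier: 0
"""

def helper():
    return 1

def test_example(get_configuration):
    """
    description: Check something.
    wazuh_min_version: 4.2
    """
'''


def collect_doc_blocks(source):
    """Return (module_docstring, {test_name: docstring}) for one source file."""
    tree = ast.parse(source)
    module_doc = ast.get_docstring(tree)
    test_docs = {
        node.name: ast.get_docstring(node)
        for node in tree.body
        if isinstance(node, ast.FunctionDef) and node.name.startswith("test_")
    }
    return module_doc, test_docs


module_doc, test_docs = collect_doc_blocks(SOURCE)
print(module_doc)
print(test_docs)
```

Parsing the file instead of importing it means the test module's top-level code (for example, loading configurations) never runs while the documentation is collected. The returned strings would still need to be parsed as YAML to obtain the fields shown above.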
test_vulnerability_detector/test_general_settings/test_general_settings_ignore_time.py
'''
copyright:
Copyright (C) 2015-2021, Wazuh Inc.
Created by Wazuh, Inc. <[email protected]>.
This program is free software; you can redistribute it and/or modify it under the terms of GPLv2
type:
integration
brief:
The tests will modify the value of `ignore_time` tag in `ossec.conf`, set different times
and check the result in `ossec.log`. This tag sets the time during which vulnerabilities
that have already been alerted will be ignored.
tier:
0
modules:
- vulnerability_detector
components:
- manager
path:
tests/integration/test_vulnerability_detector/test_general_settings/test_general_settings_ignore_time.py
daemons:
- wazuh-modulesd
- wazuh-db
os_platform:
- linux
os_version:
- Amazon Linux 1
- Amazon Linux 2
- Arch Linux
- CentOS 6
- CentOS 7
- CentOS 8
- Debian Buster
- Debian Stretch
- Debian Jessie
- Debian Wheezy
- Red Hat 6
- Red Hat 7
- Red Hat 8
- Ubuntu Bionic
- Ubuntu Trusty
- Ubuntu Xenial
references:
- https://documentation.wazuh.com/current/user-manual/reference/ossec-conf/vuln-detector.html#ignore-time
pytest_args:
-
tags:
- oval
'''
import os
from datetime import timedelta
import pytest
import wazuh_testing.vulnerability_detector as vd
from wazuh_testing.fim import check_time_travel
from wazuh_testing.tools import LOG_FILE_PATH
from wazuh_testing.tools.configuration import load_wazuh_configurations
from wazuh_testing.tools.monitoring import FileMonitor
from wazuh_testing.tools.services import control_service
from wazuh_testing.tools.time import time_to_seconds
# Marks
pytestmark = pytest.mark.tier(level=0)
# variables
test_path = os.path.dirname(os.path.realpath(__file__))
test_data_path = os.path.join(test_path, 'data')
nvd_feed_path = os.path.join(os.path.dirname(test_path), 'test_scan_results', 'data', vd.REAL_NVD_FEED)
configurations_path = os.path.join(test_data_path, 'wazuh_ignore_time.yaml')
wazuh_log_monitor = FileMonitor(LOG_FILE_PATH)
callback_string_vulnerability = f"'{vd.DEFAULT_PACKAGE_NAME}'.+is vulnerable to '{vd.DEFAULT_VULNERABILITY_ID}'"
parameters = [{'IGNORE_TIME': '3600s', 'INTERVAL': '5s', 'NVD_FEED_PATH': nvd_feed_path},
{'IGNORE_TIME': '60m', 'INTERVAL': '5s', 'NVD_FEED_PATH': nvd_feed_path},
{'IGNORE_TIME': '1h', 'INTERVAL': '5s', 'NVD_FEED_PATH': nvd_feed_path}]
metadata = [{'ignore_time': '3600s', 'timeout': 30, 'jumps': 2, 'interval': '5s'},
{'ignore_time': '60m', 'timeout': 30, 'jumps': 2, 'interval': '5s'},
{'ignore_time': '1h', 'timeout': 30, 'jumps': 2, 'interval': '5s'}]
# Configuration data
configurations = load_wazuh_configurations(configurations_path, __name__, params=parameters, metadata=metadata)
# fixtures
@pytest.fixture(scope='module', params=configurations)
def get_configuration(request):
"""Get configurations from the module."""
return request.param
@pytest.fixture(scope='module')
def prepare_agent(mock_agent):
control_service('stop', daemon='wazuh-db')
vd.clean_vd_tables(mock_agent)
vd.insert_package(agent=mock_agent, vendor="Red Hat, Inc.")
vd.insert_vulnerability()
control_service('start', daemon='wazuh-db')
yield mock_agent
def test_ignore_time(get_configuration, configure_environment, restart_modulesd, prepare_agent,
custom_callback_vulnerability=vd.make_vuln_callback(callback_string_vulnerability)):
'''
description:
Check if an alert is not fired during the `ignore_time` interval.
wazuh_min_version:
4.2
parameters:
- get_configuration:
type: fixture
brief: Get configurations from the module.
- configure_environment:
type: fixture
brief: Configure a custom environment for testing.
- restart_modulesd:
type: fixture
brief: Restart the `wazuh-modulesd` daemon.
- prepare_agent:
type: fixture
brief: Create a mock agent with test packages and vulnerabilities.
- custom_callback_vulnerability:
type: lambda
brief: Custom vulnerability detector callback function from a text pattern.
assertions:
- Verify that vulnerabilities alerts are not generated before the `ignore_time` time set.
- Verify that vulnerabilities alerts are generated after the `ignore_time` time set.
input_description:
Several time intervals are used together with a test agent with
a vulnerable package and its respective vulnerability.
expected_output:
- r"'{vd.DEFAULT_PACKAGE_NAME}'.+is vulnerable to '{vd.DEFAULT_VULNERABILITY_ID}'"
tags:
- time_travel
'''
control_service('stop', daemon='wazuh-modulesd')
control_service('stop', daemon='wazuh-db')
vd.update_last_scan(agent=prepare_agent)
control_service('start', daemon='wazuh-db')
control_service('start', daemon='wazuh-modulesd')
ignore_time = get_configuration['metadata']['ignore_time']
jumps = get_configuration['metadata']['jumps']
seconds_to_travel = time_to_seconds(ignore_time) / jumps
# Check for initial alert
wazuh_log_monitor.start(timeout=get_configuration['metadata']['timeout'],
callback=custom_callback_vulnerability,
error_message='Alert did not appear at the start of the test')
# Check if alert does not appear during ignore time
for _ in range(1, jumps):
check_time_travel(time_travel=True, interval=timedelta(seconds=seconds_to_travel))
with pytest.raises(TimeoutError):
wazuh_log_monitor.start(timeout=get_configuration['metadata']['timeout'],
callback=custom_callback_vulnerability)
raise AttributeError('Alert appeared before ignore_time was finished')
# Travel to the time set in ignore time
check_time_travel(time_travel=True, interval=timedelta(seconds=seconds_to_travel))
# Check for final alert
wazuh_log_monitor.start(timeout=get_configuration['metadata']['timeout'],
callback=custom_callback_vulnerability,
error_message='Alert did not appear at the end of the test')
test_general_settings_ignore_time.yaml
brief: The tests will modify the value of `ignore_time` tag in `ossec.conf`, set different
times and check the result in `ossec.log`. This tag sets the time during which vulnerabilities
that have already been alerted will be ignored.
components:
- manager
copyright: 'Copyright (C) 2015-2021, Wazuh Inc.
Created by Wazuh, Inc. <[email protected]>.
This program is free software; you can redistribute it and/or modify it under the
terms of GPLv2'
daemons:
- wazuh-modulesd
- wazuh-db
group_id: 0
id: 2
modules:
- vulnerability_detector
name: test_general_settings_ignore_time.py
os_platform:
- linux
os_version:
- Amazon Linux 1
- Amazon Linux 2
- Arch Linux
- CentOS 6
- CentOS 7
- CentOS 8
- Debian Buster
- Debian Stretch
- Debian Jessie
- Debian Wheezy
- Red Hat 6
- Red Hat 7
- Red Hat 8
- Ubuntu Bionic
- Ubuntu Trusty
- Ubuntu Xenial
path: tests/integration/test_vulnerability_detector/test_general_settings/test_general_settings_ignore_time.py
pytest_args:
- null
references:
- https://documentation.wazuh.com/current/user-manual/reference/ossec-conf/vuln-detector.html#ignore-time
tags:
- oval
tests:
- assertions:
- Verify that vulnerabilities alerts are not generated before the `ignore_time`
time set.
- Verify that vulnerabilities alerts are generated after the `ignore_time` time
set.
description: Check if an alert is not fired during the `ignore_time` interval.
expected_output:
- r"'{vd.DEFAULT_PACKAGE_NAME}'.+is vulnerable to '{vd.DEFAULT_VULNERABILITY_ID}'"
input_description: Several time intervals are used together with a test agent with
a vulnerable package and its respective vulnerability.
inputs:
- get_configuration0
- get_configuration1
- get_configuration2
name: test_ignore_time
parameters:
- get_configuration:
brief: Get configurations from the module.
type: fixture
- configure_environment:
brief: Configure a custom environment for testing.
type: fixture
- restart_modulesd:
brief: Restart the `wazuh-modulesd` daemon.
type: fixture
- prepare_agent:
brief: Create a mock agent with test packages and vulnerabilities.
type: fixture
- custom_callback_vulnerability:
brief: Custom vulnerability detector callback function from a text pattern.
type: lambda
tags:
- time_travel
wazuh_min_version: 4.2
tier: 0
type: integration
test_general_settings_ignore_time.json
{
"copyright": "Copyright (C) 2015-2021, Wazuh Inc.\nCreated by Wazuh, Inc. <[email protected]>.\nThis program is free software; you can redistribute it and/or modify it under the terms of GPLv2",
"type": "integration",
"brief": "The tests will modify the value of `ignore_time` tag in `ossec.conf`, set different times and check the result in `ossec.log`. This tag sets the time during which vulnerabilities that have already been alerted will be ignored.",
"tier": 0,
"modules": [
"vulnerability_detector"
],
"components": [
"manager"
],
"path": "tests/integration/test_vulnerability_detector/test_general_settings/test_general_settings_ignore_time.py",
"daemons": [
"wazuh-modulesd",
"wazuh-db"
],
"os_platform": [
"linux"
],
"os_version": [
"Amazon Linux 1",
"Amazon Linux 2",
"Arch Linux",
"CentOS 6",
"CentOS 7",
"CentOS 8",
"Debian Buster",
"Debian Stretch",
"Debian Jessie",
"Debian Wheezy",
"Red Hat 6",
"Red Hat 7",
"Red Hat 8",
"Ubuntu Bionic",
"Ubuntu Trusty",
"Ubuntu Xenial"
],
"references": [
"https://documentation.wazuh.com/current/user-manual/reference/ossec-conf/vuln-detector.html#ignore-time"
],
"pytest_args": [
null
],
"tags": [
"oval"
],
"name": "test_general_settings_ignore_time.py",
"id": 2,
"group_id": 0,
"tests": [
{
"description": "Check if an alert is not fired during the `ignore_time` interval.",
"wazuh_min_version": 4.2,
"parameters": [
{
"get_configuration": {
"type": "fixture",
"brief": "Get configurations from the module."
}
},
{
"configure_environment": {
"type": "fixture",
"brief": "Configure a custom environment for testing."
}
},
{
"restart_modulesd": {
"type": "fixture",
"brief": "Restart the `wazuh-modulesd` daemon."
}
},
{
"prepare_agent": {
"type": "fixture",
"brief": "Create a mock agent with test packages and vulnerabilities."
}
},
{
"custom_callback_vulnerability": {
"type": "lambda",
"brief": "Custom vulnerability detector callback function from a text pattern."
}
}
],
"assertions": [
"Verify that vulnerabilities alerts are not generated before the `ignore_time` time set.",
"Verify that vulnerabilities alerts are generated after the `ignore_time` time set."
],
"input_description": "Several time intervals are used together with a test agent with a vulnerable package and its respective vulnerability.",
"expected_output": [
"r\"'{vd.DEFAULT_PACKAGE_NAME}'.+is vulnerable to '{vd.DEFAULT_VULNERABILITY_ID}'\""
],
"tags": [
"time_travel"
],
"name": "test_ignore_time",
"inputs": [
"get_configuration0",
"get_configuration1",
"get_configuration2"
]
}
]
}
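The three inputs of `test_ignore_time` (`3600s`, `60m`, `1h`) describe the same interval, and the test divides it by `jumps` to decide how far each time travel goes. The arithmetic can be sketched as follows. `to_seconds` is a hypothetical stand-in for `time_to_seconds`, whose implementation is not shown in this comment, and the supported unit suffixes are an assumption.

```python
import re

# Hypothetical stand-in for `time_to_seconds`; the real helper's code is not shown here.
UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600, "d": 86400}


def to_seconds(value):
    """Convert strings such as '3600s', '60m' or '1h' to seconds."""
    match = re.fullmatch(r"(\d+)([smhd])", value)
    if match is None:
        raise ValueError(f"unsupported time value: {value!r}")
    amount, unit = match.groups()
    return int(amount) * UNIT_SECONDS[unit]


jumps = 2  # value used in the test metadata
for ignore_time in ("3600s", "60m", "1h"):
    total = to_seconds(ignore_time)
    per_jump = total / jumps
    # range(1, jumps) yields jumps - 1 checks inside the ignore window.
    silent_checks = len(range(1, jumps))
    print(ignore_time, total, per_jump, silent_checks)
```

With `jumps` set to 2, each configuration travels 1800 seconds, checks once that no alert is raised, and then travels another 1800 seconds to reach the end of `ignore_time`, where the final alert is expected.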
test_agentd/test_agentd_reconnection.py
'''
copyright:
Copyright (C) 2015-2021, Wazuh Inc.
Created by Wazuh, Inc. <[email protected]>.
This program is free software; you can redistribute it and/or modify it under the terms of GPLv2
type:
integration
brief:
These tests will check if, during enrollment, the agent re-establishes communication with
the manager under different situations that interrupt it. The objective is to check that,
with different states in the `clients.key` file, the agent successfully
enrolls after losing connection with remoted.
tier:
0
modules:
- agentd
components:
- agent
path:
tests/integration/test_agentd/test_agentd_reconnection.py
daemons:
- wazuh-agentd
- wazuh-remoted
os_platform:
- linux
- windows
os_version:
- Amazon Linux 1
- Amazon Linux 2
- Arch Linux
- CentOS 6
- CentOS 7
- CentOS 8
- Debian Buster
- Debian Stretch
- Debian Jessie
- Debian Wheezy
- Red Hat 6
- Red Hat 7
- Red Hat 8
- Ubuntu Bionic
- Ubuntu Trusty
- Ubuntu Xenial
- Windows 7
- Windows 8
- Windows 10
- Windows Server 2003
- Windows Server 2012
- Windows Server 2016
references:
- https://documentation.wazuh.com/current/user-manual/registering/index.html#registering-wazuh-agents
- https://documentation.wazuh.com/current/user-manual/reference/tools/agent-auth.html#agent-auth
pytest_args:
-
tags:
- enrollment
- keys
'''
import os
import platform
from time import sleep
import pytest
from datetime import datetime, timedelta
from wazuh_testing.agent import CLIENT_KEYS_PATH, SERVER_CERT_PATH, SERVER_KEY_PATH
from wazuh_testing.tools import WAZUH_PATH, LOG_FILE_PATH
from wazuh_testing.tools.authd_sim import AuthdSimulator
from wazuh_testing.tools.configuration import load_wazuh_configurations
from wazuh_testing.tools.file import truncate_file
from wazuh_testing.tools.monitoring import QueueMonitor, FileMonitor
from wazuh_testing.tools.remoted_sim import RemotedSimulator
from wazuh_testing.tools.services import control_service
# Marks
pytestmark = [pytest.mark.linux, pytest.mark.win32, pytest.mark.tier(level=0), pytest.mark.agent]
test_data_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'data')
configurations_path = os.path.join(test_data_path, 'wazuh_conf.yaml')
params = [
{
'SERVER_ADDRESS': '127.0.0.1',
'REMOTED_PORT': 1514,
'PROTOCOL': 'tcp',
},
{
'SERVER_ADDRESS': '127.0.0.1',
'REMOTED_PORT': 1514,
'PROTOCOL': 'udp',
}
]
metadata = [
{'PROTOCOL': 'tcp'},
{'PROTOCOL': 'udp'}
]
config_ids = ['tcp', 'udp']
configurations = load_wazuh_configurations(configurations_path, __name__, params=params, metadata=metadata)
log_monitor_paths = []
receiver_sockets_params = []
monitored_sockets_params = []
receiver_sockets, monitored_sockets, log_monitors = None, None, None # Set in the fixtures
authd_server = AuthdSimulator(params[0]['SERVER_ADDRESS'], key_path=SERVER_KEY_PATH, cert_path=SERVER_CERT_PATH)
remoted_server = None
def teardown():
global remoted_server
if remoted_server is not None:
remoted_server.stop()
def set_debug_mode():
    """Set debug2 for agentd in local internal options file."""
    if platform.system() == 'win32' or platform.system() == 'Windows':
        local_int_conf_path = os.path.join(WAZUH_PATH, 'local_internal_options.conf')
        debug_line = 'windows.debug=2\nagent.recv_timeout=5\n'
    else:
        local_int_conf_path = os.path.join(WAZUH_PATH, 'etc', 'local_internal_options.conf')
        debug_line = 'agent.debug=2\nagent.recv_timeout=5\n'
    # debug_line spans two lines, so compare against the whole file rather than line by line;
    # otherwise the options would be appended again on every run.
    with open(local_int_conf_path, 'r') as local_file_read:
        if debug_line in local_file_read.read():
            return
    with open(local_int_conf_path, 'a') as local_file_write:
        local_file_write.write('\n' + debug_line)
set_debug_mode()
# fixtures
@pytest.fixture(scope="module", params=configurations, ids=config_ids)
def get_configuration(request):
"""Get configurations from the module"""
return request.param
@pytest.fixture(scope="function")
def configure_authd_server(request):
"""Initialize a simulated authd connection."""
authd_server.start()
global monitored_sockets
monitored_sockets = QueueMonitor(authd_server.queue)
authd_server.clear()
yield
authd_server.shutdown()
def start_authd():
"""Enable authd to accept connections and perform enrollments."""
authd_server.set_mode("ACCEPT")
authd_server.clear()
def stop_authd():
    """Stop authd from accepting connections and performing enrollments."""
    authd_server.set_mode("REJECT")
def set_authd_id():
"""Set agent id to 101 in the authd simulated connection."""
authd_server.agent_id = 101
def clean_keys():
"""Clear the agent's client.keys file."""
truncate_file(CLIENT_KEYS_PATH)
sleep(1)
def delete_keys():
"""Remove the agent's client.keys file."""
os.remove(CLIENT_KEYS_PATH)
sleep(1)
def set_keys():
    """Write the agent's enrollment details to the client.keys file."""
    with open(CLIENT_KEYS_PATH, 'w+') as f:
        f.write("100 ubuntu-agent any TopSecret")
    sleep(1)
def wait_notify(line):
"""Callback function to wait for agent checkins to the manager."""
if 'Sending keep alive:' in line:
return line
return None
def wait_enrollment(line):
"""Callback function to wait for enrollment."""
if 'Valid key received' in line:
return line
return None
def wait_enrollment_try(line):
"""Callback function to wait for enrollment attempt."""
if 'Requesting a key' in line:
return line
return None
def search_error_messages():
"""Retrieve the line of the log file where first error is found.
Returns:
str: String where the error is found or None if errors are not found.
"""
with open(LOG_FILE_PATH, 'r') as log_file:
lines = log_file.readlines()
for line in lines:
if f"ERROR:" in line:
return line
return None
# Tests
"""
This test covers the scenario of the Agent starting with keys,
losing communication with Remoted, and sending a new enrollment to Authd.
"""
def test_agentd_reconection_enrollment_with_keys(configure_authd_server, configure_environment, get_configuration):
'''
description:
Check how the agent behaves when losing communication with remoted and a new enrollment is sent to authd.
In this case, the agent starts with keys.
wazuh_min_version:
4.2
parameters:
- configure_authd_server:
type: fixture
brief: Initializes a simulated authd connection.
- configure_environment:
type: fixture
brief: Configure a custom environment for testing.
- get_configuration:
type: fixture
brief: Get configurations from the module.
assertions:
- Verify that the agent enrollment is successful.
input_description:
An IP address and port are used for the server using the `TCP` and `UDP` protocols.
expected_output:
- r"Valid key received"
- r"Sending keep alive"
tags:
- enrollment
- simulator
'''
global remoted_server
remoted_server = RemotedSimulator(protocol=get_configuration['metadata']['PROTOCOL'], mode='CONTROLLED_ACK',
client_keys=CLIENT_KEYS_PATH)
# Stop target Agent
control_service('stop')
# Prepare test
start_authd()
set_authd_id()
set_keys()
# Clean logs
truncate_file(LOG_FILE_PATH)
# Start target Agent
control_service('start')
# Start hearing logs
truncate_file(LOG_FILE_PATH)
log_monitor = FileMonitor(LOG_FILE_PATH)
# hearing on enrollment server
authd_server.clear()
# Wait until Agent is notifying Manager
log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!")
# Start rejecting Agent
remoted_server.set_mode('REJECT')
# hearing on enrollment server
authd_server.clear()
# Wait until Agent asks a new key to enrollment
log_monitor.start(timeout=180, callback=wait_enrollment,
error_message="Agent never enrolled after rejecting connection!")
# Start responding to Agent
remoted_server.set_mode('CONTROLLED_ACK')
# Wait until Agent is notifying Manager
log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!")
assert "aes" in remoted_server.last_message_ctx, "Incorrect Secure Message"
"""
This test covers the scenario of Agent starting without client.keys file
and an enrollment is sent to Authd to start communicating with Remoted
"""
def test_agentd_reconection_enrollment_no_keys_file(configure_authd_server, configure_environment, get_configuration):
'''
description:
Check how the agent behaves when losing communication with remoted and a new enrollment is sent to authd.
In this case, the agent doesn't have client.keys file.
wazuh_min_version:
4.2
parameters:
- configure_authd_server:
type: fixture
brief: Initializes a simulated authd connection.
- configure_environment:
type: fixture
brief: Configure a custom environment for testing.
- get_configuration:
type: fixture
brief: Get configurations from the module.
assertions:
- Verify that the agent enrollment is successful.
input_description:
An IP address and port are used for the server using the `TCP` and `UDP` protocols.
expected_output:
- r"Valid key received"
- r"Sending keep alive"
tags:
- enrollment
- simulator
'''
global remoted_server
remoted_server = RemotedSimulator(protocol=get_configuration['metadata']['PROTOCOL'], mode='CONTROLLED_ACK',
client_keys=CLIENT_KEYS_PATH)
# Stop target Agent
control_service('stop')
# Clean logs
truncate_file(LOG_FILE_PATH)
# Prepare test
start_authd()
set_authd_id()
delete_keys()
# Start target Agent
control_service('start')
# start hearing logs
log_monitor = FileMonitor(LOG_FILE_PATH)
# hearing on enrollment server
authd_server.clear()
# Wait until Agent asks keys for the first time
log_monitor.start(timeout=50, callback=wait_enrollment,
error_message="Agent never enrolled for the first time.")
# Wait until Agent is notifying Manager
log_monitor.start(timeout=50, callback=wait_notify, error_message="Notify message from agent was never sent!")
# Start rejecting Agent
remoted_server.set_mode('REJECT')
# hearing on enrollment server
authd_server.clear()
# Wait until Agent asks a new key to enrollment
log_monitor.start(timeout=180, callback=wait_enrollment,
error_message="Agent never enrolled after rejecting connection!")
# Start responding to Agent
remoted_server.set_mode('CONTROLLED_ACK')
# Wait until Agent is notifying Manager
log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!")
assert "aes" in remoted_server.last_message_ctx, "Incorrect Secure Message"
"""
This test covers the scenario of Agent starting without keys in client.keys file
and an enrollment is sent to Authd to start communicating with Remoted
"""
def test_agentd_reconection_enrollment_no_keys(configure_authd_server, configure_environment, get_configuration):
'''
description:
Check how the agent behaves when losing communication with remoted and a new enrollment is sent to authd.
In this case, the agent has its client.keys file empty.
wazuh_min_version:
4.2
parameters:
- configure_authd_server:
type: fixture
brief: Initializes a simulated authd connection.
- configure_environment:
type: fixture
brief: Configure a custom environment for testing.
- get_configuration:
type: fixture
brief: Get configurations from the module.
assertions:
- Verify that the agent enrollment is successful.
input_description:
An IP address and port are used for the server using the `TCP` and `UDP` protocols.
expected_output:
- r"Valid key received"
- r"Sending keep alive"
tags:
- enrollment
- simulator
'''
global remoted_server
remoted_server = RemotedSimulator(protocol=get_configuration['metadata']['PROTOCOL'], mode='CONTROLLED_ACK',
client_keys=CLIENT_KEYS_PATH)
# Stop target Agent
control_service('stop')
# Clean logs
truncate_file(LOG_FILE_PATH)
# Prepare test
start_authd()
set_authd_id()
clean_keys()
# Start target Agent
control_service('start')
# start hearing logs
log_monitor = FileMonitor(LOG_FILE_PATH)
# hearing on enrollment server
authd_server.clear()
# Wait until Agent asks keys for the first time
log_monitor.start(timeout=120, callback=wait_enrollment,
error_message="Agent never enrolled for the first time rejecting connection!")
# Wait until Agent is notifying Manager
log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!")
assert "aes" in remoted_server.last_message_ctx, "Incorrect Secure Message"
# Start rejecting Agent
remoted_server.set_mode('REJECT')
# hearing on enrollment server
authd_server.clear()
# Wait until Agent asks a new key to enrollment
log_monitor.start(timeout=180, callback=wait_enrollment,
error_message="Agent never enrolled after rejecting connection!")
# Start responding to Agent
remoted_server.set_mode('CONTROLLED_ACK')
# Wait until Agent is notifying Manager
log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!")
assert "aes" in remoted_server.last_message_ctx, "Incorrect Secure Message"
"""
This test covers the scenario of the Agent starting without keys,
where multiple retries are required until the new key is obtained to start communicating with Remoted
"""
def test_agentd_initial_enrollment_retries(configure_authd_server, configure_environment, get_configuration):
'''
description:
Check how the agent behaves when it makes multiple enrollment attempts before getting its key.
For this, the agent starts without keys and perform multiple enrollment requests
to authd before getting the new key to communicate with remoted.
wazuh_min_version:
4.2
parameters:
- configure_authd_server:
type: fixture
brief: Initializes a simulated authd connection.
- configure_environment:
type: fixture
brief: Configure a custom environment for testing.
- get_configuration:
type: fixture
brief: Get configurations from the module.
assertions:
- Verify that the agent enrollment is successful.
input_description:
An IP address and port are used for the server using the `TCP` and `UDP` protocols.
expected_output:
- r"Requesting a key"
- r"Valid key received"
- r"Sending keep alive"
tags:
- enrollment
- simulator
'''
global remoted_server
remoted_server = RemotedSimulator(protocol=get_configuration['metadata']['PROTOCOL'], mode='CONTROLLED_ACK',
client_keys=CLIENT_KEYS_PATH)
# Stop target Agent
control_service('stop')
# Clean logs
truncate_file(LOG_FILE_PATH)
# Prepare test
stop_authd()
set_authd_id()
clean_keys()
# Start whole Agent service to check other daemons status after initialization
control_service('start')
# Start hearing logs
log_monitor = FileMonitor(LOG_FILE_PATH)
start_time = datetime.now()
# Check for unsuccessful enrollment retries in Agentd initialization
retries = 0
while retries < 4:
retries += 1
log_monitor.start(timeout=retries * 5 + 20, callback=wait_enrollment_try,
error_message="Enrollment retry was not sent!")
stop_time = datetime.now()
expected_time = start_time + timedelta(seconds=retries * 5 - 2)
# Check if delay was applied
assert stop_time > expected_time, "Retries too quick"
# Enable authd
authd_server.clear()
authd_server.set_mode("ACCEPT")
# Wait for successful enrollment
log_monitor.start(timeout=70, callback=wait_enrollment, error_message="No successful enrollment after retries!")
# Wait until Agent is notifying Manager
log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!")
# Check if no Wazuh module stopped due to Agentd Initialization
with open(LOG_FILE_PATH) as log_file:
log_lines = log_file.read().splitlines()
for line in log_lines:
if "Unable to access queue:" in line:
raise AssertionError("A Wazuh module stopped because of Agentd initialization!")
"""
This test covers the scenario of the Agent starting with keys while Remoted is unreachable for some seconds,
so multiple connection retries are required prior to requesting a new enrollment
"""
def test_agentd_connection_retries_pre_enrollment(configure_authd_server, configure_environment, get_configuration):
'''
description:
Check how the agent behaves when Remoted is not available and performs multiple connection attempts to it.
For this, the agent starts with keys but `remoted` is not available for several seconds,
then the agent performs multiple connection retries before requesting a new enrollment.
wazuh_min_version:
4.2
parameters:
- configure_authd_server:
type: fixture
brief: Initializes a simulated authd connection.
- configure_environment:
type: fixture
brief: Configure a custom environment for testing.
- get_configuration:
type: fixture
brief: Get configurations from the module.
assertions:
- Verify that the agent enrollment is successful.
input_description:
An IP address and port are used for the server using the `TCP` and `UDP` protocols.
expected_output:
- r"Sending keep alive"
tags:
- enrollment
- simulator
'''
global remoted_server
REMOTED_KEYS_SYNC_TIME = 10
# Start Remoted mock
remoted_server = RemotedSimulator(protocol=get_configuration['metadata']['PROTOCOL'], client_keys=CLIENT_KEYS_PATH)
# Stop target Agent
control_service('stop')
# Clean logs
truncate_file(LOG_FILE_PATH)
# Prepare test
stop_authd()
set_keys()
# Start hearing logs
log_monitor = FileMonitor(LOG_FILE_PATH)
# Start whole Agent service to check other daemons status after initialization
control_service('start')
# Simulate time of Remoted to synchronize keys by waiting previous to start responding
remoted_server.set_mode('CONTROLLED_ACK')
sleep(REMOTED_KEYS_SYNC_TIME)
# Check Agentd is finally communicating
log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!")
test_agentd_reconnection.yaml
brief: These tests will check if, during enrollment, the agent re-establishes communication
with the manager under different situations that interrupt it. The objective is
to check that, with different states in the `clients.key` file, the agent successfully
enrolls after losing connection with remoted.
components:
- agent
copyright: 'Copyright (C) 2015-2021, Wazuh Inc.
Created by Wazuh, Inc. <[email protected]>.
This program is free software; you can redistribute it and/or modify it under the
terms of GPLv2'
daemons:
- wazuh-agentd
- wazuh-remoted
group_id: 14
id: 19
modules:
- agentd
name: test_agentd_reconnection.py
os_platform:
- linux
- windows
os_version:
- Amazon Linux 1
- Amazon Linux 2
- Arch Linux
- CentOS 6
- CentOS 7
- CentOS 8
- Debian Buster
- Debian Stretch
- Debian Jessie
- Debian Wheezy
- Red Hat 6
- Red Hat 7
- Red Hat 8
- Ubuntu Bionic
- Ubuntu Trusty
- Ubuntu Xenial
- Windows 7
- Windows 8
- Windows 10
- Windows Server 2003
- Windows Server 2012
- Windows Server 2016
path: tests/integration/test_agentd/test_agentd_reconnection.py
pytest_args:
- null
references:
- https://documentation.wazuh.com/current/user-manual/registering/index.html#registering-wazuh-agents
- https://documentation.wazuh.com/current/user-manual/reference/tools/agent-auth.html#agent-auth
tags:
- enrollment
- keys
tests:
- assertions:
- Verify that the agent enrollment is successful.
description: Check how the agent behaves when losing communication with remoted
and a new enrollment is sent to authd. In this case, the agent starts with keys.
expected_output:
- r"Valid key received"
- r"Sending keep alive"
input_description: An IP address and port are used for the server using the `TCP`
and `UDP` protocols.
inputs:
- tcp
- udp
name: test_agentd_reconection_enrollment_with_keys
parameters:
- configure_authd_server:
brief: Initializes a simulated authd connection.
type: fixture
- configure_environment:
brief: Configure a custom environment for testing.
type: fixture
- get_configuration:
brief: Get configurations from the module.
type: fixture
tags:
- enrollment
- simulator
wazuh_min_version: 4.2
- assertions:
- Verify that the agent enrollment is successful.
description: Check how the agent behaves when losing communication with remoted
and a new enrollment is sent to authd. In this case, the agent doesn't have client.keys
file.
expected_output:
- r"Valid key received"
- r"Sending keep alive"
input_description: An IP address and port are used for the server using the `TCP`
and `UDP` protocols.
inputs:
- tcp
- udp
name: test_agentd_reconection_enrollment_no_keys_file
parameters:
- configure_authd_server:
brief: Initializes a simulated authd connection.
type: fixture
- configure_environment:
brief: Configure a custom environment for testing.
type: fixture
- get_configuration:
brief: Get configurations from the module.
type: fixture
tags:
- enrollment
- simulator
wazuh_min_version: 4.2
- assertions:
- Verify that the agent enrollment is successful.
description: Check how the agent behaves when losing communication with remoted
and a new enrollment is sent to authd. In this case, the agent has its client.keys
file empty.
expected_output:
- r"Valid key received"
- r"Sending keep alive"
input_description: An IP address and port are used for the server using the `TCP`
and `UDP` protocols.
inputs:
- tcp
- udp
name: test_agentd_reconection_enrollment_no_keys
parameters:
- configure_authd_server:
brief: Initializes a simulated authd connection.
type: fixture
- configure_environment:
brief: Configure a custom environment for testing.
type: fixture
- get_configuration:
brief: Get configurations from the module.
type: fixture
tags:
- enrollment
- simulator
wazuh_min_version: 4.2
- assertions:
- Verify that the agent enrollment is successful.
description: Check how the agent behaves when it makes multiple enrollment attempts
before getting its key. For this, the agent starts without keys and perform multiple
enrollment requests to authd before getting the new key to communicate with remoted.
expected_output:
- r"Requesting a key"
- r"Valid key received"
- r"Sending keep alive"
input_description: An IP address and port are used for the server using the `TCP`
and `UDP` protocols.
inputs:
- tcp
- udp
name: test_agentd_initial_enrollment_retries
parameters:
- configure_authd_server:
brief: Initializes a simulated authd connection.
type: fixture
- configure_environment:
brief: Configure a custom environment for testing.
type: fixture
- get_configuration:
brief: Get configurations from the module.
type: fixture
tags:
- enrollment
- simulator
wazuh_min_version: 4.2
- assertions:
- Verify that the agent enrollment is successful.
description: Check how the agent behaves when Remoted is not available and performs
multiple connection attempts to it. For this, the agent starts with keys but `remoted`
is not available for several seconds, then the agent performs multiple connection
retries before requesting a new enrollment.
expected_output:
- r"Sending keep alive"
input_description: An IP address and port are used for the server using the `TCP`
and `UDP` protocols.
inputs:
- tcp
- udp
name: test_agentd_connection_retries_pre_enrollment
parameters:
- configure_authd_server:
brief: Initializes a simulated authd connection.
type: fixture
- configure_environment:
brief: Configure a custom environment for testing.
type: fixture
- get_configuration:
brief: Get configurations from the module.
type: fixture
tags:
- enrollment
- simulator
wazuh_min_version: 4.2
tier: 0
type: integration
test_agentd_reconnection.json
{
"copyright": "Copyright (C) 2015-2021, Wazuh Inc.\nCreated by Wazuh, Inc. <[email protected]>.\nThis program is free software; you can redistribute it and/or modify it under the terms of GPLv2",
"type": "integration",
"brief": "These tests will check if, during enrollment, the agent re-establishes communication with the manager under different situations that interrupt it. The objective is to check that, with different states in the `clients.key` file, the agent successfully enrolls after losing connection with remoted.",
"tier": 0,
"modules": [
"agentd"
],
"components": [
"agent"
],
"path": "tests/integration/test_agentd/test_agentd_reconnection.py",
"daemons": [
"wazuh-agentd",
"wazuh-remoted"
],
"os_platform": [
"linux",
"windows"
],
"os_version": [
"Amazon Linux 1",
"Amazon Linux 2",
"Arch Linux",
"CentOS 6",
"CentOS 7",
"CentOS 8",
"Debian Buster",
"Debian Stretch",
"Debian Jessie",
"Debian Wheezy",
"Red Hat 6",
"Red Hat 7",
"Red Hat 8",
"Ubuntu Bionic",
"Ubuntu Trusty",
"Ubuntu Xenial",
"Windows 7",
"Windows 8",
"Windows 10",
"Windows Server 2003",
"Windows Server 2012",
"Windows Server 2016"
],
"references": [
"https://documentation.wazuh.com/current/user-manual/registering/index.html#registering-wazuh-agents",
"https://documentation.wazuh.com/current/user-manual/reference/tools/agent-auth.html#agent-auth"
],
"pytest_args": [
null
],
"tags": [
"enrollment",
"keys"
],
"name": "test_agentd_reconnection.py",
"id": 19,
"group_id": 14,
"tests": [
{
"description": "Check how the agent behaves when losing communication with remoted and a new enrollment is sent to authd. In this case, the agent starts with keys.",
"wazuh_min_version": 4.2,
"parameters": [
{
"configure_authd_server": {
"type": "fixture",
"brief": "Initializes a simulated authd connection."
}
},
{
"configure_environment": {
"type": "fixture",
"brief": "Configure a custom environment for testing."
}
},
{
"get_configuration": {
"type": "fixture",
"brief": "Get configurations from the module."
}
}
],
"assertions": [
"Verify that the agent enrollment is successful."
],
"input_description": "An IP address and port are used for the server using the `TCP` and `UDP` protocols.",
"expected_output": [
"r\"Valid key received\"",
"r\"Sending keep alive\""
],
"tags": [
"enrollment",
"simulator"
],
"name": "test_agentd_reconection_enrollment_with_keys",
"inputs": [
"tcp",
"udp"
]
},
{
"description": "Check how the agent behaves when losing communication with remoted and a new enrollment is sent to authd. In this case, the agent doesn't have client.keys file.",
"wazuh_min_version": 4.2,
"parameters": [
{
"configure_authd_server": {
"type": "fixture",
"brief": "Initializes a simulated authd connection."
}
},
{
"configure_environment": {
"type": "fixture",
"brief": "Configure a custom environment for testing."
}
},
{
"get_configuration": {
"type": "fixture",
"brief": "Get configurations from the module."
}
}
],
"assertions": [
"Verify that the agent enrollment is successful."
],
"input_description": "An IP address and port are used for the server using the `TCP` and `UDP` protocols.",
"expected_output": [
"r\"Valid key received\"",
"r\"Sending keep alive\""
],
"tags": [
"enrollment",
"simulator"
],
"name": "test_agentd_reconection_enrollment_no_keys_file",
"inputs": [
"tcp",
"udp"
]
},
{
"description": "Check how the agent behaves when losing communication with remoted and a new enrollment is sent to authd. In this case, the agent has its client.keys file empty.",
"wazuh_min_version": 4.2,
"parameters": [
{
"configure_authd_server": {
"type": "fixture",
"brief": "Initializes a simulated authd connection."
}
},
{
"configure_environment": {
"type": "fixture",
"brief": "Configure a custom environment for testing."
}
},
{
"get_configuration": {
"type": "fixture",
"brief": "Get configurations from the module."
}
}
],
"assertions": [
"Verify that the agent enrollment is successful."
],
"input_description": "An IP address and port are used for the server using the `TCP` and `UDP` protocols.",
"expected_output": [
"r\"Valid key received\"",
"r\"Sending keep alive\""
],
"tags": [
"enrollment",
"simulator"
],
"name": "test_agentd_reconection_enrollment_no_keys",
"inputs": [
"tcp",
"udp"
]
},
{
"description": "Check how the agent behaves when it makes multiple enrollment attempts before getting its key. For this, the agent starts without keys and perform multiple enrollment requests to authd before getting the new key to communicate with remoted.",
"wazuh_min_version": 4.2,
"parameters": [
{
"configure_authd_server": {
"type": "fixture",
"brief": "Initializes a simulated authd connection."
}
},
{
"configure_environment": {
"type": "fixture",
"brief": "Configure a custom environment for testing."
}
},
{
"get_configuration": {
"type": "fixture",
"brief": "Get configurations from the module."
}
}
],
"assertions": [
"Verify that the agent enrollment is successful."
],
"input_description": "An IP address and port are used for the server using the `TCP` and `UDP` protocols.",
"expected_output": [
"r\"Requesting a key\"",
"r\"Valid key received\"",
"r\"Sending keep alive\""
],
"tags": [
"enrollment",
"simulator"
],
"name": "test_agentd_initial_enrollment_retries",
"inputs": [
"tcp",
"udp"
]
},
{
"description": "Check how the agent behaves when Remoted is not available and performs multiple connection attempts to it. For this, the agent starts with keys but `remoted` is not available for several seconds, then the agent performs multiple connection retries before requesting a new enrollment.",
"wazuh_min_version": 4.2,
"parameters": [
{
"configure_authd_server": {
"type": "fixture",
"brief": "Initializes a simulated authd connection."
}
},
{
"configure_environment": {
"type": "fixture",
"brief": "Configure a custom environment for testing."
}
},
{
"get_configuration": {
"type": "fixture",
"brief": "Get configurations from the module."
}
}
],
"assertions": [
"Verify that the agent enrollment is successful."
],
"input_description": "An IP address and port are used for the server using the `TCP` and `UDP` protocols.",
"expected_output": [
"r\"Sending keep alive\""
],
"tags": [
"enrollment",
"simulator"
],
"name": "test_agentd_connection_retries_pre_enrollment",
"inputs": [
"tcp",
"udp"
]
}
]
}
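A quick way to sanity-check a generated file such as test_agentd_reconnection.json against the fields used in this schema is sketched below. The field lists are copied from the sample output above. Treating all of them as mandatory, and the helper name `missing_fields`, are assumptions made for illustration and not DocGenerator behavior.

```python
import json

# Field names copied from the sample output above; treating every one of
# them as mandatory is an assumption of this sketch.
MODULE_FIELDS = {
    "copyright", "type", "brief", "tier", "modules", "components", "path",
    "daemons", "os_platform", "os_version", "references", "pytest_args",
    "tags", "name", "id", "group_id", "tests",
}
TEST_FIELDS = {
    "description", "wazuh_min_version", "parameters", "assertions",
    "input_description", "expected_output", "tags", "name", "inputs",
}


def missing_fields(doc):
    """Return 'location: field' strings for every expected key that is absent."""
    problems = [f"module: {key}" for key in sorted(MODULE_FIELDS - set(doc))]
    for index, test in enumerate(doc.get("tests", [])):
        # Fall back to the list position when the test block has no name.
        label = test.get("name", f"tests[{index}]")
        problems += [f"{label}: {key}" for key in sorted(TEST_FIELDS - set(test))]
    return problems


if __name__ == "__main__":
    # Deliberately incomplete document to show what gets reported.
    sample = json.loads('{"type": "integration", "tests": [{"name": "test_request"}]}')
    for problem in missing_fields(sample):
        print(problem)
```

To check a real file, the same function could be called on `json.load(open(path))`; an empty list would mean every field seen in the samples is present.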
test_remoted/test_agent_communication/test_request_agent_info.py
'''
copyright:
Copyright (C) 2015-2021, Wazuh Inc.
Created by Wazuh, Inc. <[email protected]>.
This program is free software; you can redistribute it and/or modify it under the terms of GPLv2
type:
integration
brief:
Check that manager-agent communication through `wazuh-remoted` socket works as expected.
tier:
0
modules:
- remoted
components:
- manager
path:
tests/integration/test_remoted/test_agent_communication/test_request_agent_info.py
daemons:
- wazuh-agentd
- wazuh-remoted
os_platform:
- linux
os_version:
- Amazon Linux 1
- Amazon Linux 2
- Arch Linux
- CentOS 6
- CentOS 7
- CentOS 8
- Debian Buster
- Debian Stretch
- Debian Jessie
- Debian Wheezy
- Red Hat 6
- Red Hat 7
- Red Hat 8
- Ubuntu Bionic
- Ubuntu Trusty
- Ubuntu Xenial
references:
- https://documentation.wazuh.com/current/development/message-format.html
pytest_args:
-
tags:
-
'''
import os
import time
import pytest
import wazuh_testing.tools.agent_simulator as ag
from wazuh_testing.tools.configuration import load_wazuh_configurations
from wazuh_testing.tools.sockets import send_request
# Marks
pytestmark = pytest.mark.tier(level=0)
# Configuration
test_data_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'data')
configurations_path = os.path.join(test_data_path, 'wazuh_request_agent_info.yaml')
parameters = [
{'PROTOCOL': 'udp,tcp'},
{'PROTOCOL': 'tcp'},
{'PROTOCOL': 'udp'},
]
metadata = [
{'PROTOCOL': 'udp,tcp'},
{'PROTOCOL': 'tcp'},
{'PROTOCOL': 'udp'},
]
# test cases
test_case = {
'disconnected': ('agent getconfig disconnected',
'Cannot send request'),
'get_config': ('agent getconfig client',
'{"client":{"config-profile":"centos8","notify_time":10,"time-reconnect":60}}'),
'get_state': ('logcollector getstate',
'{"error":0,"data":{"global":{"start":"2021-02-26, 06:41:26","end":"2021-02-26 08:49:19"}}}')
}
configurations = load_wazuh_configurations(configurations_path, __name__, params=parameters, metadata=metadata)
config_ids = [x['PROTOCOL'] for x in parameters]
# Utils
manager_address = "localhost"
# fixtures
@pytest.fixture(scope="module", params=configurations, ids=config_ids)
def get_configuration(request):
"""Get configurations from the module."""
return request.param
@pytest.mark.parametrize("command_request,expected_answer", test_case.values(), ids=list(test_case.keys()))
def test_request(get_configuration, configure_environment, remove_shared_files,
                 restart_remoted, command_request, expected_answer):
    '''
    description:
        Writes (config/state) requests in $DIR/queue/ossec/request and check if remoted forwards it to the agent,
        collects the response, and writes it in the socket or returns an error message if the queried
        agent is disconnected.
    wazuh_min_version:
        4.2
    parameters:
        - get_configuration:
            type: fixture
            brief: Get configurations from the module.
        - configure_environment:
            type: fixture
            brief: Configure a custom environment for testing.
        - remove_shared_files:
            type: fixture
            brief: Temporary removes txt files from default agent group shared files.
        - restart_remoted:
            type: fixture
            brief: Restart the wazuh-remoted daemon.
        - command_request:
            type: string
            brief: Use case being evaluated.
        - expected_answer:
            type: string
            brief: The expected response from the agent to the manager.
    assertions:
        - Verify that the request cannot be made if the agent is disconnected.
        - Verify that after making a request, the expected response is received.
    input_description:
        Several requests that are made by the manager to the agent and their expected responses.
    expected_output:
        - r"Cannot send request"
        - r"{"client":{"config-profile":"centos8","notify_time":10,"time-reconnect":60}}"
        - r"{"error":0,"data":{"global":{"start":"2021-02-26, 06:41:26","end":"2021-02-26 08:49:19"}}}"
    tags:
        - simulator
    '''
    cfg = get_configuration['metadata']
    protocols = cfg['PROTOCOL'].split(',')
    agents = [ag.Agent(manager_address, "aes", os="debian8", version="4.2.0") for _ in range(len(protocols))]
    for agent, protocol in zip(agents, protocols):
        if "disconnected" not in command_request:
            sender, injector = ag.connect(agent, manager_address, protocol)
        msg_request = f'{agent.id} {command_request}'
        response = send_request(msg_request)
        assert expected_answer in response, "Remoted unexpected answer"
        if "disconnected" not in command_request:
            injector.stop_receive()
test_request_agent_info.yaml
brief: Check that manager-agent communication through `wazuh-remoted` socket works
as expected.
components:
- manager
copyright: 'Copyright (C) 2015-2021, Wazuh Inc.
Created by Wazuh, Inc. <[email protected]>.
This program is free software; you can redistribute it and/or modify it under the
terms of GPLv2'
daemons:
- wazuh-agentd
- wazuh-remoted
group_id: 21
id: 28
modules:
- remoted
name: test_request_agent_info.py
os_platform:
- linux
os_version:
- Amazon Linux 1
- Amazon Linux 2
- Arch Linux
- CentOS 6
- CentOS 7
- CentOS 8
- Debian Buster
- Debian Stretch
- Debian Jessie
- Debian Wheezy
- Red Hat 6
- Red Hat 7
- Red Hat 8
- Ubuntu Bionic
- Ubuntu Trusty
- Ubuntu Xenial
path: tests/integration/test_remoted/test_agent_communication/test_request_agent_info.py
pytest_args:
- null
references:
- https://documentation.wazuh.com/current/development/message-format.html
tags:
- null
tests:
- assertions:
- Verify that the request cannot be made if the agent is disconnected.
- Verify that after making a request, the expected response is received.
description: Writes (config/state) requests in $DIR/queue/ossec/request and check
if remoted forwards it to the agent, collects the response, and writes it in the
socket or returns an error message if the queried agent is disconnected.
expected_output:
- r"Cannot send request"
- r"{"client":{"config-profile":"centos8","notify_time":10,"time-reconnect":60}}"
- r"{"error":0,"data":{"global":{"start":"2021-02-26, 06:41:26","end":"2021-02-26
08:49:19"}}}"
input_description: Several requests that are made by the manager to the agent and
their expected responses.
inputs:
- udp,tcp-disconnected
- udp,tcp-get_config
- udp,tcp-get_state
- tcp-disconnected
- tcp-get_config
- tcp-get_state
- udp-disconnected
- udp-get_config
- udp-get_state
name: test_request
parameters:
- get_configuration:
brief: Get configurations from the module.
type: fixture
- configure_environment:
brief: Configure a custom environment for testing.
type: fixture
- remove_shared_files:
brief: Temporary removes txt files from default agent group shared files.
type: fixture
- restart_remoted:
brief: Restart the wazuh-remoted daemon.
type: fixture
- command_request:
brief: Use case being evaluated.
type: string
- expected_answer:
brief: The expected response from the agent to the manager.
type: string
tags:
- simulator
wazuh_min_version: 4.2
tier: 0
type: integration
test_request_agent_info.json
{
"copyright": "Copyright (C) 2015-2021, Wazuh Inc.\nCreated by Wazuh, Inc. <[email protected]>.\nThis program is free software; you can redistribute it and/or modify it under the terms of GPLv2",
"type": "integration",
"brief": "Check that manager-agent communication through `wazuh-remoted` socket works as expected.",
"tier": 0,
"modules": [
"remoted"
],
"components": [
"manager"
],
"path": "tests/integration/test_remoted/test_agent_communication/test_request_agent_info.py",
"daemons": [
"wazuh-agentd",
"wazuh-remoted"
],
"os_platform": [
"linux"
],
"os_version": [
"Amazon Linux 1",
"Amazon Linux 2",
"Arch Linux",
"CentOS 6",
"CentOS 7",
"CentOS 8",
"Debian Buster",
"Debian Stretch",
"Debian Jessie",
"Debian Wheezy",
"Red Hat 6",
"Red Hat 7",
"Red Hat 8",
"Ubuntu Bionic",
"Ubuntu Trusty",
"Ubuntu Xenial"
],
"references": [
"https://documentation.wazuh.com/current/development/message-format.html"
],
"pytest_args": [
null
],
"tags": [
null
],
"name": "test_request_agent_info.py",
"id": 28,
"group_id": 21,
"tests": [
{
"description": "Writes (config/state) requests in $DIR/queue/ossec/request and check if remoted forwards it to the agent, collects the response, and writes it in the socket or returns an error message if the queried agent is disconnected.",
"wazuh_min_version": 4.2,
"parameters": [
{
"get_configuration": {
"type": "fixture",
"brief": "Get configurations from the module."
}
},
{
"configure_environment": {
"type": "fixture",
"brief": "Configure a custom environment for testing."
}
},
{
"remove_shared_files": {
"type": "fixture",
"brief": "Temporarily removes txt files from default agent group shared files."
}
},
{
"restart_remoted": {
"type": "fixture",
"brief": "Restart the wazuh-remoted daemon."
}
},
{
"command_request": {
"type": "string",
"brief": "Use case being evaluated."
}
},
{
"expected_answer": {
"type": "string",
"brief": "The expected response from the agent to the manager."
}
}
],
"assertions": [
"Verify that the request cannot be made if the agent is disconnected.",
"Verify that after making a request, the expected response is received."
],
"input_description": "Several requests that are made by the manager to the agent and their expected responses.",
"expected_output": [
"r\"Cannot send request\"",
"r\"{\"client\":{\"config-profile\":\"centos8\",\"notify_time\":10,\"time-reconnect\":60}}\"",
"r\"{\"error\":0,\"data\":{\"global\":{\"start\":\"2021-02-26, 06:41:26\",\"end\":\"2021-02-26 08:49:19\"}}}\""
],
"tags": [
"simulator"
],
"name": "test_request",
"inputs": [
"udp,tcp-disconnected",
"udp,tcp-get_config",
"udp,tcp-get_state",
"tcp-disconnected",
"tcp-get_config",
"tcp-get_state",
"udp-disconnected",
"udp-get_config",
"udp-get_state"
]
}
]
}
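As a rough illustration of how a generated file such as test_request_agent_info.json could be consumed, the sketch below lists one line per documented test case. It only relies on the `name`, `tests` and `inputs` keys visible in the sample; the `summarize` helper, the trimmed `SAMPLE` string and the pytest-like `module::test[input]` output format are assumptions made for this example and are not part of DocGenerator.

```python
import json

# Trimmed-down copy of the generated documentation; only the keys used by
# this sketch are kept. The real file contains many more fields.
SAMPLE = """
{
  "name": "test_request_agent_info.py",
  "tier": 0,
  "tests": [
    {
      "name": "test_request",
      "inputs": ["tcp-get_config", "udp-get_state"]
    }
  ]
}
"""


def summarize(doc):
    """Return one 'module::test[input]' string per documented input.

    Hypothetical helper: the output format mimics pytest node ids, but
    whether the documented inputs match real pytest ids is an assumption.
    """
    lines = []
    for test in doc.get("tests", []):
        for case in test.get("inputs", []):
            lines.append(f'{doc["name"]}::{test["name"]}[{case}]')
    return lines


if __name__ == "__main__":
    for line in summarize(json.loads(SAMPLE)):
        print(line)
```

Tests without an `inputs` list simply produce no lines, so the helper does not fail on modules that are not parametrized.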
The following tests have been updated:
* test_DOS_blocking_system.py
* test_bruteforce_blocking_system.py
* test_cache.py
* test_cors.py
* test_drop_privileges.py
* test_experimental_features.py
* test_host_port.py
* test_https.py
* test_jwt_token_exp_timeout.py
The current schema of the issue #1694 has been used.
Update DocGenerator/config.yaml
PEP-8 fixes
Closes: #1806
The following tests have been updated:
* test_logs.py
* test_rbac_mode.py
* test_request_timeout.py
* test_use_only_authd.py
* test_add_old_resource.py
* test_admin_resources.py
Minor corrections in the documentation of the remaining tests.
The current schema of the issue #1694 has been used.
PEP-8 fixes.
Closes: #1806
The following tests have been documented:
* test_basic_configuration_log_format.py
* test_basic_configuration_out_format.py
* test_basic_configuration_query.py
* test_basic_configuration_reconnect_time.py
* test_basic_configuration_target.py
* test_keep_running.py
* test_location.py
* test_location_exclude.py
* test_location_custom_sockets.py
Minor fixes in the remaining tests.
The current schema of the issue #1694 has been used.
PEP-8 fixes.
Closes: #1813
The following tests have been documented:
* test_log_format_values.py
* test_macos_file_status_basic.py
* test_macos_file_status_predicate.py
* test_macos_file_status_when_no_macos.py
* test_macos_format_basic.py
* test_macos_format_only_future_events.py
* test_macos_format_query.py
The current schema of the issue #1694 has been used.
PEP-8 fixes.
Closes: #1813
The following tests have been documented:
- test_invalid_decoder_syntax.py
- test_invalid_rules_syntax.py
- test_invalid_socket_input.py
- test_invalid_session_token.py
- test_rules_verbose.py
- test_remove_old_session_for_inactivity.py
- test_remove_old_sessions.py
- test_remove_session.py
- test_load_rules_decoders.py
- test_altert_labels.py
- test_cdb_labels.py
- test_decoder_labels.py
- test_rule_labels.py
All of them pass the PEP8 check.
The current schema of the issue #1694 has been used.
Description
After evaluating the different schema proposals, proposal number four will be used, since it includes the elements that, in our opinion, produce the most comprehensive and detailed documentation.
This schema also includes the recommendations provided in issue #1626, such as detailed information about the operating systems where the tests will run or the PyTest arguments that should be used to run the modules.
QA Docs schema 2.0
Module block
value:
"realtime"
brief:
Uses real-time file monitoring...
Test block
type:
fixture
brief:
Configure a custom environment for testing.
- tags:
  - experimental_enabled
  configuration:
    experimental_features: true
- tags:
  - experimental_disabled
  configuration:
    experimental_features: false
YAML file (conf.yaml) which includes API configuration parameters (IPs and ports).
INFO: \(\d+\): The update of the Debian Buster feed finished successfully.
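To make the two blocks more concrete, here is a minimal sketch of a completeness check for one parsed documentation file. The field names are copied from the sample YAML/JSON files posted in this issue (for example test_request_agent_info.json); treating every one of them as mandatory, as well as the `missing_fields` helper itself, is an assumption of this example and not the actual DocGenerator behaviour.

```python
# Field names taken from the sample files in this issue. Which of them the
# schema really requires is an assumption of this sketch.
MODULE_FIELDS = {
    "copyright", "type", "brief", "tier", "modules", "components", "path",
    "daemons", "os_platform", "os_version", "references", "pytest_args",
    "tags",
}
TEST_FIELDS = {
    "description", "wazuh_min_version", "parameters", "assertions",
    "input_description", "expected_output", "tags",
}


def missing_fields(doc):
    """Map '<module>' or a test name to the sorted fields it lacks.

    Hypothetical helper: returns an empty dict when nothing is missing.
    """
    problems = {}
    missing = sorted(MODULE_FIELDS - set(doc))
    if missing:
        problems["<module>"] = missing
    for test in doc.get("tests", []):
        missing = sorted(TEST_FIELDS - set(test))
        if missing:
            problems[test.get("name", "<unnamed>")] = missing
    return problems
```

A check like this could run over every generated file, so that a module or test block with a forgotten field is reported by name instead of producing incomplete documentation silently.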
Sample documentation
test_vulnerability_detector/test_scan_results/test_scan_nvd_feed.py
test_scan_nvd_feed.yaml
test_scan_nvd_feed.json
test_vulnerability_detector/test_scan_results/test_redhat_inventory_redhat_feed.py
test_redhat_inventory_redhat_feed.yaml
test_redhat_inventory_redhat_feed.json
test_vulnerability_detector/test_general_settings/test_general_settings_ignore_time.py
test_general_settings_ignore_time.yaml
test_general_settings_ignore_time.json
test_agentd/test_agentd_reconnection.py
test_agentd_reconnection.yaml
test_agentd_reconnection.json
test_remoted/test_agent_communication/test_request_agent_info.py
test_request_agent_info.yaml
test_request_agent_info.json
Tasks
Test block table.
DocGenerator.
Additional tasks
Related issues
qa-docs schema 2.0 #1796