Skip to content

Commit

Permalink
refac: Remove the extended behavior. #2461
Browse files Browse the repository at this point in the history
The extended behavior will be used for the full scan test refactor.
  • Loading branch information
Luis Gonzalez committed Feb 3, 2022
1 parent b300b14 commit 6a6cfc7
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 169 deletions.
42 changes: 39 additions & 3 deletions tests/integration/test_vulnerability_detector/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,23 @@
# Created by Wazuh, Inc. <[email protected]>.
# This program is free software; you can redistribute it and/or modify it under the terms of GPLv2

from subprocess import CalledProcessError

import os
import time
import pytest

from subprocess import CalledProcessError

import wazuh_testing.db_interface.agent_db as adb
from wazuh_testing.tools import LOG_FILE_PATH
from wazuh_testing.tools.file import truncate_file
from wazuh_testing.tools.monitoring import FileMonitor
from wazuh_testing.tools.services import control_service
from wazuh_testing.modules.vulnerability_detector.utils import clean_vd_tables
from wazuh_testing.mocking import create_mocked_agent, delete_mocked_agent
from wazuh_testing.db_interface import cve_db
from wazuh_testing.mocking import set_system
from wazuh_testing import db_interface as dbi
from wazuh_testing.modules import vulnerability_detector as vd
from wazuh_testing.db_interface import cve_db


@pytest.fixture(scope='module')
Expand Down Expand Up @@ -123,3 +125,37 @@ def clean_cve_tables_func():
cve_db.clean_all_tables()
yield
cve_db.clean_all_tables()


@pytest.fixture(scope='function')
def prepare_full_scan_environment(metadata):
"""Setup the initial test state.
It cleans the tables before running the test. Then, it mocks a RHEL8 system and inserts the OS information into
the `sys_osinfo` table. Besides, It inserts the packages into the agent's `sys_programs` table.
Finally, It cleans all in the teardown.
Args:
metadata (dict): Test case metadata.
"""
clean_vd_tables()

# Mock RedHat system
set_system('RHEL8')
# Update sys_osinfo
adb.insert_os_info()

for package in metadata['package_names']:
adb.insert_package(name=package, vendor=metadata['package_vendor'],
version=metadata['package_version'], source='NULL')

# Force in order to make the test more stable
adb.update_last_full_scan(0)

yield

for package in metadata['package_names']:
adb.delete_package(package)

clean_vd_tables()
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,12 @@
- general_settings
'''
import os
import time
import pytest

import wazuh_testing.vulnerability_detector as vd
import wazuh_testing.db_interface.agent_db as adb
from wazuh_testing import DB_PATH
import wazuh_testing.modules.vulnerability_detector as vd
from wazuh_testing.tools.time import time_to_seconds
from wazuh_testing.db_interface.cve_db import get_sqlite_query_result
from wazuh_testing.modules.vulnerability_detector import event_monitor as evm
from wazuh_testing.modules.vulnerability_detector.utils import clean_vd_tables
from wazuh_testing.tools import configuration
from wazuh_testing.modules.vulnerability_detector import event_monitor as evm


# Marks
Expand Down Expand Up @@ -94,45 +89,9 @@
configurations = configuration.update_configuration_template(configurations, to_modify, new_values)


@pytest.fixture
def prepare_environment(metadata):
"""Setup the initial test state.
It cleans the tables before running the test. Then, it mocks a RHEL8 system and inserts the OS information into
the `sys_osinfo` table. Besides, It inserts the packages into the agent's `sys_programs` table.
Finally, It cleans all in the teardown.
Args:
metadata (dict): Test case metadata.
"""
clean_vd_tables()
time.sleep(0.5)

# Mock RedHat system
vd.set_system('RHEL8')
# Update sys_osinfo
adb.insert_os_info()

for package in metadata['package_names']:
adb.insert_package(name=package, vendor=metadata['package_vendor'],
version=metadata['package_version'], source='NULL')

# Force in order to make the test more stable
adb.update_last_full_scan(0)

yield

for package in metadata['package_names']:
adb.delete_package(package)

clean_vd_tables()


@pytest.mark.tier(level=0)
@pytest.mark.parametrize('configuration, metadata', zip(configurations, configuration_metadata), ids=test_case_ids)
def test_min_full_scan_interval(configuration, metadata, set_wazuh_configuration, truncate_log_files,
prepare_environment, restart_modulesd_function):
prepare_full_scan_environment, restart_modulesd_function):
'''
description: Checks if the `min_full_scan_interval ` option is working correctly. To do this,
it checks the `ossec.log` file for the message indicating that a full scan is being performed. After
Expand Down Expand Up @@ -177,125 +136,3 @@ def test_min_full_scan_interval(configuration, metadata, set_wazuh_configuration

# Wait for full scan event log completion
evm.check_vulnerability_full_scan_end()


@pytest.mark.tier(level=5)
@pytest.mark.parametrize('configuration, metadata', zip(configurations, configuration_metadata), ids=test_case_ids)
def test_min_full_scan_interval_extended(configuration, metadata, set_wazuh_configuration, truncate_log_files,
prepare_environment, restart_modulesd_function):
'''
description: Checks if the `min_full_scan_interval ` option is working correctly, also checking if the full scan
is performed correctly. To do this, it checks the `ossec.log` file for the message indicating that a
full scan is being performed.
Three packages are inserted, The first of them will be vulnerable, the second will be used to check
that a non-vulnerable package after the check is not present in `vuln_cve` and the last one will be
updated to a non-vulnerable version in order to check that the next full scan removes the obsolete
vulnerability.
Then, checking that every package and its vulnerability generates a correct flow. Checks if it has
been scanned(triaged=1), if vulnerable package is reported, vulnerabilities has valid status and
non-vulnerable packages does not appear in `vuln_cve`.
After this, it waits until the event end alert shows up. Finally, after upgrading a vulnerable package
to a non-vulnerable, it checks that the next full scan removes it.
wazuh_min_version: 4.3.0
parameters:
- configuration:
type: dict
brief: Configuration loaded from `configuration_template`.
- metadata:
type: dict
brief: Test case metadata.
- set_wazuh_configuration:
type: fixture
brief: Set wazuh configuration.
- truncate_log_files:
type: fixture
brief: Truncate all the log files after the test execution.
- prepare_environment:
type: fixture
brief: Setup the initial test state.
- restart_modulesd_function:
type: fixture
brief: Restart wazuh-modulesd daemon before starting a test, and stop it after finishing.
assertions:
- Verify that the full scan starts.
- Verify that the packages has been scanned.
- Verify that the os has been set as triaged.
- Verify that every vulnerable package has the status `VALID` as expected.
- Verify that every new vulnerable package is logged.
- Verify that every package is inserted into its expected vulnerability.
- Verify the non-vulnerable package removal.
- Verify that the full scan ends.
- Verify that the obsolete vulnerability is removed.
input_description: The values specified in `data/test_cases`.
expected_output:
- "A full scan will be run on agent '000'"
- f"The '{package}' package .* from agent .* is vulnerable to '{cve}'"
- f"Package '{package_name}' inserted into the vulnerability '{package_cve}'."
- f"Package '{package}' not vulnerable to '{cve}'."
- "Finished vulnerability assessment for agent '000'"
- f"The vulnerability '{cve}' affecting '{package}' was eliminated"
'''
event_timeout = time_to_seconds(metadata['min_full_scan_interval_value']) + vd.VULN_DETECTOR_SCAN_TIMEOUT
package_names = metadata['package_names']
package_cves = metadata['package_cves']

# Change the package version making it not valid
adb.update_package(package=package_names[1],
version=metadata['package_version_not_vulnerable'])

# Check that the full scan starts
evm.check_vulnerability_full_scan_start(event_timeout)

# Check that every package has been set as triaged
triaged_query = get_sqlite_query_result(os.path.join(DB_PATH, '000.db'), "SELECT triaged, name FROM sys_programs")
for triaged_value in triaged_query:
triaged_value_and_name = triaged_value.split(',')
assert int(triaged_value_and_name[0]) == 1, f"The package {triaged_value_and_name[1]} has not been scanned."

# Check that the OS has been set as triaged
os_triaged_query = get_sqlite_query_result(os.path.join(DB_PATH, '000.db'), "SELECT triaged FROM sys_osinfo")[0]
assert 1 == int(os_triaged_query), 'The OS has not been set as triaged.'

# Check that vulnerable packages are reported by vulnerability detector
evm.check_vulnerability_scan_log(package=package_names[0], cve=package_cves[0])
evm.check_vulnerability_scan_log(package=package_names[2], cve=package_cves[2])

# Check that vulnerabilities has 'VALID' status
packages_query = get_sqlite_query_result(os.path.join(DB_PATH, '000.db'), 'SELECT count(*) FROM vuln_cves '
"WHERE status='VALID';")
assert 2 == int(packages_query[0]), f"The expected packages vulnerabilities have not been updated to 'VALID'"

# Check that the packages are inserted into its vulnerabilities
evm.check_vulnerability_scan_inserted_package(package_names[0], package_cves[0])
evm.check_vulnerability_scan_inserted_package(package_names[2], package_cves[2])

# Wait for the non-vulnerable package removal event
evm.check_vulnerability_scan_remove_vuln_package(package_names[1], package_cves[1])
# Check that non-vulnerable package does not appear in vuln_cve db
package_1_query = get_sqlite_query_result(os.path.join(DB_PATH, '000.db'), "SELECT count(*) FROM vuln_cves "
f"WHERE name='{package_names[1]}';")
assert 0 == int(package_1_query[0]), f"The package {package_names[1]} has not been deleted from vuln_cve table."

# Wait for full scan event log completion
evm.check_vulnerability_full_scan_end()

# Change the package version to a non-vulnerable one, to check if the full scan(next one) erases it
adb.update_package(package=package_names[2],
version=metadata['package_version_not_vulnerable'])

# Wait for the non-vulnerable package removal event
evm.check_vulnerability_scan_remove_vuln_package(package_names[2], package_cves[2])
# Check that non-vulnerable package does not appear in vuln_cve db
package_2_query = get_sqlite_query_result(os.path.join(DB_PATH, '000.db'), "SELECT count(*) FROM vuln_cves "
f"WHERE name='{package_names[2]}';")
assert 0 == int(package_2_query[0]), f"The package {package_names[2]} has not been deleted from vuln_cve table."

# Wait for the vulnerability removal event
evm.check_vulnerability_scan_remove_vuln(package_names[2], package_cves[2])

0 comments on commit 6a6cfc7

Please sign in to comment.