add: Add db interface modules #2462:
- agent_db - cve_db - global_db
Showing
3 changed files
with
376 additions
and
0 deletions.
185 changes: 185 additions & 0 deletions
185
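--- a/deps/wazuh_testing/wazuh_testing/db_interface/agent_db.py
+++ b/deps/wazuh_testing/wazuh_testing/db_interface/agent_db.py
@@ -0,0 +1,185 @@
+import datetime
+from time import time
+
+from wazuh_testing.db_interface import query_wdb
+
+
+def clean_table(agent_id, table):
+"""Delete all table entries of the agent DB using wazuh_db.
+Args:
+agent_id (str): Agent ID.
+table (str): table from the agent DB.
+"""
+query_string = f"agent {agent_id} sql DELETE FROM {table}"
+query_wdb(query_string)
+
+
+def update_last_full_scan(last_scan=0, agent_id='000'):
+"""Update the last scan of an agent.
+Args:
+last_scan (int): Last scan ID. This is compute by casting to int the result of time()
+agent_id (str): Agent ID
+"""
+query_string = f"agent {agent_id} sql UPDATE vuln_metadata SET LAST_FULL_SCAN={last_scan}"
+query_wdb(query_string)
+
+
+def insert_hotfix(agent_id='000', scan_id=int(time()), scan_time=datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S"),
+hotfix='000000', checksum='dummychecksum'):
+"""Insert a hotfix.
+Args:
+agent_id (str): Agent ID.
+scan_id (int): Last scan ID.
+scan_time (str): Scan date ("%Y/%m/%d %H:%M:%S")
+hotfix (str): ID of the hotfix value.
+checksum (str): Hotfix checksum
+"""
+query_string = f"agent {agent_id} sql INSERT INTO sys_hotfixes (scan_id, scan_time, hotfix, checksum) VALUES " \
+f"({scan_id}, '{scan_time}', '{hotfix}', '{checksum}')"
+query_wdb(query_string)
+
+
+def insert_os_info(agent_id='000', scan_id=int(time()), scan_time=datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S"),
+hostname='centos8', architecture='x86_64', os_name='CentOS Linux', os_version='8.4', os_major='8',
+os_minor='4', os_build='', version='', os_release='', os_patch='', release='',
+checksum='dummychecksum'):
+"""Insert the OS information in the agent database.
+Args:
+agent_id (str): id of the agent
+scan_id (int): id of the last scan
+scan_time (str): date of the scan with this format "%Y/%m/%d %H:%M:%S"
+hostname (str): name of the host
+architecture (str): architecture of the host
+os_name (str): complete name of the OS
+os_version (str): version of the OS
+os_major (str): major version of the OS
+os_minor (str): minor version of the OS
+os_build (str): build id of the OS
+version (str): version of the OS
+os_release (str): release of the OS
+os_patch (str): current patch of the OS
+release (str): release of the OS
+checksum (str): checksum of the OS
+"""
+query_string = f"agent {agent_id} sql INSERT OR REPLACE INTO sys_osinfo (scan_id, scan_time, hostname, " \
+'architecture, os_name, os_version, os_major, os_minor, os_patch, os_build, release, version, ' \
+f"os_release, checksum) VALUES ('{scan_id}', '{scan_time}', '{hostname}', '{architecture}', " \
+f"'{os_name}', '{os_version}', '{os_major}', '{os_minor}', '{os_patch}', '{os_build}', " \
+f"'{release}', '{version}', '{os_release}', '{checksum}')"
+query_wdb(query_string)
+
+
+def insert_package(agent_id='000', scan_id=int(time()), format='rpm', name=vd.DEFAULT_PACKAGE_NAME,
+priority='', section='Unspecified', size=99, vendor='wazuhintegrationtests', version='1.0.0-1.el7',
+architecture='x86_64', multiarch='', description='Wazuh Integration tests mock package',
+source='Wazuh Integration tests mock package', location='', triaged=0,
+install_time=datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S"),
+scan_time=datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S"), checksum='dummychecksum',
+item_id='dummyitemid'):
+"""Insert a package in the agent DB
+Args:
+agent_id (str): Agent ID.
+scan_id (int): Last scan ID.
+format (str): Package format (deb, rpm, ...)
+name (str): Package name
+priority (str): Released package priority.
+section (str): Package section.
+size (int): Package size.
+vendor (str): Package vendor.
+version (str): Package version.
+architecture (str): Package architecture.
+multiarch (str): Define if a package may be installed in different architectures.
+description (str): Package description.
+source (str): Package source.
+location (str): Package location
+triaged (int): Times that the package has been installed.
+install_time (str): Installation timestamp
+scan_time (str): Scan timestamp
+checksum (str): Package checksum.
+item_id (str): Package ID.
+"""
+arguments = locals()
+for key, value in arguments.items():
+if type(value) is str:
+if value != 'NULL':
+arguments[key] = f"'{value}'"
+
+query_wdb(f"agent {agent_id} sql INSERT INTO sys_programs (scan_id, scan_time, format, name, priority, section, "
+f"size, vendor, install_time, version, architecture, multiarch, source, description, location, triaged,"
+f"checksum, item_id) VALUES ({arguments['scan_id']}, {arguments['scan_time']}, {arguments['format']},"
+f"{arguments['name']}, {arguments['priority']}, {arguments['section']}, {arguments['size']},"
+f"{arguments['vendor']}, {arguments['install_time']}, {arguments['version']},"
+f"{arguments['architecture']}, {arguments['multiarch']}, {arguments['source']}, "
+f"{arguments['description']}, {arguments['location']}, {arguments['triaged']}, {arguments['checksum']},"
+f"{arguments['item_id']})")
+
+
+def update_sync_info(agent_id='000', component='syscollector-packages', last_attempt=1, last_completion=1,
+n_attempts=0, n_completions=0, last_agent_checksum=''):
+"""Update the sync_info table of the specified agent for the selected component."""
+query_wdb(f"agent {agent_id} sql UPDATE sync_info SET last_attempt = {last_attempt},"
+f"last_completion = {last_completion}, n_attempts = {n_attempts}, n_completions = {n_completions},"
+f"last_agent_checksum = '{last_agent_checksum}' where component = '{component}'")
+
+
+def update_package(version, package, agent_id='000'):
+"""Update version of installed package in database.
+Used to simulate upgrades and downgrades of the package given.
+Args:
+version (str): Package version.
+package (str): Package name.
+agent_id (str): Agent ID.
+"""
+update_query_string = f'agent {agent_id} sql UPDATE sys_programs SET version="{version}" WHERE name="{package}"'
+query_wdb(update_query_string)
+
+
+def delete_package(package, agent_id='000'):
+"""Remove package from database.
+Used to simulate uninstall of the package given.
+Args:
+package (str): Package name
+agent_id (str): agent ID.
+"""
+delete_query_string = f'agent {agent_id} sql DELETE FROM sys_programs WHERE name="{package}"'
+query_wdb(delete_query_string)
+
+
+def clean_vulnerabilities_inventory(agent_id='000'):
+"""Clean the vulnerabilities inventory from database.
+Args:
+agent_id (str): Agent ID.
+"""
+clean_query_string = f"agent {agent_id} sql DELETE from vuln_cves"
+query_wdb(clean_query_string)
+
+
+def modify_agent_scan_timestamp(agent_id='000', timestamp=0, full_scan=True):
+"""Update the timestamp of the agent scans in the vuln_metadata table.
+Args:
+agent_id (str): Agent ID.
+timestamp (int): Timestamp value to set.
+full_scan (bool): True for set LAST_FULL_SCAN or False to set LAST_SCAN.
+"""
+scan_type = "LAST_FULL_SCAN" if full_scan else "LAST_PARTIAL_SCAN"
+query_wdb(f"agent {agent_id} sql UPDATE vuln_metadata SET {scan_type}={timestamp}")
+
+
+def delete_os_info_data(agent_id='000'):
+"""Delete the sys_osinfo data from a specific agent
+Args:
+agent_id (str): Agent ID.
+"""
+query_wdb(f"agent {agent_id} sql DELETE FROM sys_osinfo")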
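Review bot: `insert_package` references `vd` in its `name=vd.DEFAULT_PACKAGE_NAME` default, but this module only imports `query_wdb`, so importing `agent_db` raises `NameError` before any test can run; add `from wazuh_testing.modules import vulnerability_detector as vd` next to the existing import, as cve_db.py in this same commit does. The `scan_id=int(time())`, `scan_time=...now()` and `install_time=...now()` defaults in `insert_hotfix`, `insert_os_info` and `insert_package` are evaluated once at import time, so every call in a session shares the same stale timestamp; a `None` default resolved inside the body would give per-call values. Every statement is built by f-string interpolation with no quoting of `'` in values (`description`, `hostname`, `version`, ...) and no validation of `table` in `clean_table`; that is tolerable for fixtures fed only by test code, but a value containing a quote will break or alter the statement, so the assumption deserves a module-level comment such as `# Values are interpolated verbatim into SQL: callers must pass trusted, quote-free test data.`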
130 changes: 130 additions & 0 deletions
130
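--- a/deps/wazuh_testing/wazuh_testing/db_interface/cve_db.py
+++ b/deps/wazuh_testing/wazuh_testing/db_interface/cve_db.py
@@ -0,0 +1,130 @@
+from datetime import datetime
+from time import sleep
+from sqlite3 import OperationalError
+
+from wazuh_testing.db_interface import make_sqlite_query, get_sqlite_query_result, CVE_DB_PATH
+from wazuh_testing.modules import vulnerability_detector as vd
+
+
+def clean_table(table):
+"""Delete all table entries from CVE DB.
+Args:
+table (str): DB table.
+"""
+make_sqlite_query(CVE_DB_PATH, [f"DELETE FROM {table}"])
+
+
+def insert_vulnerability(cveid=vd.DEFAULT_VULNERABILITY_ID, target='RHEL7', target_minor='',
+package=vd.DEFAULT_PACKAGE_NAME, operation='less than', operation_value='2.0.0-1.el7',
+title='', severity='critical',
+published=datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ"), updated='',
+reference='https:/wazuh/wazuh-qa', target_v='REDHAT', cvss='10.000000',
+cvss_vector='AV:N/AC:L/Au:N/C:C/I:C/A:C', rationale='Wazuh integration test vulnerability',
+cvss3='', bugzilla_reference='https:/wazuh/wazuh-qa', cwe='WVE-000 -> WVE-001',
+advisory='RHSA-2010:0029', ref_target='RHEL'):
+"""Insert a vulnerability in CVE database.
+Args:
+cveid (str): Vulnerability ID
+target (str): OS target.
+target_minor (str): OS target minor version.
+package (str): Package name.
+operation (str): Operation to compare the version of the packages.
+operation_value (str): Value used to compare the packages.
+title (str): Vulnerability title.
+severity (str): Vulnerability severity.
+published (str): Date when the vulnerability was published.
+updated (str): Contain if the package was updated.
+reference (str): URL referencing the vulnerability
+target_v (str): OS target family.
+cvss (str): Common vulnerability scoring system
+cvss_vector (str): Representation of the values used to derive the score.
+rationale (str): Reasons to describe the vulnerability.
+cvss3 (str): Common vulnerability scoring system version 3
+bugzilla_reference (str): URL referencing to bugzilla
+cwe (str): CWE ID
+advisory (str): advisory ID
+ref_target (str): OS target ID.
+"""
+queries = [
+'INSERT INTO VULNERABILITIES (cveid, target, target_minor, package, operation, operation_value) VALUES '
+f"('{cveid}', '{target}', '{target_minor}', '{package}', '{operation}', '{operation_value}')",
+
+'INSERT INTO VULNERABILITIES_INFO (ID, title, severity, published, updated, target, rationale, cvss, '
+f"cvss_vector, CVSS3, cwe) VALUES ('{cveid}', '{title}', '{severity}', '{published}', '{updated}', "
+f"'{target_v}', {rationale}', '{cvss}', '{cvss_vector}', '{cvss3}', '{cwe}')",
+
+f"INSERT INTO REFERENCES_INFO (id, target, reference) VALUES ('{cveid}', '{ref_target}', "
+f"'{bugzilla_reference}')",
+
+f"INSERT INTO BUGZILLA_REFERENCES_INFO (id, target, bugzilla_reference) VALUES ('{cveid}', '{ref_target}', "
+f"'{bugzilla_reference}')",
+
+f"INSERT INTO ADVISORIES_INFO (id, target, advisory) VALUES ('{cveid}', '{ref_target}', '{advisory}')"
+]
+
+make_sqlite_query(vd.CVE_DB_PATH, queries)
+
+
+def delete_vulnerability(cveid):
+"""Remove a vulnerability from the DB.
+Args:
+cveid (str): Vulnerability ID.
+"""
+queries = [
+f"DELETE FROM VULNERABILITIES WHERE cveid='{cveid}'",
+f"DELETE FROM VULNERABILITIES_INFO WHERE id='{cveid}'",
+f"DELETE FROM REFERENCES_INFO WHERE id='{cveid}'",
+f"DELETE FROM BUGZILLA_REFERENCES_INFO WHERE id='{cveid}'",
+f"DELETE FROM ADVISORIES_INFO WHERE id='{cveid}'"
+]
+
+make_sqlite_query(vd.CVE_DB_PATH, queries)
+
+
+def get_num_vulnerabilities():
+"""Get the number of vulnerabilities inserted in VULNERABILITIES table of CVE DB.
+Returns:
+int: total number of vulnerabilities in the VULNERABILITIES table.
+"""
+query_string = 'SELECT count(*) from VULNERABILITIES'
+query_result = get_sqlite_query_result(vd.CVE_DB_PATH, query_string)
+vulnerabilities_number = int(query_result[0])
+
+return vulnerabilities_number
+
+
+def modify_metadata_vuldet_feed(feed, timestamp):
+"""Function to modify the timestamp value of the metadata table for a specific feed.
+Args:
+feed (str): Feed name.
+timestamp (str): Timestamp value to set.
+"""
+query_string = f"update METADATA set TIMESTAMP='{timestamp}' where TARGET='{feed}'"
+make_sqlite_query(vd.CVE_DB_PATH, [query_string])
+sleep(1)
+
+
+def modify_nvd_metadata_vuldet(timestamp):
+"""Update the timestamp value of the nvd_metadata table.
+Args:
+timestamp (int): The new timestamp value to set.
+Raises:
+sqlite3.OperationalError: If could not update the value.
+"""
+query_string = f"UPDATE NVD_METADATA SET LAST_UPDATE={timestamp};"
+
+for _ in range(vd.VULN_DETECTOR_GLOBAL_TIMEOUT):
+try:
+make_sqlite_query(vd.CVE_DB_PATH, [query_string])
+break
+except OperationalError:
+sleep(1)
+else:
+raise OperationalError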
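Review bot: In `insert_vulnerability` the VULNERABILITIES_INFO statement is missing the opening quote before `{rationale}` (`'{target_v}', {rationale}', '{cvss}'`), which yields malformed SQL on every call; the line should read `f"'{target_v}', '{rationale}', '{cvss}', '{cvss_vector}', '{cvss3}', '{cwe}')",`. In the same function the `reference` parameter is never used: the REFERENCES_INFO insert writes `bugzilla_reference` into the `reference` column, which looks like a copy of the BUGZILLA_REFERENCES_INFO statement below it and should interpolate `reference` instead. `clean_table` uses the `CVE_DB_PATH` imported from `wazuh_testing.db_interface` while every other function uses `vd.CVE_DB_PATH`; whether those resolve to the same file is not visible here, and if they ever diverge `clean_table` silently clears a different database, so one source should be chosen. The bare `raise OperationalError` at the end of `modify_nvd_metadata_vuldet` discards the original error text; keeping the last caught exception and re-raising it (`raise OperationalError('NVD_METADATA update timed out') from last_err`) would preserve the cause.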
61 changes: 61 additions & 0 deletions
61
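--- a/deps/wazuh_testing/wazuh_testing/db_interface/global_db.py
+++ b/deps/wazuh_testing/wazuh_testing/db_interface/global_db.py
@@ -0,0 +1,61 @@
+from wazuh_testing.db_interface import query_wdb
+
+
+def modify_system(os_name='CentOS Linux', os_major='7', name='centos7', agent_id='000', os_minor='1', os_arch='x86_64',
+os_version='7.1', os_platform='centos', version='4.0'):
+"""Modify the manager or agent system.
+Args:
+os_name (str): OS complete name.
+os_major (str): OS major version.
+name (str): Os name.
+agent_id (str): Agent ID.
+os_minor (str): OS minor version
+os_arch (str): Host architecture.
+os_version (str): OS version.
+os_platform (str): Os platform e.g (centos, ubuntu, ...)
+version (str): OS version
+"""
+query_string = f"global sql update AGENT set OS_NAME='{os_name}', OS_VERSION='{os_version}', " \
+f"OS_MAJOR='{os_major}', OS_MINOR='{os_minor}', OS_ARCH='{os_arch}', NAME='{name}', " \
+f"OS_PLATFORM='{os_platform}', VERSION='{version}' WHERE id='{int(agent_id)}'"
+query_wdb(query_string)
+
+
+def create_or_update_agent(agent_id='001', name='centos8-agent', ip='127.0.0.1', register_ip='127.0.0.1',
+internal_key='', os_name='CentOS Linux', os_version='8.4', os_major='8', os_minor='4',
+os_codename='centos-8', os_build='4.18.0-147.8.1.el8_1.x86_64',
+os_platform='#1 SMP Thu Apr 9 13:49:54 UTC 2020', os_uname='x86_64', os_arch='x86_64',
+version='4.2', config_sum='', merged_sum='', manager_host='centos-8', node_name='node01',
+date_add='1612942494', last_keepalive='253402300799', group='', sync_status='synced',
+connection_status='active'):
+"""Create an agent or update its info it is already exists (checking agent_id)."""
+
+query = 'global sql INSERT OR REPLACE INTO AGENT (id, name, ip, register_ip, internal_key, os_name, os_version, ' \
+'os_major, os_minor, os_codename, os_build, os_platform, os_uname, os_arch, version, config_sum, ' \
+'manager_host, node_name, date_add, last_keepalive, "group", sync_status, connection_status) VALUES ' \
+f"('{agent_id}', '{name}', '{ip}', '{register_ip}', '{internal_key}', '{os_name}', '{os_version}', " \
+f"'{os_major}', '{os_minor}', '{os_codename}', '{os_build}', '{os_platform}', '{os_uname}', '{os_arch}', " \
+f"'{version}', '{config_sum}', '{merged_sum}', '{manager_host}', '{node_name}', '{date_add}', " \
+f"'{last_keepalive}', '{group}', '{sync_status}', '{connection_status}')"
+query_wdb(query)
+
+
+def get_last_agent_id():
+"""Get the last agent ID registered in the global DB.
+Returns:
+str: Agent ID.
+"""
+last_id = query_wdb('global sql SELECT id FROM agent order by id desc limit 1')
+return last_id[0]['id']
+
+
+def delete_agent(agent_id):
+"""Delete an agent from the global.db
+Args:
+agent_id (str): Agent ID.
+"""
+query_wdb(f"global sql DELETE FROM agent where id={int(agent_id)}")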
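Review bot: In `create_or_update_agent` the column list names 23 columns but the VALUES clause supplies 24 values — `'{merged_sum}'` is interpolated right after `'{config_sum}'` yet `merged_sum` is absent from the column list — so SQLite will reject the INSERT OR REPLACE with a column-count mismatch on every call; fix the second column-list line to `'os_major, os_minor, os_codename, os_build, os_platform, os_uname, os_arch, version, config_sum, merged_sum, ' \`. `modify_system` filters with `WHERE id='{int(agent_id)}'` while `delete_agent` uses `where id={int(agent_id)}`; both go through `int()`, which at least rejects non-numeric IDs, but the quoting should be made consistent with whatever type the `id` column actually has. The `os_platform` default is a kernel build string while `os_uname` defaults to `'x86_64'`, which looks like the two values are swapped relative to the column names — worth confirming against the AGENT table schema. `get_last_agent_id` assumes `query_wdb` returns a non-empty list of dicts; on an empty agent table `last_id[0]` raises a bare IndexError, so a guard with a clearer message would help.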