diff --git a/deps/wazuh_testing/wazuh_testing/db_interface/agent_db.py b/deps/wazuh_testing/wazuh_testing/db_interface/agent_db.py
new file mode 100644
index 0000000000..b1efe58c80
--- /dev/null
+++ b/deps/wazuh_testing/wazuh_testing/db_interface/agent_db.py
@@ -0,0 +1,185 @@
+import datetime
+from time import time
+
+from wazuh_testing.db_interface import query_wdb
+
+
+def clean_table(agent_id, table):
+ """Delete all table entries of the agent DB using wazuh_db.
+
+ Args:
+ agent_id (str): Agent ID.
+ table (str): table from the agent DB.
+ """
+ query_string = f"agent {agent_id} sql DELETE FROM {table}"
+ query_wdb(query_string)
+
+
+def update_last_full_scan(last_scan=0, agent_id='000'):
+ """Update the last scan of an agent.
+
+ Args:
+ last_scan (int): Last scan ID. This is compute by casting to int the result of time()
+ agent_id (str): Agent ID
+ """
+ query_string = f"agent {agent_id} sql UPDATE vuln_metadata SET LAST_FULL_SCAN={last_scan}"
+ query_wdb(query_string)
+
+
+def insert_hotfix(agent_id='000', scan_id=int(time()), scan_time=datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S"),
+ hotfix='000000', checksum='dummychecksum'):
+ """Insert a hotfix.
+
+ Args:
+ agent_id (str): Agent ID.
+ scan_id (int): Last scan ID.
+ scan_time (str): Scan date ("%Y/%m/%d %H:%M:%S")
+ hotfix (str): ID of the hotfix value.
+ checksum (str): Hotfix checksum
+ """
+ query_string = f"agent {agent_id} sql INSERT INTO sys_hotfixes (scan_id, scan_time, hotfix, checksum) VALUES " \
+ f"({scan_id}, '{scan_time}', '{hotfix}', '{checksum}')"
+ query_wdb(query_string)
+
+
+def insert_os_info(agent_id='000', scan_id=int(time()), scan_time=datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S"),
+ hostname='centos8', architecture='x86_64', os_name='CentOS Linux', os_version='8.4', os_major='8',
+ os_minor='4', os_build='', version='', os_release='', os_patch='', release='',
+ checksum='dummychecksum'):
+ """Insert the OS information in the agent database.
+
+ Args:
+ agent_id (str): id of the agent
+ scan_id (int): id of the last scan
+ scan_time (str): date of the scan with this format "%Y/%m/%d %H:%M:%S"
+ hostname (str): name of the host
+ architecture (str): architecture of the host
+ os_name (str): complete name of the OS
+ os_version (str): version of the OS
+ os_major (str): major version of the OS
+ os_minor (str): minor version of the OS
+ os_build (str): build id of the OS
+ version (str): version of the OS
+ os_release (str): release of the OS
+ os_patch (str): current patch of the OS
+ release (str): release of the OS
+ checksum (str): checksum of the OS
+ """
+ query_string = f"agent {agent_id} sql INSERT OR REPLACE INTO sys_osinfo (scan_id, scan_time, hostname, " \
+ 'architecture, os_name, os_version, os_major, os_minor, os_patch, os_build, release, version, ' \
+ f"os_release, checksum) VALUES ('{scan_id}', '{scan_time}', '{hostname}', '{architecture}', " \
+ f"'{os_name}', '{os_version}', '{os_major}', '{os_minor}', '{os_patch}', '{os_build}', " \
+ f"'{release}', '{version}', '{os_release}', '{checksum}')"
+ query_wdb(query_string)
+
+
+def insert_package(agent_id='000', scan_id=int(time()), format='rpm', name=vd.DEFAULT_PACKAGE_NAME,
+ priority='', section='Unspecified', size=99, vendor='wazuhintegrationtests', version='1.0.0-1.el7',
+ architecture='x86_64', multiarch='', description='Wazuh Integration tests mock package',
+ source='Wazuh Integration tests mock package', location='', triaged=0,
+ install_time=datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S"),
+ scan_time=datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S"), checksum='dummychecksum',
+ item_id='dummyitemid'):
+ """Insert a package in the agent DB
+
+ Args:
+ agent_id (str): Agent ID.
+ scan_id (int): Last scan ID.
+ format (str): Package format (deb, rpm, ...)
+ name (str): Package name
+ priority (str): Released package priority.
+ section (str): Package section.
+ size (int): Package size.
+ vendor (str): Package vendor.
+ version (str): Package version.
+ architecture (str): Package architecture.
+ multiarch (str): Define if a package may be installed in different architectures.
+ description (str): Package description.
+ source (str): Package source.
+ location (str): Package location
+ triaged (int): Times that the package has been installed.
+ install_time (str): Installation timestamp
+ scan_time (str): Scan timestamp
+ checksum (str): Package checksum.
+ item_id (str): Package ID.
+ """
+ arguments = locals()
+ for key, value in arguments.items():
+ if type(value) is str:
+ if value != 'NULL':
+ arguments[key] = f"'{value}'"
+
+ query_wdb(f"agent {agent_id} sql INSERT INTO sys_programs (scan_id, scan_time, format, name, priority, section, "
+ f"size, vendor, install_time, version, architecture, multiarch, source, description, location, triaged,"
+ f"checksum, item_id) VALUES ({arguments['scan_id']}, {arguments['scan_time']}, {arguments['format']},"
+ f"{arguments['name']}, {arguments['priority']}, {arguments['section']}, {arguments['size']},"
+ f"{arguments['vendor']}, {arguments['install_time']}, {arguments['version']},"
+ f"{arguments['architecture']}, {arguments['multiarch']}, {arguments['source']}, "
+ f"{arguments['description']}, {arguments['location']}, {arguments['triaged']}, {arguments['checksum']},"
+ f"{arguments['item_id']})")
+
+
+def update_sync_info(agent_id='000', component='syscollector-packages', last_attempt=1, last_completion=1,
+ n_attempts=0, n_completions=0, last_agent_checksum=''):
+ """Update the sync_info table of the specified agent for the selected component."""
+ query_wdb(f"agent {agent_id} sql UPDATE sync_info SET last_attempt = {last_attempt},"
+ f"last_completion = {last_completion}, n_attempts = {n_attempts}, n_completions = {n_completions},"
+ f"last_agent_checksum = '{last_agent_checksum}' where component = '{component}'")
+
+
+def update_package(version, package, agent_id='000'):
+ """Update version of installed package in database.
+
+ Used to simulate upgrades and downgrades of the package given.
+
+ Args:
+ version (str): Package version.
+ package (str): Package name.
+ agent_id (str): Agent ID.
+ """
+ update_query_string = f'agent {agent_id} sql UPDATE sys_programs SET version="{version}" WHERE name="{package}"'
+ query_wdb(update_query_string)
+
+
+def delete_package(package, agent_id='000'):
+ """Remove package from database.
+
+ Used to simulate uninstall of the package given.
+
+ Args:
+ package (str): Package name
+ agent_id (str): agent ID.
+ """
+ delete_query_string = f'agent {agent_id} sql DELETE FROM sys_programs WHERE name="{package}"'
+ query_wdb(delete_query_string)
+
+
+def clean_vulnerabilities_inventory(agent_id='000'):
+ """Clean the vulnerabilities inventory from database.
+
+ Args:
+ agent_id (str): Agent ID.
+ """
+ clean_query_string = f"agent {agent_id} sql DELETE from vuln_cves"
+ query_wdb(clean_query_string)
+
+
+def modify_agent_scan_timestamp(agent_id='000', timestamp=0, full_scan=True):
+ """Update the timestamp of the agent scans in the vuln_metadata table.
+
+ Args:
+ agent_id (str): Agent ID.
+ timestamp (int): Timestamp value to set.
+ full_scan (bool): True for set LAST_FULL_SCAN or False to set LAST_SCAN.
+ """
+ scan_type = "LAST_FULL_SCAN" if full_scan else "LAST_PARTIAL_SCAN"
+ query_wdb(f"agent {agent_id} sql UPDATE vuln_metadata SET {scan_type}={timestamp}")
+
+
+def delete_os_info_data(agent_id='000'):
+ """Delete the sys_osinfo data from a specific agent
+
+ Args:
+ agent_id (str): Agent ID.
+ """
+ query_wdb(f"agent {agent_id} sql DELETE FROM sys_osinfo")
diff --git a/deps/wazuh_testing/wazuh_testing/db_interface/cve_db.py b/deps/wazuh_testing/wazuh_testing/db_interface/cve_db.py
new file mode 100644
index 0000000000..544e83004b
--- /dev/null
+++ b/deps/wazuh_testing/wazuh_testing/db_interface/cve_db.py
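Review bot: `insert_package` uses `vd.DEFAULT_PACKAGE_NAME` as a default argument, but agent_db.py imports only `datetime`, `time` and `query_wdb`, so the module raises NameError as soon as it is imported; add `from wazuh_testing.modules import vulnerability_detector as vd` next to the existing import, as cve_db.py does. The `scan_id=int(time())`, `scan_time=...now()...` and `install_time=...` defaults of `insert_hotfix`, `insert_os_info` and `insert_package` are evaluated once at import time, so every call in a test session that omits them reuses the same timestamp; if a fresh value per call is intended, default to `None` and compute it inside the body. The docstring of `modify_agent_scan_timestamp` says `False` sets `LAST_SCAN`, while the code writes `LAST_PARTIAL_SCAN`; `full_scan (bool): True to set LAST_FULL_SCAN, False to set LAST_PARTIAL_SCAN.` would match. Every helper interpolates its arguments into the SQL text unescaped, so a package name or checksum containing a single quote breaks the statement; since these are test fixtures that is tolerable, but a one-line comment stating that callers must pass quote-free values would save the next reader a debugging round. `format` as a parameter name of `insert_package` shadows the builtin.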
@@ -0,0 +1,130 @@
+from datetime import datetime
+from time import sleep
+from sqlite3 import OperationalError
+
+from wazuh_testing.db_interface import make_sqlite_query, get_sqlite_query_result, CVE_DB_PATH
+from wazuh_testing.modules import vulnerability_detector as vd
+
+
+def clean_table(table):
+ """Delete all table entries from CVE DB.
+
+ Args:
+ table (str): DB table.
+ """
+ make_sqlite_query(CVE_DB_PATH, [f"DELETE FROM {table}"])
+
+
+def insert_vulnerability(cveid=vd.DEFAULT_VULNERABILITY_ID, target='RHEL7', target_minor='',
+ package=vd.DEFAULT_PACKAGE_NAME, operation='less than', operation_value='2.0.0-1.el7',
+ title='', severity='critical',
+ published=datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ"), updated='',
+ reference='https://github.com/wazuh/wazuh-qa', target_v='REDHAT', cvss='10.000000',
+ cvss_vector='AV:N/AC:L/Au:N/C:C/I:C/A:C', rationale='Wazuh integration test vulnerability',
+ cvss3='', bugzilla_reference='https://github.com/wazuh/wazuh-qa', cwe='WVE-000 -> WVE-001',
+ advisory='RHSA-2010:0029', ref_target='RHEL'):
+ """Insert a vulnerability in CVE database.
+
+ Args:
+ cveid (str): Vulnerability ID
+ target (str): OS target.
+ target_minor (str): OS target minor version.
+ package (str): Package name.
+ operation (str): Operation to compare the version of the packages.
+ operation_value (str): Value used to compare the packages.
+ title (str): Vulnerability title.
+ severity (str): Vulnerability severity.
+ published (str): Date when the vulnerability was published.
+ updated (str): Contain if the package was updated.
+ reference (str): URL referencing the vulnerability
+ target_v (str): OS target family.
+ cvss (str): Common vulnerability scoring system
+ cvss_vector (str): Representation of the values used to derive the score.
+ rationale (str): Reasons to describe the vulnerability.
+ cvss3 (str): Common vulnerability scoring system version 3
+ bugzilla_reference (str): URL referencing to bugzilla
+ cwe (str): CWE ID
+ advisory (str): advisory ID
+ ref_target (str): OS target ID.
+ """
+ queries = [
+ 'INSERT INTO VULNERABILITIES (cveid, target, target_minor, package, operation, operation_value) VALUES '
+ f"('{cveid}', '{target}', '{target_minor}', '{package}', '{operation}', '{operation_value}')",
+
+ 'INSERT INTO VULNERABILITIES_INFO (ID, title, severity, published, updated, target, rationale, cvss, '
+ f"cvss_vector, CVSS3, cwe) VALUES ('{cveid}', '{title}', '{severity}', '{published}', '{updated}', "
+ f"'{target_v}', {rationale}', '{cvss}', '{cvss_vector}', '{cvss3}', '{cwe}')",
+
+ f"INSERT INTO REFERENCES_INFO (id, target, reference) VALUES ('{cveid}', '{ref_target}', "
+ f"'{bugzilla_reference}')",
+
+ f"INSERT INTO BUGZILLA_REFERENCES_INFO (id, target, bugzilla_reference) VALUES ('{cveid}', '{ref_target}', "
+ f"'{bugzilla_reference}')",
+
+ f"INSERT INTO ADVISORIES_INFO (id, target, advisory) VALUES ('{cveid}', '{ref_target}', '{advisory}')"
+ ]
+
+ make_sqlite_query(vd.CVE_DB_PATH, queries)
+
+
+def delete_vulnerability(cveid):
+ """Remove a vulnerability from the DB.
+
+ Args:
+ cveid (str): Vulnerability ID.
+ """
+ queries = [
+ f"DELETE FROM VULNERABILITIES WHERE cveid='{cveid}'",
+ f"DELETE FROM VULNERABILITIES_INFO WHERE id='{cveid}'",
+ f"DELETE FROM REFERENCES_INFO WHERE id='{cveid}'",
+ f"DELETE FROM BUGZILLA_REFERENCES_INFO WHERE id='{cveid}'",
+ f"DELETE FROM ADVISORIES_INFO WHERE id='{cveid}'"
+ ]
+
+ make_sqlite_query(vd.CVE_DB_PATH, queries)
+
+
+def get_num_vulnerabilities():
+ """Get the number of vulnerabilities inserted in VULNERABILITIES table of CVE DB.
+
+ Returns:
+ int: total number of vulnerabilities in the VULNERABILITIES table.
+ """ + query_string = 'SELECT count(*) from VULNERABILITIES' + query_result = get_sqlite_query_result(vd.CVE_DB_PATH, query_string) + vulnerabilities_number = int(query_result[0]) + + return vulnerabilities_number + + +def modify_metadata_vuldet_feed(feed, timestamp): + """Function to modify the timestamp value of the metadata table for a specific feed. + + Args: + feed (str): Feed name. + timestamp (str): Timestamp value to set. + """ + query_string = f"update METADATA set TIMESTAMP='{timestamp}' where TARGET='{feed}'" + make_sqlite_query(vd.CVE_DB_PATH, [query_string]) + sleep(1) + + +def modify_nvd_metadata_vuldet(timestamp): + """Update the timestamp value of the nvd_metadata table. + + Args: + timestamp (int): The new timestamp value to set. + + Raises: + sqlite3.OperationalError: If could not update the value. + """ + query_string = f"UPDATE NVD_METADATA SET LAST_UPDATE={timestamp};" + + for _ in range(vd.VULN_DETECTOR_GLOBAL_TIMEOUT): + try: + make_sqlite_query(vd.CVE_DB_PATH, [query_string]) + break + except OperationalError: + sleep(1) + else: + raise OperationalError diff --git a/deps/wazuh_testing/wazuh_testing/db_interface/global_db.py b/deps/wazuh_testing/wazuh_testing/db_interface/global_db.py new file mode 100644 index 0000000000..7627bc1884 --- /dev/null +++ b/deps/wazuh_testing/wazuh_testing/db_interface/global_db.py 
@@ -0,0 +1,61 @@
+from wazuh_testing.db_interface import query_wdb
+
+
+def modify_system(os_name='CentOS Linux', os_major='7', name='centos7', agent_id='000', os_minor='1', os_arch='x86_64',
+ os_version='7.1', os_platform='centos', version='4.0'):
+ """Modify the manager or agent system.
+
+ Args:
+ os_name (str): OS complete name.
+ os_major (str): OS major version.
+ name (str): Os name.
+ agent_id (str): Agent ID.
+ os_minor (str): OS minor version
+ os_arch (str): Host architecture.
+ os_version (str): OS version.
+ os_platform (str): Os platform e.g (centos, ubuntu, ...)
+ version (str): OS version
+ """
+ query_string = f"global sql update AGENT set OS_NAME='{os_name}', OS_VERSION='{os_version}', " \
+ f"OS_MAJOR='{os_major}', OS_MINOR='{os_minor}', OS_ARCH='{os_arch}', NAME='{name}', " \
+ f"OS_PLATFORM='{os_platform}', VERSION='{version}' WHERE id='{int(agent_id)}'"
+ query_wdb(query_string)
+
+
+def create_or_update_agent(agent_id='001', name='centos8-agent', ip='127.0.0.1', register_ip='127.0.0.1',
+ internal_key='', os_name='CentOS Linux', os_version='8.4', os_major='8', os_minor='4',
+ os_codename='centos-8', os_build='4.18.0-147.8.1.el8_1.x86_64',
+ os_platform='#1 SMP Thu Apr 9 13:49:54 UTC 2020', os_uname='x86_64', os_arch='x86_64',
+ version='4.2', config_sum='', merged_sum='', manager_host='centos-8', node_name='node01',
+ date_add='1612942494', last_keepalive='253402300799', group='', sync_status='synced',
+ connection_status='active'):
+ """Create an agent or update its info it is already exists (checking agent_id)."""
+
+ query = 'global sql INSERT OR REPLACE INTO AGENT (id, name, ip, register_ip, internal_key, os_name, os_version, ' \
+ 'os_major, os_minor, os_codename, os_build, os_platform, os_uname, os_arch, version, config_sum, ' \
+ 'manager_host, node_name, date_add, last_keepalive, "group", sync_status, connection_status) VALUES ' \
+ f"('{agent_id}', '{name}', '{ip}', '{register_ip}', '{internal_key}', '{os_name}', '{os_version}', " \
+ f"'{os_major}', '{os_minor}', '{os_codename}', '{os_build}', '{os_platform}', '{os_uname}', '{os_arch}', " \ + f"'{version}', '{config_sum}', '{merged_sum}', '{manager_host}', '{node_name}', '{date_add}', " \ + f"'{last_keepalive}', '{group}', '{sync_status}', '{connection_status}')" + query_wdb(query) + + +def get_last_agent_id(): + """Get the last agent ID registered in the global DB. + + Returns: + str: Agent ID. + """ + last_id = query_wdb('global sql SELECT id FROM agent order by id desc limit 1') + return last_id[0]['id'] + + +def delete_agent(agent_id): + """Delete an agent from the global.db + + Args: + agent_id (str): Agent ID. + """ + query_wdb(f"global sql DELETE FROM agent where id={int(agent_id)}") +