Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Windows 4659 events tests #648

Merged
merged 11 commits into from
Nov 30, 2021
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
# Copyright (C) 2015-2020, Wazuh Inc.
# Created by Wazuh, Inc. <[email protected]>.
# This program is free software; you can redistribute it and/or modify it under the terms of GPLv2

import os
from subprocess import Popen, PIPE, DEVNULL
import re
import json
from json import JSONDecodeError

import pytest

from wazuh_testing import global_parameters
from wazuh_testing.fim import LOG_FILE_PATH, generate_params, create_file, REGULAR, \
callback_detect_event, check_time_travel, validate_event
Molter73 marked this conversation as resolved.
Show resolved Hide resolved
from wazuh_testing.tools import PREFIX
from wazuh_testing.tools.configuration import load_wazuh_configurations, check_apply_test
from wazuh_testing.tools.monitoring import FileMonitor

# Marks

pytestmark = [pytest.mark.win32, pytest.mark.tier(level=0)]

# variables

wazuh_log_monitor = FileMonitor(LOG_FILE_PATH)
test_directories = [os.path.join(
PREFIX, 'testdir1'), os.path.join(PREFIX, 'testdir2')]
Molter73 marked this conversation as resolved.
Show resolved Hide resolved
directory_str = ','.join(test_directories)
for direc in list(test_directories):
test_directories.append(os.path.join(direc, 'subdir'))
test_data_path = os.path.join(
os.path.dirname(os.path.realpath(__file__)), 'data')
Molter73 marked this conversation as resolved.
Show resolved Hide resolved
configurations_path = os.path.join(test_data_path, 'wazuh_conf.yaml')
testdir1, testdir2 = test_directories[2:]

# configurations

conf_params = {'TEST_DIRECTORIES': directory_str, 'MODULE_NAME': __name__}
p, m = generate_params(extra_params=conf_params, modes=['whodata'])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Variables with a single letter are not allowed, they should have descriptive names as parameters and metadata

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

configurations = load_wazuh_configurations(
configurations_path, __name__, params=p, metadata=m)
Molter73 marked this conversation as resolved.
Show resolved Hide resolved


# callback
def callback_detect_delete_event(line):
msg = r'.*Sending FIM event: (.+)$'
match = re.match(msg, line)

try:
event = json.loads(match.group(1))
if event['type'] == 'event' and event['data']['type'] == 'deleted' and 'process_name' not in event['data']['audit']:
Molter73 marked this conversation as resolved.
Show resolved Hide resolved
return event
except (AttributeError, JSONDecodeError, KeyError):
pass

return None

# fixtures


@pytest.fixture(scope='module', params=configurations)
def get_configuration(request):
"""Get configurations from the module."""
return request.param


# tests
@pytest.mark.parametrize('folder, file_list, filetype, tags_to_apply', [
(testdir1, ['regular0', 'regular1', 'regular2'], REGULAR, {'ossec_conf'},),
(testdir2, ['regular0', 'regular1', 'regular2'], REGULAR, {'ossec_conf'},)
])
def test_deferred_delete_file(folder, file_list, filetype, tags_to_apply,
get_configuration, configure_environment,
restart_syscheckd, wait_for_initial_scan):
"""
Check if syscheckd detects 'deleted' events from the files contained
in a folder that are deleted in a deferred manner.

We first run the command in order to find the confirmation character in the os,
after that we delete the files

The events generated must not contain the process_name parameter in order to guarantee
it's a 4659 event that generated it

Parameters
----------
folder : str
Directory where the files will be created.
file_list : list
Names of the files.
filetype : str
Type of the files that will be created.
"""
check_apply_test(tags_to_apply, get_configuration['tags'])

# Create files inside subdir folder
for file in file_list:
create_file(filetype, folder, file, content='')

# Wait for the added events
events = wazuh_log_monitor.start(timeout=global_parameters.default_timeout, callback=callback_detect_event,
accum_results=len(file_list), error_message='Did not receive expected '
'"Sending FIM event: ..." event').result()
Molter73 marked this conversation as resolved.
Show resolved Hide resolved

# Delete the files under 'folder'
command = 'del "{}"\n'.format(folder)
# assert command == None
Molter73 marked this conversation as resolved.
Show resolved Hide resolved
cmd = Popen(command, shell=True, stdin=PIPE, stdout=PIPE, stderr=DEVNULL, universal_newlines=True)
try:
stdout = cmd.communicate(timeout=global_parameters.default_timeout)
except TimeoutError:
pass

# Find the windows confirmation character
confirmation = re.search(r'\((\w)\/\w\)\?', stdout[0])
assert confirmation

# Run the command again and this time delete the files
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of this time delete the files, we could comment confirm deletion of files

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

cmd = Popen(command, shell=True, stdin=PIPE, stdout=PIPE, stderr=DEVNULL, universal_newlines=True)
try:
stdout = cmd.communicate('{}\n'.format(confirmation.group(1)), timeout=global_parameters.default_timeout)
except TimeoutError:
pass

# Start monitoring
events = wazuh_log_monitor.start(timeout=global_parameters.default_timeout, callback=callback_detect_delete_event,
accum_results=len(file_list), error_message='Did not receive expected '
'"Sending FIM event: ..." event').result()
Molter73 marked this conversation as resolved.
Show resolved Hide resolved