-
Notifications
You must be signed in to change notification settings - Fork 32
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
4a3f61b
commit 1bb3233
Showing
18 changed files
with
1,084 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
import os | ||
import json | ||
from datetime import datetime, timedelta | ||
from copy import deepcopy | ||
|
||
from wazuh_testing.tools.time import parse_date_time_format | ||
|
||
|
||
# Timeouts | ||
T_5 = 5 | ||
T_10 = 10 | ||
T_15 = 15 | ||
T_20 = 20 | ||
T_60 = 60 | ||
|
||
ANALYSISD_PREFIX = r'.*wazuh-analysisd*' | ||
MAILD_PREFIX = r'.*wazuh-maild*' |
109 changes: 109 additions & 0 deletions
109
deps/wazuh_testing/wazuh_testing/modules/eps/event_monitor.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,109 @@ | ||
import re | ||
from datetime import datetime | ||
|
||
from wazuh_testing.modules import eps as eps | ||
from wazuh_testing.tools import LOG_FILE_PATH, ANALYSISD_STATE, ALERT_LOGS_PATH | ||
from wazuh_testing.tools.monitoring import FileMonitor | ||
|
||
|
||
def make_analysisd_callback(pattern, prefix=eps.ANALYSISD_PREFIX): | ||
"""Create a callback function from a text pattern. | ||
It already contains the vulnerability-detector prefix. | ||
Args: | ||
pattern (str): String to match on the log. | ||
prefix (str): regular expression used as prefix before the pattern. | ||
Returns: | ||
lambda: function that returns if there's a match in the file | ||
Examples: | ||
>>> callback_bionic_update_started = make_vuln_callback("Starting Ubuntu Bionic database update") | ||
""" | ||
pattern = r'\s+'.join(pattern.split()) | ||
regex = re.compile(r'{}{}'.format(prefix, pattern)) | ||
|
||
return lambda line: regex.match(line) is not None | ||
|
||
|
||
def check_analysisd_event(file_monitor=None, callback='', error_message=None, update_position=True, | ||
timeout=eps.T_60, prefix=eps.ANALYSISD_PREFIX, accum_results=1, | ||
file_to_monitor=LOG_FILE_PATH): | ||
"""Check if a vulnerability event occurs | ||
Args: | ||
file_monitor (FileMonitor): FileMonitor object to monitor the file content. | ||
callback (str): log regex to check in Wazuh log | ||
error_message (str): error message to show in case of expected event does not occur | ||
update_position (boolean): filter configuration parameter to search in Wazuh log | ||
timeout (str): timeout to check the event in Wazuh log | ||
prefix (str): log pattern regex | ||
accum_results (int): Accumulation of matches. | ||
""" | ||
file_monitor = FileMonitor(file_to_monitor) if file_monitor is None else file_monitor | ||
error_message = f"Could not find this event in {file_to_monitor}: {callback}" if error_message is None else \ | ||
error_message | ||
|
||
file_monitor.start(timeout=timeout, update_position=update_position, accum_results=accum_results, | ||
callback=make_analysisd_callback(callback, prefix), error_message=error_message) | ||
|
||
|
||
def check_eps_disabled(): | ||
"""Check if the eps module is disabled""" | ||
check_analysisd_event(callback=fr'.*INFO: EPS limit disabled.*', timeout=eps.T_10) | ||
|
||
|
||
def check_eps_enabled(maximun, timeframe): | ||
"""Check if the eps module is enable""" | ||
check_analysisd_event(callback=fr".*INFO: EPS limit enabled, EPS: '{maximun}', timeframe: '{timeframe}'", | ||
timeout=eps.T_10) | ||
|
||
|
||
def check_configuration_error(): | ||
"""Check the configuration error event in ossec.log""" | ||
check_analysisd_event(timeout=eps.T_60, callback=r".* \(\d+\): Configuration error at.*", | ||
error_message="Could not find the event 'Configuration error at 'etc/ossec.conf' " | ||
'in ossec.log', prefix=eps.MAILD_PREFIX) | ||
|
||
|
||
def get_words_from_file(words, filename): | ||
"""Get the words from file | ||
Args: | ||
wordss (str): Word to find in the file | ||
Returns: | ||
str: Line that match in file | ||
""" | ||
with open(filename, 'r') as file: | ||
for index, line in enumerate(file): | ||
# search string | ||
if words in line: | ||
return line | ||
|
||
|
||
def get_analysisd_state(word): | ||
"""Get the value of word in wazuh-analysisd.state | ||
Args: | ||
word (str): Word to find in the file | ||
""" | ||
line = get_words_from_file(word, ANALYSISD_STATE) | ||
return float(line.split("\'")[1::2][0]) | ||
|
||
|
||
def get_alert_timestamp(start_log, end_log): | ||
"""Get the timestamp of the alert if exist in the alerts.log file between two string | ||
Args: | ||
start_log (str): Start message to find | ||
end_log (str): End message to find | ||
""" | ||
with open(ALERT_LOGS_PATH, 'r') as file: | ||
str_file = file.read() | ||
index1 = str_file.find(end_log) | ||
index2 = str_file[0: index1].rfind(start_log) | ||
str_alert = str_file[index2: index1] | ||
timestamp = str_alert[str_alert.find(start_log) + len(start_log):str_alert.find(': ')] | ||
|
||
return datetime.fromtimestamp(float(timestamp)).strftime('%Y-%m-%d %H:%M:%S') |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.