-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
6 changed files
with
1,084 additions
and
102 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
249 changes: 249 additions & 0 deletions
249
src/wazuh_qa_framework/generic_modules/tools/configuration.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,249 @@ | ||
import xml.etree.ElementTree as ET | ||
import yaml | ||
from typing import List | ||
|
||
|
||
unlimited_sections = ['localfile', 'command', 'active-response'] | ||
xml_configuration_files = ['ossec.conf', 'agent.conf'] | ||
|
||
|
||
# customize _serialize_xml to avoid lexicographical order in XML attributes | ||
def _serialize_xml(write, elem, qnames, namespaces, | ||
short_empty_elements, **kwargs): | ||
tag = elem.tag | ||
text = elem.text | ||
if tag is ET.Comment: | ||
write("<!--%s-->" % text) | ||
elif tag is ET.ProcessingInstruction: | ||
write("<?%s?>" % text) | ||
else: | ||
tag = qnames[tag] | ||
if tag is None: | ||
if text: | ||
write(ET._escape_cdata(text)) | ||
for e in elem: | ||
_serialize_xml(write, e, qnames, None, | ||
short_empty_elements=short_empty_elements) | ||
else: | ||
write("<" + tag) | ||
items = list(elem.items()) | ||
if items or namespaces: | ||
if namespaces: | ||
for v, k in sorted(namespaces.items(), | ||
key=lambda x: x[1]): # sort on prefix | ||
if k: | ||
k = ":" + k | ||
write(" xmlns%s=\"%s\"" % ( | ||
k, | ||
ET._escape_attrib(v) | ||
)) | ||
for k, v in items: # avoid lexicographical order for XML attributes | ||
if isinstance(k, ET.QName): | ||
k = k.text | ||
if isinstance(v, ET.QName): | ||
v = qnames[v.text] | ||
else: | ||
v = ET._escape_attrib(v) | ||
write(" %s=\"%s\"" % (qnames[k], v)) | ||
if text or len(elem) or not short_empty_elements: | ||
write(">") | ||
if text: | ||
write(ET._escape_cdata(text)) | ||
for e in elem: | ||
_serialize_xml(write, e, qnames, None, | ||
short_empty_elements=short_empty_elements) | ||
write("</" + tag + ">") | ||
else: | ||
write(" />") | ||
if elem.tail: | ||
write(ET._escape_cdata(elem.tail)) | ||
|
||
|
||
def set_section_wazuh_conf(sections, template=None): | ||
""" | ||
Set a configuration in a section of Wazuh. It replaces the content if it exists. | ||
Args: | ||
sections (list): List of dicts with section and new elements | ||
section (str, optional): Section of Wazuh configuration to replace. Default `'syscheck'` | ||
new_elements (list, optional) : List with dictionaries for settings elements in the section. Default `None` | ||
template (list of string, optional): File content template | ||
Returns: | ||
List of str: List of str with the custom Wazuh configuration. | ||
""" | ||
|
||
def create_elements(section: ET.Element, elements: List): | ||
""" | ||
Insert new elements in a Wazuh configuration section. | ||
Args: | ||
section (ET.Element): Section where the element will be inserted. | ||
elements (list): List with the new elements to be inserted. | ||
Returns: | ||
ET.ElementTree: Modified Wazuh configuration. | ||
""" | ||
tag = None | ||
for element in elements: | ||
for tag_name, properties in element.items(): | ||
tag = ET.SubElement(section, tag_name) | ||
new_elements = properties.get('elements') | ||
attributes = properties.get('attributes') | ||
if attributes is not None: | ||
for attribute in attributes: | ||
if isinstance(attribute, dict): # noqa: E501 | ||
for attr_name, attr_value in attribute.items(): | ||
tag.attrib[attr_name] = str(attr_value) | ||
if new_elements: | ||
create_elements(tag, new_elements) | ||
else: | ||
tag.text = str(properties.get('value')) | ||
attributes = properties.get('attributes') | ||
if attributes: | ||
for attribute in attributes: | ||
if attribute is not None and isinstance(attribute, dict): # noqa: E501 | ||
for attr_name, attr_value in attribute.items(): | ||
tag.attrib[attr_name] = str(attr_value) | ||
tag.tail = "\n " | ||
tag.tail = "\n " | ||
|
||
def purge_multiple_root_elements(str_list: List[str], root_delimeter: str = "</ossec_config>") -> List[str]: | ||
""" | ||
Remove from the list all the lines located after the root element ends. | ||
This operation is needed before attempting to convert the list to ElementTree because if the ossec.conf had more | ||
than one `<ossec_config>` element as root the conversion would fail. | ||
Args: | ||
str_list (list or str): The content of the ossec.conf file in a list of str. | ||
root_delimeter (str, optional: The expected string to identify when the first root element ends, | ||
by default "</ossec_config>" | ||
Returns: | ||
list of str : The first N lines of the specified str_list until the root_delimeter is found. The rest of | ||
the list will be ignored. | ||
""" | ||
line_counter = 0 | ||
for line in str_list: | ||
line_counter += 1 | ||
if root_delimeter in line: | ||
return str_list[0:line_counter] | ||
else: | ||
return str_list | ||
|
||
def to_element_tree(str_list: List[str], root_delimeter: str = "</ossec_config>") -> ET.ElementTree: | ||
""" | ||
Turn a list of str into an ElementTree object. | ||
As ElementTree does not support xml with more than one root element this function will parse the list first with | ||
`purge_multiple_root_elements` to ensure there is only one root element. | ||
Args: | ||
str_list (list of str): A list of strings with every line of the ossec conf. | ||
Returns: | ||
ElementTree: A ElementTree object with the data of the `str_list` | ||
""" | ||
str_list = purge_multiple_root_elements(str_list, root_delimeter) | ||
return ET.ElementTree(ET.fromstringlist(str_list)) | ||
|
||
def to_str_list(elementTree: ET.ElementTree) -> List[str]: | ||
""" | ||
Turn an ElementTree object into a list of str. | ||
Args: | ||
elementTree (ElementTree): A ElementTree object with all the data of the ossec.conf. | ||
Returns: | ||
(list of str): A list of str containing all the lines of the ossec.conf. | ||
""" | ||
return ET.tostringlist(elementTree.getroot(), encoding="unicode") | ||
|
||
def find_module_config(wazuh_conf: ET.ElementTree, section: str, attributes: List[dict]) -> ET.ElementTree: | ||
r""" | ||
Check if a certain configuration section exists in ossec.conf and returns the corresponding block if exists. | ||
(This extra function has been necessary to implement it to configure the wodle blocks, since they have the same | ||
section but different attributes). | ||
Args: | ||
wazuh_conf (ElementTree): An ElementTree object with all the data of the ossec.conf | ||
section (str): Name of the tag or configuration section to search for. For example: vulnerability_detector | ||
attributes (list of dict): List with section attributes. Needed to check if the section exists with all the | ||
searched attributes and values. For example (wodle section) [{'name': 'syscollector'}] | ||
Returns: | ||
ElementTree: An ElementTree object with the section data found in ossec.conf. None if nothing was found. | ||
""" | ||
if attributes is None: | ||
return wazuh_conf.find(section) | ||
else: | ||
attributes_query = ''.join([f"[@{attribute}='{value}']" for index, _ in enumerate(attributes) | ||
for attribute, value in attributes[index].items()]) | ||
query = f"{section}{attributes_query}" | ||
|
||
try: | ||
return wazuh_conf.find(query) | ||
except AttributeError: | ||
return None | ||
|
||
# Generate a ElementTree representation of the previous list to work with its sections | ||
root_delimeter = '</agent_config>' if '<agent_config>' in template else '</ossec_config>' | ||
wazuh_conf = to_element_tree(template, root_delimeter) | ||
for section in sections: | ||
attributes = section.get('attributes') | ||
section_conf = find_module_config(wazuh_conf, section['section'], attributes) | ||
# Create section if it does not exist, clean otherwise | ||
if not section_conf: | ||
section_conf = ET.SubElement(wazuh_conf.getroot(), section['section']) | ||
section_conf.text = '\n ' | ||
section_conf.tail = '\n\n ' | ||
else: | ||
# Add section if there is no limit | ||
if section_conf.tag in unlimited_sections: | ||
section_conf = ET.SubElement(wazuh_conf.getroot(), section['section']) | ||
section_conf.text = '\n ' | ||
section_conf.tail = '\n\n ' | ||
else: | ||
prev_text = section_conf.text | ||
prev_tail = section_conf.tail | ||
section_conf.clear() | ||
section_conf.text = prev_text | ||
section_conf.tail = prev_tail | ||
|
||
# Insert section attributes | ||
if attributes: | ||
for attribute in attributes: | ||
if attribute is not None and isinstance(attribute, dict): # noqa: E501 | ||
for attr_name, attr_value in attribute.items(): | ||
section_conf.attrib[attr_name] = str(attr_value) | ||
|
||
# Insert elements | ||
new_elements = section.get('elements', list()) | ||
if new_elements: | ||
create_elements(section_conf, new_elements) | ||
|
||
return to_str_list(wazuh_conf) | ||
|
||
|
||
def configure_local_internal_options(new_conf): | ||
local_internal_configuration_string = '' | ||
for option_name, option_value in new_conf.items(): | ||
local_internal_configuration_string += f"{str(option_name)}={str(option_value)}\n" | ||
return local_internal_configuration_string | ||
|
||
|
||
def configure_ossec_conf(new_conf, template): | ||
new_configuration = ''.join(set_section_wazuh_conf(new_conf, template)) | ||
return new_configuration | ||
|
||
|
||
def configure_api_yaml(new_conf): | ||
new_configuration = yaml.dump(new_conf) | ||
return new_configuration | ||
|
||
|
||
conf_functions = { | ||
'local_internal_options.conf': configure_local_internal_options, | ||
'ossec.conf': configure_ossec_conf, | ||
'agent.conf': configure_ossec_conf, | ||
'api.yaml': configure_api_yaml | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.