From 3bc8df6e8db0da0eab483ab26633e391fab18219 Mon Sep 17 00:00:00 2001 From: terrancedejesus Date: Mon, 8 Jan 2024 12:16:53 -0500 Subject: [PATCH] example #1 - post dict conversion; date fields in meta schema; ISO 8601 validation on meta schema --- detection_rules/rule.py | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/detection_rules/rule.py b/detection_rules/rule.py index 456363a7936..35e18b81e72 100644 --- a/detection_rules/rule.py +++ b/detection_rules/rule.py @@ -16,6 +16,7 @@ from uuid import uuid4 import eql +from datetime import datetime from semver import Version from marko.block import Document as MarkoDocument from marko.ext.gfm import gfm @@ -73,6 +74,22 @@ def get_validation_stack_versions(self) -> Dict[str, dict]: stack_versions = get_stack_schemas(self.min_stack_version) return stack_versions + @validates_schema + def validate_date_format(self, data, **kwargs): + """Validate that the date fields are in the correct ISO 8601 format.""" + invalid_fields = [] + + for field, value in data.items(): + if field.endswith('_date') and value: + try: + datetime.strptime(value, '%Y-%m-%d') + except ValueError: + invalid_fields.append(field) + + if invalid_fields: + raise ValidationError( + f"Invalid date format for {', '.join(invalid_fields)}. Please use ISO 8601 format." + ) @dataclass(frozen=True) class RuleTransform(MarshmallowDataclassMixin): @@ -985,6 +1002,9 @@ def _post_dict_conversion(self, obj: dict) -> dict: # rule type transforms self.data.transform(obj) if hasattr(self.data, 'transform') else False + # rule dates + self._convert_add_date_fields(obj, self.metadata.to_dict()) + return obj def _convert_add_related_integrations(self, obj: dict) -> None: @@ -1089,6 +1109,12 @@ def _convert_get_setup_content(self, note_tree: list) -> str: return "".join(setup).strip() + def _convert_add_date_fields(self, obj: dict, metadata: dict) -> None: + """Add metadata date fields to the obj.""" + for field_name in ["creation_date", "updated_date"]: + if field_name not in obj: + obj.setdefault(field_name, metadata[field_name]) + def check_explicit_restricted_field_version(self, field_name: str) -> bool: """Explicitly check restricted fields against global min and max versions.""" min_stack, max_stack = BUILD_FIELD_VERSIONS[field_name]