-
Notifications
You must be signed in to change notification settings - Fork 295
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
JinjaExpert - modify message fields using jinja2
This commit adds a simple jinja expert bot, that lets you modify message data using jinja2 templates. It add the relevant documentation to the bots file and also a couple of unit tests.
- Loading branch information
Birger Schacht
committed
Aug 23, 2021
1 parent
c843349
commit 91aa829
Showing
5 changed files
with
167 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
# SPDX-FileCopyrightText: 2021 Birger Schacht | ||
# | ||
# SPDX-License-Identifier: AGPL-3.0-or-later | ||
|
||
from intelmq.lib.bot import Bot | ||
from intelmq.lib.exceptions import MissingDependencyError | ||
|
||
import pathlib | ||
import os | ||
from typing import Union, Dict | ||
|
||
try: | ||
from jinja2 import Template, TemplateError | ||
except ImportError: | ||
Template = None | ||
|
||
|
||
class JinjaExpertBot(Bot): | ||
""" | ||
Modify the message using the Jinja templating engine | ||
Example: | ||
fields: | ||
output: The provider is {{ msg['feed.provider'] }}! | ||
feed.url: "{{ msg['feed.url'] | upper }}" | ||
extra.somejinjaoutput: file:///etc/intelmq/somejinjatemplate.j2 | ||
""" | ||
|
||
fields: Dict[str, Union[str, Template]] = {} | ||
overwrite: bool = False | ||
|
||
def init(self): | ||
if not Template: | ||
raise MissingDependencyError("Library 'jinja2' is required, please install it.") | ||
|
||
for field, template in self.fields.items(): | ||
if template.startswith("file:///"): | ||
templatefile = pathlib.Path(template[7:]) | ||
if templatefile.exists() and os.access(templatefile, os.R_OK): | ||
self.fields[field] = templatefile.read_text() | ||
else: | ||
raise ValueError(f"Jinja Template {templatefile} does not exist or is not readable.") | ||
|
||
for field, template in self.fields.items(): | ||
try: | ||
self.fields[field] = Template(template) | ||
except TemplateError as msg: | ||
raise ValueError(f"Error parsing Jinja Template for '{field}': {msg}") | ||
|
||
def process(self): | ||
msg = self.receive_message() | ||
|
||
for field, template in self.fields.items(): | ||
msg.add(field, template.render(msg=msg), overwrite=self.overwrite) | ||
|
||
self.send_message(msg) | ||
self.acknowledge_message() | ||
|
||
|
||
BOT = JinjaExpertBot |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
{ | ||
"@timestamp": {% if msg['time.source'] %}{{ msg['time.source'] }}{% else %}{{ msg['time.observation'] }}{% endif %}, | ||
{%- set fields = { 'event.provider': 'feed.provider', 'server.ip': 'source.ip', 'server.domain': 'source.fqdn', 'event.dataset': 'feed.name' } -%} | ||
{% for key, value in fields.items() %} | ||
{% if msg[value] %} "{{ key }}": {{ msg[value] }}{% if not loop.last %},{% endif %}{% endif %} | ||
{%- endfor %} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
SPDX-FileCopyrightText: 2021 Birger Schacht | ||
SPDX-License-Identifier: AGPL-3.0-or-later |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
# SPDX-FileCopyrightText: 2021 Birger Schacht | ||
# | ||
# SPDX-License-Identifier: AGPL-3.0-or-later | ||
""" | ||
Testing jinja expert | ||
""" | ||
|
||
import unittest | ||
import os | ||
|
||
import pkg_resources | ||
|
||
import intelmq.lib.test as test | ||
from intelmq.bots.experts.jinja.expert import JinjaExpertBot | ||
|
||
EXAMPLE_INPUT = {"__type": "Event", | ||
"source.ip": "192.168.0.1", | ||
"destination.ip": "192.0.43.8", | ||
"time.observation": "2015-01-01T00:00:00+00:00", | ||
"feed.url": "https://cert.at", | ||
} | ||
EXAMPLE_OUTPUT1 = {"__type": "Event", | ||
"source.ip": "192.168.0.1", | ||
"destination.ip": "192.0.43.8", | ||
"time.observation": "2015-01-01T00:00:00+00:00", | ||
"feed.url": "HTTPS://CERT.AT", | ||
} | ||
EXAMPLE_OUTPUT2 = {"__type": "Event", | ||
"source.ip": "192.168.0.1", | ||
"destination.ip": "192.0.43.8", | ||
"time.observation": "2015-01-01T00:00:00+00:00", | ||
"extra.some_text": "Hello World, this is the destination ip: 192.0.43.8! And this is the source ip: 192.168.0.1!", | ||
"feed.url": "https://cert.at", | ||
} | ||
EXAMPLE_OUTPUT3 = {"__type": "Event", | ||
"source.ip": "192.168.0.1", | ||
"destination.ip": "192.0.43.8", | ||
"time.observation": "2015-01-01T00:00:00+00:00", | ||
"extra.some_text": '{\n "@timestamp": 2015-01-01T00:00:00+00:00,\n\n "server.ip": 192.168.0.1,\n\n\n}', | ||
"feed.url": "https://cert.at", | ||
} | ||
|
||
|
||
class TestJinjaExpertBot(test.BotTestCase, unittest.TestCase): | ||
""" | ||
A TestCase for JinjaExpertBot. | ||
""" | ||
|
||
@classmethod | ||
def set_bot(cls): | ||
cls.bot_reference = JinjaExpertBot | ||
|
||
def test_jinja1(self): | ||
self.sysconfig = {'fields': { 'feed.url': "{{ msg['feed.url'] | upper }}" } } | ||
self.input_message = EXAMPLE_INPUT | ||
self.run_bot() | ||
self.assertMessageEqual(0, EXAMPLE_OUTPUT1) | ||
|
||
def test_jinja2(self): | ||
self.sysconfig = {'fields': { 'extra.some_text': "Hello World, this is the destination ip: {{ msg['destination.ip'] }}! And this is the source ip: {{ msg['source.ip'] }}!" } } | ||
self.input_message = EXAMPLE_INPUT | ||
self.run_bot() | ||
self.assertMessageEqual(0, EXAMPLE_OUTPUT2) | ||
|
||
def test_jinja_file1(self): | ||
self.sysconfig = {'fields': { 'extra.some_text': "file:///" + os.path.join(os.path.dirname(__file__)) + "/ecs.j2" } } | ||
self.input_message = EXAMPLE_INPUT | ||
self.run_bot() | ||
self.assertMessageEqual(0, EXAMPLE_OUTPUT3) | ||
|
||
if __name__ == '__main__': # pragma: no cover | ||
unittest.main() |