
This Code Provides a possibility to split large Reports into Chunks #680

Merged · 9 commits · May 10, 2017
4 changes: 4 additions & 0 deletions intelmq/bots/BOTS
@@ -4,6 +4,8 @@
"description": "Fileinput collector fetches data from a file.",
"module": "intelmq.bots.collectors.file.collector_file",
"parameters": {
"chunk_replicate_header": true,
Contributor commented:
Correct me if I'm wrong, but I think we need to replicate this solution to all collectors, right?

@dmth (Contributor, Author) commented on Jan 24, 2017:

As written in #680 (comment), not every collector might be capable of splitting the data. Whether the data can be split depends on its format:
CSV or some blocklists can be split this way, whilst XML, JSON, or binary formats cannot!

The solution can be extended to the collectors which are

  1. known to be capable of handling this data, and
  2. currently collecting splittable data in some use-cases.

Candidates are:
HTTP, Mail-Url, Mail-Attachment, File, FTP/FTPs (?)

"chunk_size": null,
"delete_file": false,
"feed": "FileCollector",
"provider": "",
@@ -33,6 +35,8 @@
"description": "Monitor IMAP mailboxes and fetch files from URLs contained in mail bodies",
"module": "intelmq.bots.collectors.mail.collector_mail_url",
"parameters": {
"chunk_replicate_header": true,
"chunk_size": null,
"feed": "",
"provider": "",
"folder": "INBOX",
@@ -6,12 +6,27 @@ events. In combination with the Generic CSV Parser this should work great.

## Parameters:

-You can set four parameters:
+You need to set these four parameters:
1. `path`: The path where your files are stored
2. `postfix`: The postfix or a file extension of your file (e.g. `.csv`)
3. `delete_file`: If this parameter is not empty, the found files will be deleted.
4. `feed`: The name of the feed.

## Chunking:

Additionally, for line-based inputs the bot can split up large reports into
smaller chunks.

This is particularly important for setups that use Redis as a message
queue, which limits each message to 512 MB.

To configure chunking, set `chunk_size` to a value in bytes.
`chunk_replicate_header` determines whether the header line should be repeated
for each chunk that is passed on to a parser bot.

Specifically, to configure a large file input to work around Redis' size
limitation, set `chunk_size` to something like `384000000`, i.e., ~384 MB.
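
For a rough sense of where that number comes from, here is a minimal sketch of the arithmetic (the constant names are illustrative, not part of IntelMQ): the raw data is base64-encoded inside the report, which inflates it by a factor of 4/3, and the report carries some metadata on top.

```python
# Rough arithmetic behind the suggested chunk_size (illustrative names,
# not part of IntelMQ): Redis limits a string to 512 MiB, the raw data is
# base64-encoded inside the report (a 4/3 inflation), and the report
# carries some metadata on top.
REDIS_LIMIT = 512 * 1024 * 1024     # 536870912 bytes

chunk_size = 384000000              # the value suggested above
encoded_size = chunk_size * 4 // 3  # 512000000 bytes after base64
print(REDIS_LIMIT - encoded_size)   # ~24.9 MB of headroom for metadata
```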

## Workflow:

The bot loops over all files in `path` and tests if their filename matches
13 changes: 8 additions & 5 deletions intelmq/bots/collectors/file/collector_file.py
@@ -21,6 +21,8 @@

import intelmq.lib.exceptions as exceptions
from intelmq.lib.bot import CollectorBot
from intelmq.lib.message import Report
from intelmq.lib.splitreports import generate_reports


class FileCollectorBot(CollectorBot):
@@ -53,12 +55,13 @@ def process(self):
if fnmatch.fnmatch(f, '*' + self.parameters.postfix):
self.logger.info("Processing file %r." % filename)

-                with open(filename, 'r') as f:
+                template = Report()
+                template.add("feed.url", "file://localhost%s" % filename)

-                    report = self.new_report()
-                    report.add("raw", f.read())
-                    report.add("feed.url", "file://localhost%s" % filename)
-                    self.send_message(report)
+                with open(filename, 'rb') as f:
+                    for report in generate_reports(template, f, self.parameters.chunk_size,
+                                                   self.parameters.chunk_replicate_header):
+                        self.send_message(report)

if self.parameters.delete_file:
try:
25 changes: 25 additions & 0 deletions intelmq/bots/collectors/mail/README.md
@@ -0,0 +1,25 @@
# Mail collector bots
Contributor commented:

Remove all README.md files on this pull request and add the documentation text to the following location:
https://github.com/certtools/intelmq/blob/master/docs/Bots.md#collectors


This file should contain the documentation for:

* Generic Mail Attachment Fetcher
* Generic Mail URL Fetcher

Currently, no documentation is available.


## Generic Mail URL Fetcher

### Chunking

For line-based inputs the bot can split up large reports into smaller chunks.

This is particularly important for setups that use Redis as a message
queue, which limits each message to 512 MB.

To configure chunking, set `chunk_size` to a value in bytes.
`chunk_replicate_header` determines whether the header line should be repeated
for each chunk that is passed on to a parser bot.

Specifically, to configure a large input to work around Redis' size
limitation, set `chunk_size` to something like `384000000`, i.e., ~384 MB.
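
As a minimal sketch of what header replication means, the snippet below mirrors what `generate_reports()` does when `chunk_replicate_header` is true, using the `read_delimited_chunks` helper from `intelmq/lib/splitreports.py` introduced in this pull request (the tiny chunk size and the sample data are made up, only to force a split):

```python
import io

from intelmq.lib.splitreports import read_delimited_chunks

data = b"ip,port\n198.51.100.1,80\n198.51.100.2,443\n"
stream = io.BytesIO(data)

# Read the header line once, then prepend it to every chunk, just as
# generate_reports() does when chunk_replicate_header is true.
header = stream.readline()
for chunk in read_delimited_chunks(stream, 20):
    print((header + chunk).decode())
    # every printed chunk starts with the "ip,port" header line
```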
18 changes: 12 additions & 6 deletions intelmq/bots/collectors/mail/collector_mail_url.py
@@ -1,16 +1,20 @@
# -*- coding: utf-8 -*-

import re

+import io
import requests

-from intelmq.lib.bot import CollectorBot
-
try:
    import imbox
except ImportError:
    imbox = None

+from intelmq.lib.bot import CollectorBot
+from intelmq.lib.message import Report
+from intelmq.lib.splitreports import generate_reports

class MailURLCollectorBot(CollectorBot):

@@ -58,9 +62,11 @@ def process(self):

self.logger.info("Report downloaded.")

-            report = self.new_report()
-            report.add("raw", resp.content)
-            self.send_message(report)
+            template = Report()
+
+            for report in generate_reports(template, io.BytesIO(resp.content), self.parameters.chunk_size,
+                                           self.parameters.chunk_replicate_header):
+                self.send_message(report)

# Only mark read if message relevant to this instance,
# so other instances watching this mailbox will still
134 changes: 134 additions & 0 deletions intelmq/lib/splitreports.py
@@ -0,0 +1,134 @@
# -*- coding: utf-8 -*-
"""
Support for splitting large raw reports into smaller ones.

The main intention of this module is to help work around limitations in
Redis which limits strings to 512MB. Collector bots can use the
functions in this module to split the incoming data into smaller pieces
which can be sent as separate reports.

Collectors usually don't really know anything about the data they
collect, so the data cannot be reliably split into pieces in all cases.
This module can be used for those cases, though, where users know that
the data is actually a line-based format and can easily be split into
pieces at newline characters. For this to work, some assumptions are
made:

- The data can be split at any newline character

This would not work for, e.g., CSV-based formats which allow
newlines in values as long as they're within quotes.

- The lines are much shorter than the maximum chunk size

Obviously, if this condition does not hold, it may not be possible to
split the data into small enough chunks at newline characters.

Other considerations:

- To accommodate CSV formats, the code can optionally replicate the
first line of the file at the start of all chunks.

- The Redis limit applies to the entire IntelMQ report, not just the
raw data. The report has some meta data in addition to the raw data
and the raw data is encoded as base64 in the report. The maximum
chunk size must take this into account, by multiplying the actual
limit by 3/4 and subtracting a generous amount for the meta data.
"""


def split_chunks(chunk, chunk_size):
"""Split a bytestring into chunk_size pieces at ASCII newlines characters.

The return value is a list of bytestring objects. Appending all of
them yields a bytestring equal to the input string. All items in the
list except the last item end in newline. The items are shorter than
chunk_size if possible, but may be longer if the input data has
places where the distance between two newline characters is too long.

Note in particular that the last item may not end in a newline!
"""
chunks = []

while len(chunk) > chunk_size:
newline_pos = chunk.rfind(b"\n", 0, chunk_size)
if newline_pos == -1:
# no newline available to make chunk smaller than
# chunk_size. Search forward to get a minimum chunk longer
# than chunk_size
newline_pos = chunk.find(b"\n", chunk_size)

if newline_pos == -1:
# no newline in chunk, so this is a leftover that may have
# to be combined with the next data read.
chunks.append(chunk)
chunk = b""
else:
split_pos = newline_pos + 1
chunks.append(chunk[:split_pos])
chunk = chunk[split_pos:]
if chunk:
chunks.append(chunk)

return chunks
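
A quick illustration of both branches (a minimal sketch using the function above; the values are chosen only to exercise the edge cases described in the docstring):

```python
from intelmq.lib.splitreports import split_chunks

# Normal case: split at the last newline that keeps a piece <= chunk_size.
print(split_chunks(b"one\ntwo\nthree\n", 8))
# [b'one\ntwo\n', b'three\n']

# Overlong line: no newline within the first 5 bytes, so the first piece
# is allowed to exceed chunk_size rather than splitting mid-line.
print(split_chunks(b"aaaaaaaaaaaa\nbb\n", 5))
# [b'aaaaaaaaaaaa\n', b'bb\n']
```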


def read_delimited_chunks(infile, chunk_size):
"""Yield the contents of infile in chunk_size pieces ending at newlines.
The individual pieces, except for the last one, end in newlines and
are smaller than chunk_size if possible.
"""
leftover = b""

while True:
new_chunk = infile.read(chunk_size)
chunks = split_chunks(leftover + new_chunk, chunk_size)
leftover = b""
# the last item in chunks has to be combined with the next chunk
# read from the file because it may not actually stop at a
# newline and to avoid very small chunks.
if chunks:
leftover = chunks[-1]
chunks = chunks[:-1]
for chunk in chunks:
yield chunk

if not new_chunk:
if leftover:
yield leftover
break
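
A minimal sketch of the generator in action; note how the final piece is held back as leftover until the input is exhausted:

```python
import io

from intelmq.lib.splitreports import read_delimited_chunks

stream = io.BytesIO(b"one\ntwo\nthree\n")
print(list(read_delimited_chunks(stream, 8)))
# [b'one\ntwo\n', b'three\n']
```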


def generate_reports(report_template, infile, chunk_size, copy_header_line):
"""Generate reports from a template and input file, optionally split into chunks.

If chunk_size is None, a single report is generated with the entire
contents of infile as the raw data. Otherwise chunk_size should be
an integer giving the maximum number of bytes in a chunk. The data
read from infile is then split into chunks of this size at newline
characters (see read_delimited_chunks). For each of the chunks, this
function yields a copy of the report_template with that chunk as the
value of the raw attribute.

When splitting the data into chunks, if copy_header_line is true,
the first line of the file is read before chunking and then prepended
to each of the chunks. This is particularly useful when splitting
CSV files.

The infile should be a file-like object. generate_reports uses only
two methods, readline and read, with readline only called once and
only if copy_header_line is true. Both methods should return bytes
objects.
"""
if chunk_size is None:
report = report_template.copy()
report.add("raw", infile.read(), force=True)
yield report
else:
header = b""
if copy_header_line:
header = infile.readline()
for chunk in read_delimited_chunks(infile, chunk_size):
report = report_template.copy()
report.add("raw", header + chunk, force=True)
yield report
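
Putting it together, a collector-style usage might look like this minimal end-to-end sketch (the feed URL and sample data are made up, and `chunk_size=20` is unrealistically small, chosen only to force a split; note that the report stores the raw field base64-encoded):

```python
import io

from intelmq.lib.message import Report
from intelmq.lib.splitreports import generate_reports

# Hypothetical CSV feed with a header line.
infile = io.BytesIO(b"ip,port\n198.51.100.1,80\n198.51.100.2,443\n")

template = Report()
template.add("feed.url", "file://localhost/tmp/feed.csv")

# With copy_header_line=True, each chunk is prefixed with "ip,port\n".
for report in generate_reports(template, infile, 20, True):
    print(report["raw"])  # base64-encoded raw data, one chunk per report
```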
2 changes: 2 additions & 0 deletions intelmq/tests/bots/collectors/file/test_collector.py
@@ -34,6 +34,8 @@ def set_bot(cls):
'postfix': '.txt',
'delete_file': False,
'feed': 'Example feed',
'chunk_size': None,
'chunk_replicate_header': True,
}
cls.default_input_message = {'__type': 'Event'}
