Skip to content

Commit

Permalink
Multipart upload support, since cgi.FieldStorage is not available in Python 3.13
Browse files Browse the repository at this point in the history
Based on the work in bottlepy#1438
Signed-off-by: Oz Tiram <[email protected]>
  • Loading branch information
oz123 committed May 25, 2024
1 parent 2f11bc8 commit 9861d04
Show file tree
Hide file tree
Showing 4 changed files with 548 additions and 57 deletions.
1 change: 0 additions & 1 deletion test/test_environ.py
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,6 @@ def test_multipart(self):
self.assertEqual('value1', request.POST['field1'])
self.assertTrue('field1' not in request.files)
self.assertEqual('value1', request.forms['field1'])
print(request.forms.dict, request.forms.recode_unicode)
self.assertEqual('万难', request.forms['field2'])
self.assertEqual(touni('万难'), request.forms.field2)
# Field (multi)
Expand Down
257 changes: 257 additions & 0 deletions test/test_multipart.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,257 @@
# -*- coding: utf-8 -*-
import unittest
import base64
import sys, os.path, tempfile
from io import BytesIO

import veilchen

class BaseMultipartTest(unittest.TestCase):
    """Shared fixture for the multipart parser tests.

    A request body is accumulated in an in-memory buffer with :meth:`write`
    and handed to ``veilchen._MultipartParser`` by :meth:`parse`.
    """

    def setUp(self):
        # Fresh body buffer for every test. ``parts`` holds the result of the
        # most recent successful parse() for assertFile()/assertForm().
        self.data = BytesIO()
        self.parts = None

    def write(self, *lines):
        """Append each of *lines* to the body, converted with ``veilchen.tob``."""
        for line in lines:
            self.data.write(veilchen.tob(line))

    def parse(self, ctype=None, clen=-1, **kwargs):
        """Parse the accumulated body and return the list of parts.

        :param ctype: Content-Type header value used to look up the boundary.
            Any falsy value (``None`` or ``""``) falls back to
            ``"multipart/form-data; boundary=foo"``.
        :param clen: Passed as the third positional argument to
            ``veilchen._MultipartParser`` (presumably the content length --
            confirm against the parser signature).
        :param kwargs: Forwarded unchanged to ``veilchen._MultipartParser``.
        :return: The parsed parts as a list, also stored on ``self.parts``.
        """
        self.data.seek(0)
        h = veilchen._parse_http_header(ctype or "multipart/form-data; boundary=foo")
        boundary = h[0][1].get("boundary")
        parser = veilchen._MultipartParser(self.data, boundary, clen, **kwargs)
        # Keep the result so the assert helpers below have something to scan;
        # on a parser error ``self.parts`` is left untouched.
        self.parts = list(parser.parse())
        return self.parts

    def assertFile(self, name, filename, ctype, data):
        """Assert that the first part named *name* is the expected file upload.

        :param name: Field name to look for in ``self.parts``.
        :param filename: Expected ``filename`` of the part.
        :param ctype: Expected ``content_type`` of the part.
        :param data: Expected file content (converted with ``veilchen.tob``).

        Fails the test if no part carries that name.
        """
        for part in self.parts:
            if part.name != name:
                continue
            self.assertEqual(part.filename, filename)
            self.assertEqual(part.content_type, ctype)
            self.assertEqual(part.file.read(), veilchen.tob(data))
            break
        else:
            self.fail("Field %s not found" % name)

    def assertForm(self, name, data):
        """Assert that the first part named *name* is a plain form field.

        A plain field has neither ``filename`` nor ``content_type`` and its
        ``value`` equals *data*. Fails the test if no part carries that name.
        """
        for part in self.parts:
            if part.name != name:
                continue
            self.assertEqual(part.filename, None)
            self.assertEqual(part.content_type, None)
            self.assertEqual(part.value, data)
            break
        else:
            self.fail("Field %s not found" % name)


class TestHeaderParser(BaseMultipartTest):
    """Checks for ``veilchen._parse_http_header`` option parsing."""

    def test_options_parser(self):
        """Quoted option values are unescaped and option names are lower-cased."""
        parse = veilchen._parse_http_header
        # (raw header value, filename expected after parsing)
        cases = (
            ('form-data; name="Test"; filename="Test.txt"',
             "Test.txt"),
            # Mixed-case option name and an escaped double quote.
            ('form-data; name="Test"; FileName="Te\\"st.txt"',
             "Te\"st.txt"),
            # Windows path: backslashes before ordinary characters are kept.
            ('form-data; name="Test"; filename="C:\\test\\bla.txt"',
             "C:\\test\\bla.txt"),
            # UNC-style path with a leading double backslash.
            ('form-data; name="Test"; filename="\\\\test\\bla.txt"',
             "\\\\test\\bla.txt"),
        )
        for header, filename in cases:
            self.assertEqual(
                parse(header),
                [('form-data', {"name": "Test", "filename": filename})])


class TestMultipartParser(BaseMultipartTest):
    """Exercises ``veilchen._MultipartParser``: line iteration and parsed parts."""

    # Boundary used by every full-body test in this class.
    _BOUNDARY = '---------------------------186454651713519341951581030105'

    @staticmethod
    def _crlf_body(lines):
        """Return a BytesIO with *lines* (each passed through ``veilchen.tob``) joined by CRLF."""
        return BytesIO(veilchen.tob('\r\n').join(map(veilchen.tob, lines)))

    def _single_png_body(self, content):
        """Return a body holding one ``file1`` upload with *content* and a terminator."""
        return self._crlf_body([
            '--' + self._BOUNDARY,
            'Content-Disposition: form-data; name="file1"; filename="random.png"',
            'Content-Type: image/png', '', content,
            '--' + self._BOUNDARY + '--', ''])

    def assertIterline(self, data, *expected, **options):
        """Assert that ``_lineiter()`` splits *data* into the *expected* pairs.

        Each expected item is a ``(line, newline)`` pair of text values; both
        are converted with ``veilchen.tob`` before comparison. *options* are
        forwarded to the parser constructor.
        """
        parser = veilchen._MultipartParser(BytesIO(veilchen.tob(data)), 'foo', **options)
        produced = list(parser._lineiter())
        wanted = [(veilchen.tob(line), veilchen.tob(newline))
                  for line, newline in expected]
        self.assertEqual(produced, wanted)

    def test_iterlines(self):
        """Only CRLF ends a line; a bare LF stays inside the line."""
        self.assertIterline('abc\ndef\r\nghi', ('abc\ndef', '\r\n'), ('ghi', ''))

    def test_iterlines_limit(self):
        """``content_length`` cuts the input off, even in the middle of a CRLF."""
        source = 'abc\ndef\r\nghi'
        # 10 bytes: everything up to and including the 'g'.
        self.assertIterline(source, ('abc\ndef', '\r\n'), ('g', ''), content_length=10)
        # 8 bytes: the CR is read but its LF is not, so no newline is reported.
        self.assertIterline(source, ('abc\ndef\r', ''), content_length=8)

    def test_fuzzy_lineiter(self):
        """ Test all possible buffer sizes """
        minbuflen = 9 # boundary size of '--foo--\r\n'
        data = b'data\rdata\ndata\r\ndata\n\rdata\r\n'.replace(b'data', b'X'*minbuflen*2)
        # The final split element is the empty remainder after the last CRLF.
        complete_lines = data.split(b"\r\n")[:-1]
        for tail in (b"", b"tail"):
            payload = data + tail
            for buffer_size in range(minbuflen, len(payload)+1):
                parser = veilchen._MultipartParser(
                    BytesIO(payload), 'foo', buffer_size=buffer_size)
                chunks = list(parser._lineiter())
                pending = b""
                joined = []
                for chunk, newline in chunks:
                    self.assertIn(newline, (b"", b"\r\n"))
                    # A chunk without newline is either a full buffer or the tail.
                    self.assertTrue(len(chunk) >= buffer_size or newline or chunk == tail)
                    pending += chunk
                    if newline:
                        joined.append(pending)
                        pending = b""
                self.assertEqual(joined, complete_lines)
                self.assertEqual(tail, pending)

    def test_big_file(self):
        ''' A part larger than memfile_limit is no longer buffered; a part of
            exactly memfile_limit bytes still is. '''
        content = 'abc'*1024
        delimiter = '--' + self._BOUNDARY
        body = self._crlf_body([
            delimiter,
            'Content-Disposition: form-data; name="file1"; filename="random.png"',
            'Content-Type: image/png', '', content,
            delimiter,
            'Content-Disposition: form-data; name="file2"; filename="random.png"',
            'Content-Type: image/png', '', content + 'a',
            delimiter,
            'Content-Disposition: form-data; name="file3"; filename="random.png"',
            'Content-Type: image/png', '', content*2,
            delimiter + '--', ''])
        parts = list(veilchen._MultipartParser(
            body, self._BOUNDARY, memfile_limit=len(content)).parse())
        by_name = {part.name: part for part in parts}
        # (field name, expected content, expected is_buffered() truthiness)
        expectations = (
            ('file1', content, True),
            ('file2', content + 'a', False),
            ('file3', content*2, False),
        )
        try:
            for name, payload, in_memory in expectations:
                part = by_name.get(name)
                self.assertEqual(part.file.read(), veilchen.tob(payload))
                check = self.assertTrue if in_memory else self.assertFalse
                check(part.is_buffered())
        finally:
            # Always release the parts, whatever the assertions did.
            for part in parts:
                part.close()

    def test_file_seek(self):
        ''' The file object should be readable without a seek(0). '''
        content = 'abc'*1024
        body = self._single_png_body(content)
        parts = list(veilchen._MultipartParser(body, self._BOUNDARY).parse())
        first = parts[0]
        self.assertEqual(first.file.read(), veilchen.tob(content))
        self.assertEqual(first.value, content)

    def test_unicode_value(self):
        ''' The .value property always returns unicode '''
        content = 'abc'*1024
        body = self._single_png_body(content)
        parts = list(veilchen._MultipartParser(body, self._BOUNDARY).parse())
        first = parts[0]
        self.assertEqual(first.file.read(), veilchen.tob(content))
        self.assertEqual(first.value, content)
        # Text strings have .encode; this is the check for "unicode".
        self.assertTrue(hasattr(first.value, 'encode'))

    def test_multiline_header(self):
        ''' HTTP allows headers to be multiline. '''
        file_bytes = veilchen.tob('abc'*1024)
        text_value = u'Test text\n with\r\n ümläuts!'
        delimiter = '--' + self._BOUNDARY
        body = self._crlf_body([
            delimiter,
            'Content-Disposition: form-data;',
            # Continuation line introduced by a tab ...
            '\tname="file1"; filename="random.png"',
            'Content-Type: image/png', '', file_bytes,
            delimiter,
            'Content-Disposition: form-data;',
            # ... and one introduced by a space.
            ' name="text"', '', text_value,
            delimiter + '--', ''])
        parts = list(veilchen._MultipartParser(
            body, self._BOUNDARY, charset='utf8').parse())
        upload, field = parts[0], parts[1]
        self.assertEqual(upload.name, "file1")
        self.assertEqual(upload.file.read(), file_bytes)
        self.assertEqual(upload.filename, 'random.png')
        self.assertEqual(field.name, "text")
        self.assertEqual(field.value, text_value)


class TestBrokenMultipart(BaseMultipartTest):
    """Malformed or over-limit bodies and how the parser reacts to them."""

    # Header lines shared by most bodies below.
    _FILE1 = 'Content-Disposition: form-data; name="file1"; filename="random.png"\r\n'
    _FILE2 = 'Content-Disposition: form-data; name="file2"; filename="random.png"\r\n'
    _PNG = 'Content-Type: image/png\r\n'

    def assertMPError(self, **ka):
        """Assert that ``self.parse(**ka)`` raises ``veilchen.MultipartError``."""
        with self.assertRaises(veilchen.MultipartError):
            self.parse(**ka)

    def test_big_boundary(self):
        """Parsing the empty body with ``buffer_size=1024*3`` is an error."""
        self.assertMPError(buffer_size=1024*3)

    def test_missing_content_type(self):
        """An empty content type is passed to parse() with an empty body."""
        # NOTE(review): parse() replaces a falsy ctype with its default
        # header, so this looks equivalent to parsing an empty body -- confirm.
        self.assertMPError(ctype="")

    def test_unsupported_content_type(self):
        """A content type without a boundary parameter is rejected."""
        self.assertMPError(ctype='multipart/fantasy')

    def test_missing_boundary(self):
        """multipart/form-data without a boundary parameter is rejected."""
        self.assertMPError(ctype="multipart/form-data")

    def test_no_terminator(self):
        """A body that ends without the closing boundary is an error."""
        self.write('--foo\r\n', self._FILE1, self._PNG, '\r\n', 'abc')
        self.assertMPError()

    def test_no_newline_after_content(self):
        """The closing boundary must start on its own line."""
        self.write('--foo\r\n', self._FILE1, self._PNG, '\r\n', 'abc', '--foo--')
        self.assertMPError()

    def test_no_newline_after_middle_content(self):
        """A boundary glued to the preceding content is treated as content."""
        self.write('--foo\r\n', self._FILE1, self._PNG, '\r\n',
                   'abc', '--foo\r\n', self._FILE2, self._PNG, '\r\n',
                   'abc\r\n', '--foo--')
        parts = self.parse()
        # The second part's headers end up inside the value of the first.
        self.assertEqual(len(parts), 1)
        self.assertIn('name="file2"', parts[0].value)

    def test_preamble_before_start_boundary(self):
        """Text before the first boundary does not affect the parsed part."""
        self.write('Preamble\r\n', '--foo\r\n', self._FILE1, self._PNG,
                   '\r\n', 'abc\r\n', '--foo--')
        first = self.parse()[0]
        self.assertEqual(first.file.read(), veilchen.tob('abc'))
        self.assertEqual(first.filename, 'random.png')
        self.assertEqual(first.name, 'file1')
        self.assertEqual(first.content_type, 'image/png')

    def test_no_start_boundary(self):
        """A body that never contains the expected boundary is an error."""
        self.write('--bar\r\n', '--nonsense\r\n', self._FILE1, self._PNG,
                   '\r\n', 'abc\r\n', '--nonsense--')
        self.assertMPError()

    def test_disk_limit(self):
        """A 3072-byte upload fails with memfile_limit=0 and disk_limit=1024."""
        self.write('--foo\r\n', self._FILE1, self._PNG, '\r\n',
                   'abc'*1024+'\r\n', '--foo--')
        self.assertMPError(memfile_limit=0, disk_limit=1024)

    def test_mem_limit(self):
        """Two 3072-byte uploads fail with mem_limit=1024*3."""
        self.write('--foo\r\n', self._FILE1, self._PNG, '\r\n',
                   'abc'*1024+'\r\n', '--foo\r\n',
                   self._FILE2, self._PNG, '\r\n',
                   'abc'*1024+'\r\n', '--foo--')
        self.assertMPError(mem_limit=1024*3)

    def test_invalid_header(self):
        """A header line without a colon is an error."""
        self.write('--foo\r\n', self._FILE1, self._PNG,
                   'Bad header\r\n', '\r\n', 'abc'*1024+'\r\n', '--foo--')
        self.assertMPError()

    def test_content_length_to_small(self):
        """A part whose Content-Length is smaller than its body is an error."""
        self.write('--foo\r\n', self._FILE1, self._PNG,
                   'Content-Length: 111\r\n', '\r\n', 'abc'*1024+'\r\n', '--foo--')
        self.assertMPError()

    def test_no_disposition_header(self):
        """A part without a Content-Disposition header is an error."""
        self.write('--foo\r\n', self._PNG, '\r\n', 'abc'*1024+'\r\n', '--foo--')
        self.assertMPError()

16 changes: 8 additions & 8 deletions test/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,17 +164,17 @@ def multipart_environ(fields, files):
boundary = '--' + boundary
body = ''
for name, value in fields:
body += boundary + '\n'
body += 'Content-Disposition: form-data; name="%s"\n\n' % name
body += value + '\n'
body += boundary + '\r\n'
body += 'Content-Disposition: form-data; name="%s"\r\n\r\n' % name
body += value + '\r\n'
for name, filename, content in files:
mimetype = str(mimetypes.guess_type(filename)[0]) or 'application/octet-stream'
body += boundary + '\n'
body += 'Content-Disposition: file; name="%s"; filename="%s"\n' % \
body += boundary + '\r\n'
body += 'Content-Disposition: file; name="%s"; filename="%s"\r\n' % \
(name, filename)
body += 'Content-Type: %s\n\n' % mimetype
body += content + '\n'
body += boundary + '--\n'
body += 'Content-Type: %s\r\n\r\n' % mimetype
body += content + '\r\n'
body += boundary + '--\r\n'
if isinstance(body, str):
body = body.encode('utf8')
env['CONTENT_LENGTH'] = str(len(body))
Expand Down
Loading

0 comments on commit 9861d04

Please sign in to comment.