Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Encode JSON responses on a thread in C, mk2 #10905

Merged
merged 5 commits into from
Sep 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/10905.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Speed up responding to requests with large JSON objects.
72 changes: 57 additions & 15 deletions synapse/http/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import urllib
from http import HTTPStatus
from inspect import isawaitable
from io import BytesIO
from typing import (
Any,
Awaitable,
Expand All @@ -37,15 +36,15 @@
)

import jinja2
from canonicaljson import iterencode_canonical_json
from canonicaljson import encode_canonical_json
from typing_extensions import Protocol
from zope.interface import implementer

from twisted.internet import defer, interfaces
from twisted.python import failure
from twisted.web import resource
from twisted.web.server import NOT_DONE_YET, Request
from twisted.web.static import File, NoRangeStaticProducer
from twisted.web.static import File
from twisted.web.util import redirectTo

from synapse.api.errors import (
Expand All @@ -56,10 +55,11 @@
UnrecognizedRequestError,
)
from synapse.http.site import SynapseRequest
from synapse.logging.context import preserve_fn
from synapse.logging.context import defer_to_thread, preserve_fn, run_in_background
from synapse.logging.opentracing import trace_servlet
from synapse.util import json_encoder
from synapse.util.caches import intern_dict
from synapse.util.iterutils import chunk_seq

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -620,12 +620,11 @@ def stopProducing(self) -> None:
self._request = None


def _encode_json_bytes(json_object: Any) -> bytes:
    """
    Encode an object into JSON. Returns the UTF-8 encoded bytes.
    """
    # `json_encoder.encode` runs entirely in the C-accelerated backend
    # (unlike `iterencode`, which falls back to pure Python), so encoding
    # in one shot here is deliberately faster.
    return json_encoder.encode(json_object).encode("utf-8")


def respond_with_json(
Expand Down Expand Up @@ -659,7 +658,7 @@ def respond_with_json(
return None

if canonical_json:
encoder = iterencode_canonical_json
encoder = encode_canonical_json
else:
encoder = _encode_json_bytes

Expand All @@ -670,7 +669,9 @@ def respond_with_json(
if send_cors:
set_cors_headers(request)

_ByteProducer(request, encoder(json_object))
run_in_background(
_async_write_json_to_request_in_thread, request, encoder, json_object
)
return NOT_DONE_YET


Expand Down Expand Up @@ -706,15 +707,56 @@ def respond_with_json_bytes(
if send_cors:
set_cors_headers(request)

# note that this is zero-copy (the bytesio shares a copy-on-write buffer with
# the original `bytes`).
bytes_io = BytesIO(json_bytes)

producer = NoRangeStaticProducer(request, bytes_io)
producer.start()
_write_bytes_to_request(request, json_bytes)
return NOT_DONE_YET


async def _async_write_json_to_request_in_thread(
    request: SynapseRequest,
    json_encoder: Callable[[Any], bytes],
    json_object: Any,
) -> None:
    """Encodes the given JSON object on a thread and then writes it to the
    request.

    This is done so that encoding large JSON objects doesn't block the reactor
    thread.

    Note: We don't use JsonEncoder.iterencode here as that falls back to the
    Python implementation (rather than the C backend), which is *much* more
    expensive.

    Args:
        request: The request to write the encoded JSON to.
        json_encoder: A callable that serialises the object straight to bytes.
        json_object: The object to serialise.
    """

    # Run the (potentially expensive) encode off the reactor thread; the
    # logcontext machinery in `defer_to_thread` keeps the request's logging
    # context intact across the thread hop.
    json_str = await defer_to_thread(request.reactor, json_encoder, json_object)

    _write_bytes_to_request(request, json_str)


def _write_bytes_to_request(request: Request, bytes_to_write: bytes) -> None:
    """Stream the given bytes back to the client via a producer.

    Note: This should be used instead of `Request.write` to correctly handle
    large response bodies.
    """

    # Dumping the whole payload into the `Request` in one go (via
    # `Request.write`) has a subtle problem: it immediately starts the timeout
    # for the *next* request, so a client that takes more than 60s to download
    # the body never receives the end of it.
    #
    # Handing the data to a producer avoids this: the timeout only begins once
    # the full body has been sent over the TCP connection.
    #
    # The payload is split into modest chunks so the producer never writes
    # everything at once. A `_ByteProducer` is used (rather than
    # `NoRangeStaticProducer`) because the unit tests can't cope with being
    # given a pull producer.
    _ByteProducer(request, chunk_seq(bytes_to_write, 4096))


def set_cors_headers(request: Request):
"""Set the CORS headers so that javascript running in a web browsers can
use this API
Expand Down
2 changes: 1 addition & 1 deletion synapse/push/emailpusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ async def _unsafe_process(self) -> None:

should_notify_at = max(notif_ready_at, room_ready_at)

if should_notify_at < self.clock.time_msec():
if should_notify_at <= self.clock.time_msec():
# one of our notifications is ready for sending, so we send
# *one* email updating the user on their notifications,
# we then consider all previously outstanding notifications
Expand Down
19 changes: 17 additions & 2 deletions synapse/util/iterutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,28 @@
Iterable,
Iterator,
Mapping,
Sequence,
Set,
Sized,
Tuple,
TypeVar,
)

from typing_extensions import Protocol

T = TypeVar("T")
S = TypeVar("S", bound="_SelfSlice")


class _SelfSlice(Sized, Protocol):
    """A helper protocol that matches types where taking a slice results in the
    same type being returned.

    This is more specific than `Sequence`, which allows another `Sequence` to be
    returned.

    For example, slicing a `bytes` yields another `bytes`, so `bytes` matches
    this protocol; typing it as `Sequence[int]` would lose that guarantee.
    """

    # Returns the same concrete type as `self` when sliced (hence the bound
    # TypeVar `S` rather than a plain return type).
    def __getitem__(self: S, i: slice) -> S:
        ...


def batch_iter(iterable: Iterable[T], size: int) -> Iterator[Tuple[T, ...]]:
Expand All @@ -46,7 +61,7 @@ def batch_iter(iterable: Iterable[T], size: int) -> Iterator[Tuple[T, ...]]:
return iter(lambda: tuple(islice(sourceiter, size)), ())


def chunk_seq(iseq: Sequence[T], maxlen: int) -> Iterable[Sequence[T]]:
def chunk_seq(iseq: S, maxlen: int) -> Iterator[S]:
"""Split the given sequence into chunks of the given size

The last chunk may be shorter than the given size.
Expand Down