Skip to content

Commit

Permalink
Retry some http replication failures (matrix-org#12182)
Browse files Browse the repository at this point in the history
This allows for the target process to be down for around a minute
which provides time for restarts during synapse upgrades/config updates.

Closes: matrix-org#12178

Signed off by Nick Mills-Barrett [email protected]
  • Loading branch information
Fizzadar committed Mar 9, 2022
1 parent d866ab2 commit 323c4ca
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 11 deletions.
1 change: 1 addition & 0 deletions changelog.d/12182.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Retry HTTP replication failures, this should prevent 502's when restarting stateful workers (main, event persisters, stream writers). Contributed by Nick @ Beeper.
47 changes: 36 additions & 11 deletions synapse/replication/http/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

from prometheus_client import Counter, Gauge

from twisted.internet.error import ConnectError, DNSLookupError
from twisted.web.server import Request

from synapse.api.errors import HttpResponseException, SynapseError
Expand Down Expand Up @@ -87,13 +88,19 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
`_handle_request` must return a Deferred.
RETRY_ON_TIMEOUT(bool): Whether or not to retry the request when a 504
is received.
RETRY_ON_CONNECT_ERROR (bool): Whether or not to retry the request when
a connection error is received.
RETRY_ON_CONNECT_ERROR_ATTEMPTS (int): Number of attempts to retry when
receiving connection errors, each will backoff exponentially longer.
"""

NAME: str = abc.abstractproperty() # type: ignore
PATH_ARGS: Tuple[str, ...] = abc.abstractproperty() # type: ignore
METHOD = "POST"
CACHE = True
RETRY_ON_TIMEOUT = True
RETRY_ON_CONNECT_ERROR = True
RETRY_ON_CONNECT_ERROR_ATTEMPTS = 5 # =63s (2^6-1)

def __init__(self, hs: "HomeServer"):
if self.CACHE:
Expand Down Expand Up @@ -236,30 +243,48 @@ async def send_request(*, instance_name: str = "master", **kwargs: Any) -> Any:
"/".join(url_args),
)

headers: Dict[bytes, List[bytes]] = {}
# Add an authorization header, if configured.
if replication_secret:
headers[b"Authorization"] = [b"Bearer " + replication_secret]
opentracing.inject_header_dict(headers, check_destination=False)

try:
# Keep track of attempts made so we can bail if we don't manage to
# connect to the target after N tries.
attempts = 0
# We keep retrying the same request for timeouts. This is so that we
# have a good idea that the request has either succeeded or failed
# on the master, and so whether we should clean up or not.
while True:
headers: Dict[bytes, List[bytes]] = {}
# Add an authorization header, if configured.
if replication_secret:
headers[b"Authorization"] = [
b"Bearer " + replication_secret
]
opentracing.inject_header_dict(headers, check_destination=False)
try:
result = await request_func(uri, data, headers=headers)
break
except RequestTimedOutError:
if not cls.RETRY_ON_TIMEOUT:
raise

logger.warning("%s request timed out; retrying", cls.NAME)
logger.warning("%s request timed out; retrying", cls.NAME)

# If we timed out we probably don't need to worry about backing
# off too much, but lets just wait a little anyway.
await clock.sleep(1)
except (ConnectError, DNSLookupError) as e:
if not cls.RETRY_ON_CONNECT_ERROR:
raise
if attempts > cls.RETRY_ON_CONNECT_ERROR_ATTEMPTS:
raise

delay = 2 ** attempts
logger.warning(
"%s request connection failed; retrying in %ds: %r",
cls.NAME,
delay,
e,
)

# If we timed out we probably don't need to worry about backing
# off too much, but lets just wait a little anyway.
await clock.sleep(1)
await clock.sleep(delay)
attempts += 1
except HttpResponseException as e:
# We convert to SynapseError as we know that it was a SynapseError
# on the main process that we should send to the client. (And
Expand Down

0 comments on commit 323c4ca

Please sign in to comment.