From 0244ef3be3f8c7d0b832a84c9a195496ca635770 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Wed, 17 Nov 2021 13:56:36 +0000 Subject: [PATCH 01/15] Track ongoing event fetches correctly (again) The previous fix for the ongoing event fetches counter (8eec25a1d9d656905db18a2c62a5552e63db2667) was both insufficient and incorrect. When the database is unreachable, `_do_fetch` never gets run and so `_event_fetch_ongoing` is never decremented. The previous fix also moved the `_event_fetch_ongoing` decrement outside of the `_event_fetch_lock` which allowed race conditions to corrupt the counter. --- changelog.d/11376.bugfix | 1 + .../storage/databases/main/events_worker.py | 77 ++++++----- .../databases/main/test_events_worker.py | 120 +++++++++++++++++- 3 files changed, 168 insertions(+), 30 deletions(-) create mode 100644 changelog.d/11376.bugfix diff --git a/changelog.d/11376.bugfix b/changelog.d/11376.bugfix new file mode 100644 index 000000000000..639e48b59b0d --- /dev/null +++ b/changelog.d/11376.bugfix @@ -0,0 +1 @@ +Fix a long-standing bug where all requests that read events from the database could get stuck as a result of losing the database connection, for real this time. Also fix a race condition introduced in the previous insufficient fix in 1.47.0. diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index c6bf316d5bf3..31587e857116 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -602,7 +602,7 @@ async def _get_events_from_cache_or_db( # already due to `_get_events_from_db`). fetching_deferred: ObservableDeferred[ Dict[str, _EventCacheEntry] - ] = ObservableDeferred(defer.Deferred()) + ] = ObservableDeferred(defer.Deferred(), consumeErrors=True) for event_id in missing_events_ids: self._current_event_fetches[event_id] = fetching_deferred @@ -736,35 +736,56 @@ async def get_stripped_room_state_from_event_context( for e in state_to_include.values() ] - def _do_fetch(self, conn: Connection) -> None: + async def _do_fetch(self) -> None: + """Services requests for events from the `_event_fetch_list` queue.""" + try: + await self.db_pool.runWithConnection(self._do_fetch_txn) + except BaseException as e: + with self._event_fetch_lock: + self._event_fetch_ongoing -= 1 + event_fetch_ongoing_gauge.set(self._event_fetch_ongoing) + + if self._event_fetch_ongoing == 0 and self._event_fetch_list: + # We are the last remaining fetcher and we have just failed. + # Fail any outstanding event fetches, since no one else will process + # them. + failed_event_list = self._event_fetch_list + self._event_fetch_list = [] + else: + failed_event_list = [] + + for _, deferred in failed_event_list: + if not deferred.called: + with PreserveLoggingContext(): + deferred.errback(e) + + def _do_fetch_txn(self, conn: Connection) -> None: """Takes a database connection and waits for requests for events from the _event_fetch_list queue. 
""" - try: - i = 0 - while True: - with self._event_fetch_lock: - event_list = self._event_fetch_list - self._event_fetch_list = [] + i = 0 + while True: + with self._event_fetch_lock: + event_list = self._event_fetch_list + self._event_fetch_list = [] + + if not event_list: + single_threaded = self.database_engine.single_threaded + if ( + not self.USE_DEDICATED_DB_THREADS_FOR_EVENT_FETCHING + or single_threaded + or i > EVENT_QUEUE_ITERATIONS + ): + self._event_fetch_ongoing -= 1 + event_fetch_ongoing_gauge.set(self._event_fetch_ongoing) + return + else: + self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S) + i += 1 + continue + i = 0 - if not event_list: - single_threaded = self.database_engine.single_threaded - if ( - not self.USE_DEDICATED_DB_THREADS_FOR_EVENT_FETCHING - or single_threaded - or i > EVENT_QUEUE_ITERATIONS - ): - break - else: - self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S) - i += 1 - continue - i = 0 - - self._fetch_event_list(conn, event_list) - finally: - self._event_fetch_ongoing -= 1 - event_fetch_ongoing_gauge.set(self._event_fetch_ongoing) + self._fetch_event_list(conn, event_list) def _fetch_event_list( self, conn: Connection, event_list: List[Tuple[List[str], defer.Deferred]] @@ -994,9 +1015,7 @@ async def _enqueue_events(self, events: Iterable[str]) -> Dict[str, _EventRow]: should_start = False if should_start: - run_as_background_process( - "fetch_events", self.db_pool.runWithConnection, self._do_fetch - ) + run_as_background_process("fetch_events", self._do_fetch) logger.debug("Loading %d events: %s", len(events), events) with PreserveLoggingContext(): diff --git a/tests/storage/databases/main/test_events_worker.py b/tests/storage/databases/main/test_events_worker.py index a649e8c61872..ac1cbcd61917 100644 --- a/tests/storage/databases/main/test_events_worker.py +++ b/tests/storage/databases/main/test_events_worker.py @@ -12,11 +12,24 @@ # See the License for the specific language governing permissions and # limitations under the License. 
import json +from contextlib import contextmanager +from typing import Generator +from twisted.enterprise.adbapi import ConnectionPool +from twisted.internet.defer import ensureDeferred +from twisted.test.proto_helpers import MemoryReactor + +from synapse.api.room_versions import EventFormatVersions, RoomVersions from synapse.logging.context import LoggingContext from synapse.rest import admin from synapse.rest.client import login, room -from synapse.storage.databases.main.events_worker import EventsWorkerStore +from synapse.server import HomeServer +from synapse.storage.databases.main.events_worker import ( + EVENT_QUEUE_THREADS, + EventsWorkerStore, +) +from synapse.storage.types import Connection +from synapse.util import Clock from synapse.util.async_helpers import yieldable_gather_results from tests import unittest @@ -144,3 +157,108 @@ def test_dedupe(self): # We should have fetched the event from the DB self.assertEqual(ctx.get_resource_usage().evt_db_fetch_count, 1) + + +class DatabaseOutageTestCase(unittest.HomeserverTestCase): + """Test event fetching during a database outage.""" + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer): + self.store: EventsWorkerStore = hs.get_datastore() + + self.room_id = f"!room:{hs.hostname}" + self.event_ids = [f"event{i}" for i in range(20)] + + self._populate_events() + + def _populate_events(self) -> None: + """Ensure that there are test events in the database.""" + self.get_success( + self.store.db_pool.simple_upsert( + "rooms", + {"room_id": self.room_id}, + {"room_version": RoomVersions.V4.identifier}, + ) + ) + + self.event_ids = [f"event{i}" for i in range(20)] + for idx, event_id in enumerate(self.event_ids): + self.get_success( + self.store.db_pool.simple_upsert( + "events", + {"event_id": event_id}, + { + "event_id": event_id, + "room_id": self.room_id, + "topological_ordering": idx, + "stream_ordering": idx, + "type": "test", + "processed": True, + "outlier": False, + }, + ) + ) + self.get_success( + self.store.db_pool.simple_upsert( + "event_json", + {"event_id": event_id}, + { + "room_id": self.room_id, + "json": json.dumps({"type": "test", "room_id": self.room_id}), + "internal_metadata": "{}", + "format_version": EventFormatVersions.V3, + }, + ) + ) + + @contextmanager + def _outage(self) -> Generator[None, None, None]: + """Simulate a database outage.""" + connection_pool = self.store.db_pool._db_pool + connection_pool.close() + connection_pool.start() + original_connection_factory = connection_pool.connectionFactory + + def connection_factory(_pool: ConnectionPool) -> Connection: + raise Exception("Could not connect to the database.") + + connection_pool.connectionFactory = connection_factory # type: ignore[assignment] + try: + yield + finally: + connection_pool.connectionFactory = original_connection_factory + + # If the in-memory SQLite database is being used, all the events are gone. + # Restore the test data. 
+ self._populate_events() + + def test_failure(self) -> None: + """Test that event fetches do not get stuck during a database outage.""" + with self._outage(): + failure = self.get_failure( + self.store.get_event(self.event_ids[0]), Exception + ) + self.assertEqual(str(failure.value), "Could not connect to the database.") + + def test_recovery(self) -> None: + """Test that event fetchers recover after a database outage.""" + with self._outage(): + # Kick off a bunch of event fetches but do not pump the reactor + event_deferreds = [] + for event_id in self.event_ids: + event_deferreds.append(ensureDeferred(self.store.get_event(event_id))) + + # We should have maxed out on event fetcher threads + self.assertEqual(self.store._event_fetch_ongoing, EVENT_QUEUE_THREADS) + + # All the event fetchers will fail + self.pump() + self.assertEqual(self.store._event_fetch_ongoing, 0) + + for event_deferred in event_deferreds: + failure = self.get_failure(event_deferred, Exception) + self.assertEqual( + str(failure.value), "Could not connect to the database." + ) + + # This next event fetch should succeed + self.get_success(self.store.get_event(self.event_ids[0])) From e88eb38d1ffda9229548ee84a8f117e3b72cc959 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Wed, 17 Nov 2021 19:59:21 +0000 Subject: [PATCH 02/15] Attempt to fix double decrement when runWithConnection fails after _do_fetch_txn returns --- .../storage/databases/main/events_worker.py | 35 +++++++++++++------ 1 file changed, 25 insertions(+), 10 deletions(-) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 31587e857116..ba2826d1142b 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -741,23 +741,40 @@ async def _do_fetch(self) -> None: try: await self.db_pool.runWithConnection(self._do_fetch_txn) except BaseException as e: + failed_event_list = [] with self._event_fetch_lock: - self._event_fetch_ongoing -= 1 - event_fetch_ongoing_gauge.set(self._event_fetch_ongoing) - - if self._event_fetch_ongoing == 0 and self._event_fetch_list: + if self._event_fetch_ongoing == 1 and self._event_fetch_list: # We are the last remaining fetcher and we have just failed. - # Fail any outstanding event fetches, since no one else will process - # them. + # Fail any outstanding fetches, since they won't get processed + # otherwise. failed_event_list = self._event_fetch_list self._event_fetch_list = [] - else: - failed_event_list = [] + + self._event_fetch_ongoing -= 1 + event_fetch_ongoing_gauge.set(self._event_fetch_ongoing) for _, deferred in failed_event_list: if not deferred.called: with PreserveLoggingContext(): deferred.errback(e) + else: + should_restart = False + with self._event_fetch_lock: + if self._event_fetch_ongoing == 1 and self._event_fetch_list: + # An event fetch has been queued, but we're the last remaining + # fetcher and have already decided to terminate. + # Start a new fetcher. + should_restart = True + + # `_event_fetch_ongoing` ought to be decremented for ourself and + # incremented for the new fetcher. These cancel out, so we leave it + # alone. 
+ else: + self._event_fetch_ongoing -= 1 + event_fetch_ongoing_gauge.set(self._event_fetch_ongoing) + + if should_restart: + run_as_background_process("fetch_events", self._do_fetch) def _do_fetch_txn(self, conn: Connection) -> None: """Takes a database connection and waits for requests for events from @@ -776,8 +793,6 @@ def _do_fetch_txn(self, conn: Connection) -> None: or single_threaded or i > EVENT_QUEUE_ITERATIONS ): - self._event_fetch_ongoing -= 1 - event_fetch_ongoing_gauge.set(self._event_fetch_ongoing) return else: self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S) From baf16f47d455492036b5f20b5ee76267028eafd8 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Wed, 17 Nov 2021 20:34:48 +0000 Subject: [PATCH 03/15] Reraise the exception --- synapse/storage/databases/main/events_worker.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index ba2826d1142b..b88bda926c9d 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -757,6 +757,8 @@ async def _do_fetch(self) -> None: if not deferred.called: with PreserveLoggingContext(): deferred.errback(e) + + raise else: should_restart = False with self._event_fetch_lock: From 218a934072f3048caaf1cc2b2117142404e3f4bf Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Thu, 18 Nov 2021 00:22:08 +0000 Subject: [PATCH 04/15] Don't catch GeneratorExit --- synapse/storage/databases/main/events_worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index b88bda926c9d..9ebe155acb16 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -740,7 +740,7 @@ async def _do_fetch(self) -> None: """Services requests for events from the `_event_fetch_list` queue.""" try: await self.db_pool.runWithConnection(self._do_fetch_txn) - except BaseException as e: + except Exception as e: failed_event_list = [] with self._event_fetch_lock: if self._event_fetch_ongoing == 1 and self._event_fetch_list: From 5cb123788c6ed70c0d7abedb4c92698356ef2a64 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Thu, 18 Nov 2021 12:16:07 +0000 Subject: [PATCH 05/15] Refactor --- .../storage/databases/main/events_worker.py | 92 +++++++++---------- 1 file changed, 45 insertions(+), 47 deletions(-) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 9ebe155acb16..39789e4c8160 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -736,49 +736,56 @@ async def get_stripped_room_state_from_event_context( for e in state_to_include.values() ] - async def _do_fetch(self) -> None: - """Services requests for events from the `_event_fetch_list` queue.""" - try: - await self.db_pool.runWithConnection(self._do_fetch_txn) - except Exception as e: - failed_event_list = [] - with self._event_fetch_lock: - if self._event_fetch_ongoing == 1 and self._event_fetch_list: - # We are the last remaining fetcher and we have just failed. - # Fail any outstanding fetches, since they won't get processed - # otherwise. 
- failed_event_list = self._event_fetch_list - self._event_fetch_list = [] - - self._event_fetch_ongoing -= 1 + def _start_fetch_thread(self) -> None: + """Starts an event fetcher.""" + with self._event_fetch_lock: + if ( + self._event_fetch_list + and self._event_fetch_ongoing < EVENT_QUEUE_THREADS + ): + self._event_fetch_ongoing += 1 event_fetch_ongoing_gauge.set(self._event_fetch_ongoing) + should_start = True + else: + should_start = False - for _, deferred in failed_event_list: - if not deferred.called: - with PreserveLoggingContext(): - deferred.errback(e) - - raise - else: - should_restart = False - with self._event_fetch_lock: - if self._event_fetch_ongoing == 1 and self._event_fetch_list: - # An event fetch has been queued, but we're the last remaining - # fetcher and have already decided to terminate. - # Start a new fetcher. - should_restart = True - - # `_event_fetch_ongoing` ought to be decremented for ourself and - # incremented for the new fetcher. These cancel out, so we leave it - # alone. - else: + async def _do_fetch() -> None: + """Services requests for events from the `_event_fetch_list` queue.""" + exc = None + try: + await self.db_pool.runWithConnection(self._do_fetch) + except Exception as e: + exc = e + raise + finally: + should_restart = False + failed_event_list = [] + with self._event_fetch_lock: self._event_fetch_ongoing -= 1 event_fetch_ongoing_gauge.set(self._event_fetch_ongoing) - if should_restart: - run_as_background_process("fetch_events", self._do_fetch) + if self._event_fetch_ongoing == 0 and self._event_fetch_list: + # We are the last remaining fetcher and we are about to + # go away. Deal with any outstanding fetches. + if exc is None: + should_restart = True + else: + failed_event_list = self._event_fetch_list + self._event_fetch_list = [] + + if should_restart: + self._start_fetch_thread() + + if exc is not None: + for _, deferred in failed_event_list: + if not deferred.called: + with PreserveLoggingContext(): + deferred.errback(exc) + + if should_start: + run_as_background_process("fetch_events", _do_fetch) - def _do_fetch_txn(self, conn: Connection) -> None: + def _do_fetch(self, conn: Connection) -> None: """Takes a database connection and waits for requests for events from the _event_fetch_list queue. 
""" @@ -1021,18 +1028,9 @@ async def _enqueue_events(self, events: Iterable[str]) -> Dict[str, _EventRow]: events_d = defer.Deferred() with self._event_fetch_lock: self._event_fetch_list.append((events, events_d)) - self._event_fetch_lock.notify() - if self._event_fetch_ongoing < EVENT_QUEUE_THREADS: - self._event_fetch_ongoing += 1 - event_fetch_ongoing_gauge.set(self._event_fetch_ongoing) - should_start = True - else: - should_start = False - - if should_start: - run_as_background_process("fetch_events", self._do_fetch) + self._start_fetch_thread() logger.debug("Loading %d events: %s", len(events), events) with PreserveLoggingContext(): From 3371c21b8255f5c067df5c7a7e39f966198b2181 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 19 Nov 2021 17:12:00 +0000 Subject: [PATCH 06/15] Rename _start_fetch_thread to _maybe_start_fetch_thread --- synapse/storage/databases/main/events_worker.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 39789e4c8160..4111870e37c1 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -736,8 +736,8 @@ async def get_stripped_room_state_from_event_context( for e in state_to_include.values() ] - def _start_fetch_thread(self) -> None: - """Starts an event fetcher.""" + def _maybe_start_fetch_thread(self) -> None: + """Starts an event fetch thread if we are not yet at the maximum number.""" with self._event_fetch_lock: if ( self._event_fetch_list @@ -774,7 +774,7 @@ async def _do_fetch() -> None: self._event_fetch_list = [] if should_restart: - self._start_fetch_thread() + self._maybe_start_fetch_thread() if exc is not None: for _, deferred in failed_event_list: @@ -1030,7 +1030,7 @@ async def _enqueue_events(self, events: Iterable[str]) -> Dict[str, _EventRow]: self._event_fetch_list.append((events, events_d)) self._event_fetch_lock.notify() - self._start_fetch_thread() + self._maybe_start_fetch_thread() logger.debug("Loading %d events: %s", len(events), events) with PreserveLoggingContext(): From 6993247beb50fea3536dd0549102881bed04894c Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 19 Nov 2021 17:17:10 +0000 Subject: [PATCH 07/15] Rename _do_fetch and _do_fetch --- synapse/storage/databases/main/events_worker.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 4111870e37c1..d92633ca4719 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -72,7 +72,7 @@ logger = logging.getLogger(__name__) -# These values are used in the `enqueus_event` and `_do_fetch` methods to +# These values are used in the `enqueue_event` and `_fetch_loop` methods to # control how we batch/bulk fetch events from the database. 
 # The values are plucked out of thin air to make initial sync run faster
 # on jki.re
@@ -749,11 +749,11 @@ def _maybe_start_fetch_thread(self) -> None:
             else:
                 should_start = False

-        async def _do_fetch() -> None:
+        async def _fetch_thread() -> None:
             """Services requests for events from the `_event_fetch_list` queue."""
             exc = None
             try:
-                await self.db_pool.runWithConnection(self._do_fetch)
+                await self.db_pool.runWithConnection(self._fetch_loop)
             except Exception as e:
                 exc = e
                 raise
@@ -783,9 +783,9 @@ async def _do_fetch() -> None:
                         deferred.errback(exc)

         if should_start:
-            run_as_background_process("fetch_events", _do_fetch)
+            run_as_background_process("fetch_events", _fetch_thread)

-    def _do_fetch(self, conn: Connection) -> None:
+    def _fetch_loop(self, conn: Connection) -> None:
         """Takes a database connection and waits for requests for events from
         the _event_fetch_list queue.
         """

From efb5574fe4faaf05b29cf1116ffc8fc89d22d663 Mon Sep 17 00:00:00 2001
From: Sean Quah
Date: Fri, 19 Nov 2021 18:54:21 +0000
Subject: [PATCH 08/15] Address PR feedback for events_worker.py

---
 .../storage/databases/main/events_worker.py   | 69 +++++++++++++------
 1 file changed, 49 insertions(+), 20 deletions(-)

diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index d92633ca4719..df03754d0de9 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -750,37 +750,65 @@ def _maybe_start_fetch_thread(self) -> None:
             should_start = False

         async def _fetch_thread() -> None:
-            """Services requests for events from the `_event_fetch_list` queue."""
+            """Services requests for events from `_event_fetch_list`."""
             exc = None
             try:
                 await self.db_pool.runWithConnection(self._fetch_loop)
-            except Exception as e:
+            except BaseException as e:
                 exc = e
                 raise
             finally:
                 should_restart = False
-                failed_event_list = []
+                event_fetches_to_fail = []
                 with self._event_fetch_lock:
                     self._event_fetch_ongoing -= 1
                     event_fetch_ongoing_gauge.set(self._event_fetch_ongoing)

-                    if self._event_fetch_ongoing == 0 and self._event_fetch_list:
-                        # We are the last remaining fetcher and we are about to
-                        # go away. Deal with any outstanding fetches.
+                    # There may still be work remaining in `_event_fetch_list` if we
+                    # failed, or it was added in between us deciding to exit and
+                    # decrementing `_event_fetch_ongoing`.
+                    if self._event_fetch_list:
                         if exc is None:
+                            # We decided to exit, but then some more work was added
+                            # before `_event_fetch_ongoing` was decremented.
+                            # If a new event fetch thread was not started, we should
+                            # restart ourselves since the remaining event fetch threads
+                            # may take a while to get around to the new work.
+                            # Unfortunately it is not possible to tell whether a new
+                            # event fetch thread was started, so we restart
+                            # unconditionally. If we are unlucky, we will end up with
+                            # extra idle threads holding database connections for up to
+                            # `EVENT_QUEUE_ITERATIONS * EVENT_QUEUE_TIMEOUT_S` seconds.
                             should_restart = True
+                        elif isinstance(exc, Exception):
+                            if self._event_fetch_ongoing == 0:
+                                # We were the last remaining fetcher and failed.
+                                # Fail any outstanding fetches since no one else will
+                                # handle them.
+                                event_fetches_to_fail = self._event_fetch_list
+                                self._event_fetch_list = []
+                            else:
+                                # We weren't the last remaining fetcher, so another
+                                # fetcher will pick up the work. 
This will either happen + # after their existing work, however long that takes, + # or after at most `EVENT_QUEUE_TIMEOUT_S` seconds if + # they are idle. + pass else: - failed_event_list = self._event_fetch_list - self._event_fetch_list = [] + # The exception is a `SystemExit`, `KeyboardInterrupt` or + # `GeneratorExit`. Don't try to do anything clever here. + pass if should_restart: + # We exited cleanly but noticed more work. self._maybe_start_fetch_thread() - if exc is not None: - for _, deferred in failed_event_list: - if not deferred.called: - with PreserveLoggingContext(): - deferred.errback(exc) + if exc is not None and event_fetches_to_fail: + # We were the last remaining fetcher and failed. + # Fail any outstanding fetches since no one else will handle them. + with PreserveLoggingContext(): + for _, deferred in event_fetches_to_fail: + deferred.errback(exc) if should_start: run_as_background_process("fetch_events", _fetch_thread) @@ -796,6 +824,9 @@ def _fetch_loop(self, conn: Connection) -> None: self._event_fetch_list = [] if not event_list: + # There are no requests waiting. If we haven't yet reached the + # maximum iteration limit, wait for some more requests to turn up. + # Otherwise, bail out. single_threaded = self.database_engine.single_threaded if ( not self.USE_DEDICATED_DB_THREADS_FOR_EVENT_FETCHING @@ -803,10 +834,10 @@ def _fetch_loop(self, conn: Connection) -> None: or i > EVENT_QUEUE_ITERATIONS ): return - else: - self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S) - i += 1 - continue + + self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S) + i += 1 + continue i = 0 self._fetch_event_list(conn, event_list) @@ -851,9 +882,7 @@ def fire(): # We only want to resolve deferreds from the main thread def fire(evs, exc): for _, d in evs: - if not d.called: - with PreserveLoggingContext(): - d.errback(exc) + d.errback(exc) with PreserveLoggingContext(): self.hs.get_reactor().callFromThread(fire, event_list, e) From 59763714ddfb944f5c71848a29e30f6ce86fd7f7 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 19 Nov 2021 20:20:58 +0000 Subject: [PATCH 09/15] Improve comments in tests --- tests/storage/databases/main/test_events_worker.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/tests/storage/databases/main/test_events_worker.py b/tests/storage/databases/main/test_events_worker.py index ac1cbcd61917..7ed639041a4e 100644 --- a/tests/storage/databases/main/test_events_worker.py +++ b/tests/storage/databases/main/test_events_worker.py @@ -212,10 +212,20 @@ def _populate_events(self) -> None: @contextmanager def _outage(self) -> Generator[None, None, None]: - """Simulate a database outage.""" + """Simulate a database outage. + + Returns: + A context manager. While the context is active, any attempts to connect to + the database will fail. + """ connection_pool = self.store.db_pool._db_pool + + # Close all connections and shut down the database `ThreadPool`. connection_pool.close() + + # Restart the database `ThreadPool`. 
connection_pool.start() + original_connection_factory = connection_pool.connectionFactory def connection_factory(_pool: ConnectionPool) -> Connection: From 65fd8791f778416038fd86cb3670f9e4ec8d03ba Mon Sep 17 00:00:00 2001 From: Sean Quah <8349537+squahtx@users.noreply.github.com> Date: Thu, 25 Nov 2021 20:56:21 +0000 Subject: [PATCH 10/15] Update synapse/storage/databases/main/events_worker.py Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> --- synapse/storage/databases/main/events_worker.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index df03754d0de9..0658bf877d9d 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -777,8 +777,9 @@ async def _fetch_thread() -> None: # Unfortunately it is not possible to tell whether a new # event fetch thread was started, so we restart # unconditionally. If we are unlucky, we will end up with - # extra idle threads holding database connections for up to - # `EVENT_QUEUE_ITERATIONS * EVENT_QUEUE_TIMEOUT_S` seconds. + # an idle fetch thread, but it will time out after + # `EVENT_QUEUE_ITERATIONS * EVENT_QUEUE_TIMEOUT_S` seconds + # in any case. should_restart = True elif isinstance(exc, Exception): if self._event_fetch_ongoing == 0: From d1be5ce739c2cf6bb810743b6188ad89d3c5e073 Mon Sep 17 00:00:00 2001 From: Sean Quah <8349537+squahtx@users.noreply.github.com> Date: Thu, 25 Nov 2021 20:56:43 +0000 Subject: [PATCH 11/15] Update synapse/storage/databases/main/events_worker.py Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> --- synapse/storage/databases/main/events_worker.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 0658bf877d9d..d1d168cfea6b 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -804,9 +804,10 @@ async def _fetch_thread() -> None: # We exited cleanly but noticed more work. self._maybe_start_fetch_thread() - if exc is not None and event_fetches_to_fail: + if event_fetches_to_fail: # We were the last remaining fetcher and failed. # Fail any outstanding fetches since no one else will handle them. + assert exc is not None with PreserveLoggingContext(): for _, deferred in event_fetches_to_fail: deferred.errback(exc) From 8a0b066490b1fd641e3191cc33e086710ae6f436 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Thu, 25 Nov 2021 21:03:25 +0000 Subject: [PATCH 12/15] Update comment about idle fetch thread --- synapse/storage/databases/main/events_worker.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index d1d168cfea6b..8609e4f4c3fd 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -774,12 +774,16 @@ async def _fetch_thread() -> None: # If a new event fetch thread was not started, we should # restart ourselves since the remaining event fetch threads # may take a while to get around to the new work. + # # Unfortunately it is not possible to tell whether a new # event fetch thread was started, so we restart # unconditionally. 
If we are unlucky, we will end up with - # an idle fetch thread, but it will time out after + # an idle fetch thread, but it will time out after # `EVENT_QUEUE_ITERATIONS * EVENT_QUEUE_TIMEOUT_S` seconds # in any case. + # + # Note that multiple fetch threads may run down this path at + # the same time. should_restart = True elif isinstance(exc, Exception): if self._event_fetch_ongoing == 0: From 9bb33eb3b2fa9f197b03911460c7e5eb30e85049 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Thu, 25 Nov 2021 21:07:43 +0000 Subject: [PATCH 13/15] Un-inner function `_fetch_thread` --- .../storage/databases/main/events_worker.py | 135 +++++++++--------- 1 file changed, 68 insertions(+), 67 deletions(-) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 8609e4f4c3fd..c6bcfe1c32e2 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -745,79 +745,80 @@ def _maybe_start_fetch_thread(self) -> None: ): self._event_fetch_ongoing += 1 event_fetch_ongoing_gauge.set(self._event_fetch_ongoing) + # `_event_fetch_ongoing` is decremented in `_fetch_thread`. should_start = True else: should_start = False - async def _fetch_thread() -> None: - """Services requests for events from `_event_fetch_list`.""" - exc = None - try: - await self.db_pool.runWithConnection(self._fetch_loop) - except BaseException as e: - exc = e - raise - finally: - should_restart = False - event_fetches_to_fail = [] - with self._event_fetch_lock: - self._event_fetch_ongoing -= 1 - event_fetch_ongoing_gauge.set(self._event_fetch_ongoing) - - # There may still be work remaining in `_event_fetch_list` if we - # failed, or it was added in between us deciding to exit and - # decrementing `_event_fetch_ongoing`. - if self._event_fetch_list: - if exc is None: - # We decided to exit, but then some more work was added - # before `_event_fetch_ongoing` was decremented. - # If a new event fetch thread was not started, we should - # restart ourselves since the remaining event fetch threads - # may take a while to get around to the new work. - # - # Unfortunately it is not possible to tell whether a new - # event fetch thread was started, so we restart - # unconditionally. If we are unlucky, we will end up with - # an idle fetch thread, but it will time out after - # `EVENT_QUEUE_ITERATIONS * EVENT_QUEUE_TIMEOUT_S` seconds - # in any case. - # - # Note that multiple fetch threads may run down this path at - # the same time. - should_restart = True - elif isinstance(exc, Exception): - if self._event_fetch_ongoing == 0: - # We were the last remaining fetcher and failed. - # Fail any outstanding fetches since no one else will - # handle them. - event_fetches_to_fail = self._event_fetch_list - self._event_fetch_list = [] - else: - # We weren't the last remaining fetcher, so another - # fetcher will pick up the work. This will either happen - # after their existing work, however long that takes, - # or after at most `EVENT_QUEUE_TIMEOUT_S` seconds if - # they are idle. 
- pass + if should_start: + run_as_background_process("fetch_events", self._fetch_thread) + + async def _fetch_thread(self) -> None: + """Services requests for events from `_event_fetch_list`.""" + exc = None + try: + await self.db_pool.runWithConnection(self._fetch_loop) + except BaseException as e: + exc = e + raise + finally: + should_restart = False + event_fetches_to_fail = [] + with self._event_fetch_lock: + self._event_fetch_ongoing -= 1 + event_fetch_ongoing_gauge.set(self._event_fetch_ongoing) + + # There may still be work remaining in `_event_fetch_list` if we + # failed, or it was added in between us deciding to exit and + # decrementing `_event_fetch_ongoing`. + if self._event_fetch_list: + if exc is None: + # We decided to exit, but then some more work was added + # before `_event_fetch_ongoing` was decremented. + # If a new event fetch thread was not started, we should + # restart ourselves since the remaining event fetch threads + # may take a while to get around to the new work. + # + # Unfortunately it is not possible to tell whether a new + # event fetch thread was started, so we restart + # unconditionally. If we are unlucky, we will end up with + # an idle fetch thread, but it will time out after + # `EVENT_QUEUE_ITERATIONS * EVENT_QUEUE_TIMEOUT_S` seconds + # in any case. + # + # Note that multiple fetch threads may run down this path at + # the same time. + should_restart = True + elif isinstance(exc, Exception): + if self._event_fetch_ongoing == 0: + # We were the last remaining fetcher and failed. + # Fail any outstanding fetches since no one else will + # handle them. + event_fetches_to_fail = self._event_fetch_list + self._event_fetch_list = [] else: - # The exception is a `SystemExit`, `KeyboardInterrupt` or - # `GeneratorExit`. Don't try to do anything clever here. + # We weren't the last remaining fetcher, so another + # fetcher will pick up the work. This will either happen + # after their existing work, however long that takes, + # or after at most `EVENT_QUEUE_TIMEOUT_S` seconds if + # they are idle. pass - - if should_restart: - # We exited cleanly but noticed more work. - self._maybe_start_fetch_thread() - - if event_fetches_to_fail: - # We were the last remaining fetcher and failed. - # Fail any outstanding fetches since no one else will handle them. - assert exc is not None - with PreserveLoggingContext(): - for _, deferred in event_fetches_to_fail: - deferred.errback(exc) - - if should_start: - run_as_background_process("fetch_events", _fetch_thread) + else: + # The exception is a `SystemExit`, `KeyboardInterrupt` or + # `GeneratorExit`. Don't try to do anything clever here. + pass + + if should_restart: + # We exited cleanly but noticed more work. + self._maybe_start_fetch_thread() + + if event_fetches_to_fail: + # We were the last remaining fetcher and failed. + # Fail any outstanding fetches since no one else will handle them. 
+ assert exc is not None + with PreserveLoggingContext(): + for _, deferred in event_fetches_to_fail: + deferred.errback(exc) def _fetch_loop(self, conn: Connection) -> None: """Takes a database connection and waits for requests for events from From a404bfdccbdb38bc0dd7a4ee9ce5e2747518d239 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Thu, 25 Nov 2021 21:12:35 +0000 Subject: [PATCH 14/15] Update docstring for `_populate_events` --- tests/storage/databases/main/test_events_worker.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/tests/storage/databases/main/test_events_worker.py b/tests/storage/databases/main/test_events_worker.py index 7ed639041a4e..44a73c4a4a42 100644 --- a/tests/storage/databases/main/test_events_worker.py +++ b/tests/storage/databases/main/test_events_worker.py @@ -171,7 +171,16 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer): self._populate_events() def _populate_events(self) -> None: - """Ensure that there are test events in the database.""" + """Ensure that there are test events in the database. + + When testing with the in-memory SQLite database is being used, all the events + are lost during the simulated outage. + + To ensure consistency between `room_id`s and `event_id`s before and after the + outage, rows are built and inserted manually. + + Upserts are used to handle the non-SQLite case where events are not lost. + """ self.get_success( self.store.db_pool.simple_upsert( "rooms", From 37a2e94868ec22155edaf5d45323bf64cf13e975 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 26 Nov 2021 13:20:17 +0000 Subject: [PATCH 15/15] Correct typo in comment --- tests/storage/databases/main/test_events_worker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/storage/databases/main/test_events_worker.py b/tests/storage/databases/main/test_events_worker.py index 44a73c4a4a42..5ae491ff5a52 100644 --- a/tests/storage/databases/main/test_events_worker.py +++ b/tests/storage/databases/main/test_events_worker.py @@ -173,8 +173,8 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer): def _populate_events(self) -> None: """Ensure that there are test events in the database. - When testing with the in-memory SQLite database is being used, all the events - are lost during the simulated outage. + When testing with the in-memory SQLite database, all the events are lost during + the simulated outage. To ensure consistency between `room_id`s and `event_id`s before and after the outage, rows are built and inserted manually.
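
Taken together, patches 05 through 13 converge on a small set of invariants that are easy to lose in the diffs: `_event_fetch_ongoing` is only ever read and written while holding `_event_fetch_lock`; each fetcher decrements it exactly once, in a `finally` block; and the last fetcher to exit either restarts itself (a clean exit that raced with newly queued work) or fails the outstanding requests (an error exit). The sketch below is a distilled, runnable model of that design, not Synapse code: it assumes the standard `threading` module in place of Twisted, `run_as_background_process` and the database pool; the names `FetcherPool`, `_Result` and `fetch_one` are invented for illustration; and the `EVENT_QUEUE_ITERATIONS` idle-counting loop is collapsed into a single condition-variable timeout.

import threading
from typing import Callable, List, Optional, Tuple

EVENT_QUEUE_THREADS = 3  # cap on concurrent fetchers, as in the real code
IDLE_TIMEOUT_S = 0.1  # stands in for EVENT_QUEUE_ITERATIONS * EVENT_QUEUE_TIMEOUT_S


class _Result:
    """Tiny invented stand-in for a Twisted Deferred."""

    def __init__(self) -> None:
        self._done = threading.Event()
        self._value = None
        self._exc: Optional[Exception] = None

    def set_value(self, value) -> None:
        self._value = value
        self._done.set()

    def set_exception(self, exc: Exception) -> None:
        self._exc = exc
        self._done.set()

    def get(self, timeout: float = 5.0):
        self._done.wait(timeout)
        if self._exc is not None:
            raise self._exc
        return self._value


class FetcherPool:
    def __init__(self, fetch_one: Callable[[str], dict]) -> None:
        self._fetch_one = fetch_one  # stand-in for the actual database read
        self._lock = threading.Condition()  # plays the role of _event_fetch_lock
        self._queue: List[Tuple[str, _Result]] = []  # _event_fetch_list
        self._ongoing = 0  # _event_fetch_ongoing

    def enqueue(self, event_id: str) -> _Result:
        result = _Result()
        with self._lock:
            self._queue.append((event_id, result))
            self._lock.notify()
        self._maybe_start_fetch_thread()
        return result

    def _maybe_start_fetch_thread(self) -> None:
        # Check the queue and increment the counter under the same lock.
        with self._lock:
            should_start = bool(self._queue) and self._ongoing < EVENT_QUEUE_THREADS
            if should_start:
                self._ongoing += 1  # decremented only in _fetch_thread
        if should_start:
            threading.Thread(target=self._fetch_thread, daemon=True).start()

    def _fetch_thread(self) -> None:
        # The real code also observes BaseException here; see patch 08.
        exc: Optional[Exception] = None
        try:
            self._fetch_loop()
        except Exception as e:
            exc = e
        finally:
            should_restart = False
            to_fail: List[Tuple[str, _Result]] = []
            with self._lock:
                self._ongoing -= 1  # the only place the counter goes down
                if self._queue:
                    if exc is None:
                        # Clean exit, but work arrived before the decrement:
                        # restart unconditionally (patch 08's resolution).
                        should_restart = True
                    elif self._ongoing == 0:
                        # The last fetcher just failed: fail the queued
                        # requests, since no one else will service them.
                        to_fail, self._queue = self._queue, []
            if should_restart:
                self._maybe_start_fetch_thread()
            if to_fail:
                assert exc is not None  # mirrors the assertion from patch 11
                for _, result in to_fail:
                    result.set_exception(exc)

    def _fetch_loop(self) -> None:
        while True:
            with self._lock:
                while not self._queue:
                    if not self._lock.wait(timeout=IDLE_TIMEOUT_S):
                        return  # idled out; exit cleanly
                batch, self._queue = self._queue, []
            try:
                for event_id, result in batch:
                    result.set_value(self._fetch_one(event_id))
            except Exception as e:
                # Fail the in-flight batch, then re-raise so _fetch_thread
                # can deal with anything still queued.
                for _, result in batch:
                    result.set_exception(e)
                raise


# Usage: FetcherPool(lambda eid: {"event_id": eid}).enqueue("$ev").get()

In the real implementation, `run_as_background_process` plays the role of `threading.Thread`, Deferreds play the role of `_Result`, and failing an in-flight batch before re-raising is handled by `_fetch_event_list` rather than by the loop itself; the model only preserves where the counter changes and who cleans up when the last fetcher exits.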