Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(framework) Introduce double queue mechanism for Simulation Engine #3468

Merged
merged 52 commits into from
Jul 10, 2024
Merged
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
57032f2
wip
jafermarq Apr 22, 2024
b885f22
w/ previous
jafermarq Apr 22, 2024
33ada3e
default `create_run`
jafermarq Apr 22, 2024
54539d9
w/ previous
jafermarq Apr 22, 2024
ea946b7
Merge branch 'main' into in-memory-driver
jafermarq Apr 23, 2024
83ff757
Merge branch 'main' into in-memory-driver
jafermarq Apr 24, 2024
13b0e7b
init
jafermarq Apr 25, 2024
242883e
clientdriverproxytests good
jafermarq Apr 25, 2024
6138968
unfusing
jafermarq Apr 25, 2024
0f3ab98
Merge branch 'main' into in-memory-driver
jafermarq Apr 25, 2024
a8ac6ff
Merge branch 'fuse-grpc-driver-and-helper' into in-memory-driver
jafermarq Apr 25, 2024
49ea5df
flower-simulation uses `InMemoryDriver`
jafermarq Apr 25, 2024
8e808f8
Merge branch 'main' into fuse-grpc-driver-and-helper
jafermarq Apr 25, 2024
c2b2f1e
Merge branch 'fuse-grpc-driver-and-helper' into in-memory-driver
jafermarq Apr 25, 2024
46d0268
Merge branch 'main' into in-memory-driver
jafermarq Apr 29, 2024
91443ce
revert and merge w/ main
jafermarq Apr 29, 2024
91a7b31
use InMemoryDriver
jafermarq Apr 29, 2024
e63fd1c
Merge branch 'main' into in-memory-driver
jafermarq Apr 29, 2024
6455809
Merge branch 'main' into in-memory-driver
jafermarq Apr 30, 2024
b2dcc86
updated /w fab
jafermarq Apr 30, 2024
966b55a
wip
jafermarq Apr 30, 2024
2302dc3
completed unittest
jafermarq Apr 30, 2024
944d8f6
Merge branch 'main' into in-memory-driver
jafermarq Apr 30, 2024
a150844
Merge branch 'in-memory-driver' into use-inmemory-driver-in-simulations
jafermarq May 1, 2024
68e77c6
Merge branch 'main' into in-memory-driver
jafermarq May 1, 2024
7e211c6
Merge branch 'main' into in-memory-driver
panh99 May 1, 2024
ece1774
ensure tasks are deleted after pull
jafermarq May 1, 2024
7eb41c5
w/ previous
jafermarq May 1, 2024
9e6fc95
fix
jafermarq May 1, 2024
da9ed0b
Merge branch 'in-memory-driver' into use-inmemory-driver-in-simulations
jafermarq May 1, 2024
c49df89
Merge branch 'main' into in-memory-driver
jafermarq May 6, 2024
e94874e
Merge branch 'in-memory-driver' into use-inmemory-driver-in-simulations
jafermarq May 6, 2024
f8963d3
state-agnostic driver definition
jafermarq May 6, 2024
f4b244c
minor tweak
jafermarq May 6, 2024
5768b08
Merge branch 'main' into in-memory-driver
jafermarq May 6, 2024
96b4c6a
Merge branch 'in-memory-driver' into use-inmemory-driver-in-simulations
jafermarq May 6, 2024
b3e4407
Merge branch 'main' into in-memory-driver
jafermarq May 8, 2024
f1a59c7
Merge branch 'in-memory-driver' into use-inmemory-driver-in-simulations
jafermarq May 8, 2024
76eab14
Merge branch 'main' into in-memory-driver
jafermarq May 8, 2024
9b1599d
Merge branch 'in-memory-driver' into use-inmemory-driver-in-simulations
jafermarq May 8, 2024
a2c2239
Merge branch 'main' into in-memory-driver
jafermarq May 16, 2024
202e2f4
Merge branch 'main' into in-memory-driver
jafermarq May 18, 2024
69349df
Merge branch 'in-memory-driver' into use-inmemory-driver-in-simulations
jafermarq May 18, 2024
b08ba2c
Merge branch 'main' into use-inmemory-driver-in-simulations
danieljanes May 19, 2024
12be6fe
Merge branch 'main' into use-inmemory-driver-in-simulations
jafermarq May 19, 2024
5f803e6
init
jafermarq May 19, 2024
1427958
Merge branch 'main' into simplify-sim-double-queue
jafermarq Jun 10, 2024
5548ce2
Merge branch 'main' into simplify-sim-double-queue
jafermarq Jun 11, 2024
b0a47bd
Merge branch 'main' into simplify-sim-double-queue
jafermarq Jun 14, 2024
aafb61b
Merge branch 'main' into simplify-sim-double-queue
jafermarq Jul 2, 2024
39f7ba1
fix
jafermarq Jul 2, 2024
555ef18
Merge branch 'main' into simplify-sim-double-queue
jafermarq Jul 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 43 additions & 13 deletions src/py/flwr/server/superlink/fleet/vce/vce_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from flwr.common.message import Error
from flwr.common.object_ref import load_app
from flwr.common.serde import message_from_taskins, message_to_taskres
from flwr.proto.task_pb2 import TaskIns # pylint: disable=E0611
from flwr.proto.task_pb2 import TaskIns, TaskRes # pylint: disable=E0611
from flwr.server.superlink.state import StateFactory

from .backend import Backend, error_messages_backends, supported_backends
Expand All @@ -54,17 +54,16 @@ def _register_nodes(
# pylint: disable=too-many-arguments,too-many-locals
async def worker(
app_fn: Callable[[], ClientApp],
queue: "asyncio.Queue[TaskIns]",
taskins_queue: "asyncio.Queue[TaskIns]",
taskres_queue: "asyncio.Queue[TaskRes]",
node_states: Dict[int, NodeState],
state_factory: StateFactory,
backend: Backend,
) -> None:
"""Get TaskIns from queue and pass it to an actor in the pool to execute it."""
state = state_factory.state()
while True:
out_mssg = None
try:
task_ins: TaskIns = await queue.get()
task_ins: TaskIns = await taskins_queue.get()
node_id = task_ins.task.consumer.node_id

# Register and retrieve runstate
Expand Down Expand Up @@ -111,7 +110,7 @@ async def worker(
task_res = message_to_taskres(out_mssg)
# Store TaskRes in state
task_res.task.pushed_at = time.time()
state.store_task_res(task_res)
await taskres_queue.put(task_res)


async def add_taskins_to_queue(
Expand Down Expand Up @@ -162,6 +161,21 @@ async def add_taskins_to_queue(
log(DEBUG, "Async producer: Stopped pulling from StateFactory.")


async def put_taskres_into_state(
queue: "asyncio.Queue[TaskRes]",
state_factory: StateFactory,
f_stop: asyncio.Event,
) -> None:
"""Remove TaskRes from queue and add into State."""
state = state_factory.state()
while not f_stop.is_set():
if queue.qsize():
task_res = await queue.get()
state.store_task_res(task_res)
else:
await asyncio.sleep(0.1)


async def run(
app_fn: Callable[[], ClientApp],
backend_fn: Callable[[], Backend],
Expand All @@ -171,7 +185,8 @@ async def run(
f_stop: asyncio.Event,
) -> None:
"""Run the VCE async."""
queue: "asyncio.Queue[TaskIns]" = asyncio.Queue(128)
taskins_queue: "asyncio.Queue[TaskIns]" = asyncio.Queue(128)
taskres_queue: "asyncio.Queue[TaskRes]" = asyncio.Queue(128)

try:

Expand All @@ -184,22 +199,37 @@ async def run(
# Add workers (they submit Messages to Backend)
worker_tasks = [
asyncio.create_task(
worker(app_fn, queue, node_states, state_factory, backend)
worker(
app_fn,
taskins_queue,
taskres_queue,
node_states,
backend,
)
)
for _ in range(backend.num_workers)
]
# Create producer (adds TaskIns into Queue)
producer = asyncio.create_task(
taskins_producer = asyncio.create_task(
add_taskins_to_queue(
queue, state_factory, nodes_mapping, backend, worker_tasks, f_stop
taskins_queue,
state_factory,
nodes_mapping,
backend,
worker_tasks,
f_stop,
)
)

# Wait for producer to finish
# The producer runs forever until f_stop is set or until
taskres_consumer = asyncio.create_task(
put_taskres_into_state(taskres_queue, state_factory, f_stop)
)

# Wait for asyncio taks pulling/pushing TaskIns/TaskRes.
# These run forever until f_stop is set or until
# all worker (consumer) coroutines are completed. Workers
# also run forever and only end if an exception is raised.
await asyncio.gather(producer)
await asyncio.gather(*(taskins_producer, taskres_consumer))

except Exception as ex:

Expand Down