From c49302713b075797c788c3ca5c9d5dee3b682c0f Mon Sep 17 00:00:00 2001 From: Olivier Le Thanh Duong Date: Mon, 15 Apr 2024 13:45:02 +0200 Subject: [PATCH 1/4] Problem: could not start Instances from command line Problem happened when launching with --run-fake-instance Solution: Adapt to new VMPool API that take a loop Also fix benchmarks function --- src/aleph/vm/orchestrator/cli.py | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/src/aleph/vm/orchestrator/cli.py b/src/aleph/vm/orchestrator/cli.py index 22bd44147..3b8f9b580 100644 --- a/src/aleph/vm/orchestrator/cli.py +++ b/src/aleph/vm/orchestrator/cli.py @@ -20,6 +20,7 @@ from aleph.vm.pool import VmPool from aleph.vm.version import get_version_from_apt, get_version_from_git +from ..models import VmExecution from . import metrics, supervisor from .pubsub import PubSub from .run import run_code_on_event, run_code_on_request, start_persistent_vm @@ -187,7 +188,8 @@ async def fake_read() -> bytes: bench: list[float] = [] - pool = VmPool() + loop = asyncio.new_event_loop() + pool = VmPool(loop) pool.setup() # Does not make sense in benchmarks @@ -236,25 +238,24 @@ async def fake_read() -> bytes: print("Event result", result) -async def start_instance(item_hash: ItemHash) -> None: +async def start_instance(pool, pubsub: Optional[PubSub], item_hash: ItemHash) -> VmExecution: """Run an instance from an InstanceMessage.""" - pool = VmPool() + return await start_persistent_vm(item_hash, pubsub, pool) + +async def run_instances(instances: list[ItemHash]) -> None: + """Run instances from a list of message identifiers.""" + logger.info(f"Instances to run: {instances}") + loop = asyncio.new_event_loop() + pool = VmPool(loop) # The main program uses a singleton pubsub instance in order to watch for updates. # We create another instance here since that singleton is not initialized yet. # Watching for updates on this instance will therefore not work. pubsub: Optional[PubSub] = None - await start_persistent_vm(item_hash, pubsub, pool) - - -async def run_instances(instances: list[ItemHash]) -> None: - """Run instances from a list of message identifiers.""" - logger.info(f"Instances to run: {instances}") + await asyncio.gather(*[start_instance(pool, pubsub, item_hash=instance_id) for instance_id in instances]) - await asyncio.gather(*[start_instance(item_hash=instance_id) for instance_id in instances]) await asyncio.Event().wait() # wait forever - # TODO : should we really wait forever? @contextlib.contextmanager From ab1e4fc77c86262e80a175c1feddf429acfd3caf Mon Sep 17 00:00:00 2001 From: Olivier Le Thanh Duong Date: Mon, 15 Apr 2024 15:00:38 +0200 Subject: [PATCH 2/4] reuse event loop --- src/aleph/vm/orchestrator/cli.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/aleph/vm/orchestrator/cli.py b/src/aleph/vm/orchestrator/cli.py index 3b8f9b580..9f6b76c9f 100644 --- a/src/aleph/vm/orchestrator/cli.py +++ b/src/aleph/vm/orchestrator/cli.py @@ -188,7 +188,7 @@ async def fake_read() -> bytes: bench: list[float] = [] - loop = asyncio.new_event_loop() + loop = asyncio.get_event_loop() pool = VmPool(loop) pool.setup() @@ -246,7 +246,7 @@ async def start_instance(pool, pubsub: Optional[PubSub], item_hash: ItemHash) -> async def run_instances(instances: list[ItemHash]) -> None: """Run instances from a list of message identifiers.""" logger.info(f"Instances to run: {instances}") - loop = asyncio.new_event_loop() + loop = asyncio.get_event_loop() pool = VmPool(loop) # The main program uses a singleton pubsub instance in order to watch for updates. # We create another instance here since that singleton is not initialized yet. From 57eb709807e97fd9843986c2f85b7ab7ddbd48b4 Mon Sep 17 00:00:00 2001 From: Olivier Le Thanh Duong Date: Mon, 15 Apr 2024 15:01:55 +0200 Subject: [PATCH 3/4] switch to absolute import --- src/aleph/vm/orchestrator/cli.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/aleph/vm/orchestrator/cli.py b/src/aleph/vm/orchestrator/cli.py index 9f6b76c9f..9566cf9c0 100644 --- a/src/aleph/vm/orchestrator/cli.py +++ b/src/aleph/vm/orchestrator/cli.py @@ -17,10 +17,10 @@ from sqlalchemy.ext.asyncio import create_async_engine from aleph.vm.conf import ALLOW_DEVELOPER_SSH_KEYS, make_db_url, settings +from aleph.vm.models import VmExecution from aleph.vm.pool import VmPool from aleph.vm.version import get_version_from_apt, get_version_from_git -from ..models import VmExecution from . import metrics, supervisor from .pubsub import PubSub from .run import run_code_on_event, run_code_on_request, start_persistent_vm From 4e46dcca39ccfe8cc906a2c46f748ade7c4c8c65 Mon Sep 17 00:00:00 2001 From: Olivier Le Thanh Duong Date: Wed, 17 Apr 2024 13:51:03 +0200 Subject: [PATCH 4/4] keep same argument order --- src/aleph/vm/orchestrator/cli.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/aleph/vm/orchestrator/cli.py b/src/aleph/vm/orchestrator/cli.py index 9566cf9c0..65b290ba2 100644 --- a/src/aleph/vm/orchestrator/cli.py +++ b/src/aleph/vm/orchestrator/cli.py @@ -238,7 +238,7 @@ async def fake_read() -> bytes: print("Event result", result) -async def start_instance(pool, pubsub: Optional[PubSub], item_hash: ItemHash) -> VmExecution: +async def start_instance(item_hash: ItemHash, pubsub: Optional[PubSub], pool) -> VmExecution: """Run an instance from an InstanceMessage.""" return await start_persistent_vm(item_hash, pubsub, pool) @@ -253,7 +253,7 @@ async def run_instances(instances: list[ItemHash]) -> None: # Watching for updates on this instance will therefore not work. pubsub: Optional[PubSub] = None - await asyncio.gather(*[start_instance(pool, pubsub, item_hash=instance_id) for instance_id in instances]) + await asyncio.gather(*[start_instance(instance_id, pubsub, pool) for instance_id in instances]) await asyncio.Event().wait() # wait forever