Skip to content

Commit

Permalink
feat(framework) Support running app directories directly via `flower-…
Browse files Browse the repository at this point in the history
…simulation` (#3810)

Co-authored-by: Charles Beauville <[email protected]>
Co-authored-by: Daniel J. Beutel <[email protected]>
  • Loading branch information
3 people authored Jul 16, 2024
1 parent 1f3fe0f commit 87de8ae
Show file tree
Hide file tree
Showing 3 changed files with 159 additions and 17 deletions.
13 changes: 10 additions & 3 deletions src/py/flwr/server/superlink/fleet/vce/vce_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ def _register_nodes(


def _register_node_states(
nodes_mapping: NodeToPartitionMapping, run: Run
nodes_mapping: NodeToPartitionMapping,
run: Run,
app_dir: Optional[str] = None,
) -> Dict[int, NodeState]:
"""Create NodeState objects and pre-register the context for the run."""
node_states: Dict[int, NodeState] = {}
Expand All @@ -76,7 +78,9 @@ def _register_node_states(
)

# Pre-register Context objects
node_states[node_id].register_context(run_id=run.run_id, run=run)
node_states[node_id].register_context(
run_id=run.run_id, run=run, app_dir=app_dir
)

return node_states

Expand Down Expand Up @@ -256,6 +260,7 @@ def start_vce(
backend_name: str,
backend_config_json_stream: str,
app_dir: str,
is_app: bool,
f_stop: threading.Event,
run: Run,
flwr_dir: Optional[str] = None,
Expand Down Expand Up @@ -309,7 +314,9 @@ def start_vce(
)

# Construct mapping of NodeStates
node_states = _register_node_states(nodes_mapping=nodes_mapping, run=run)
node_states = _register_node_states(
nodes_mapping=nodes_mapping, run=run, app_dir=app_dir if is_app else None
)

# Load backend config
log(DEBUG, "Supported backends: %s", list(supported_backends.keys()))
Expand Down
1 change: 1 addition & 0 deletions src/py/flwr/server/superlink/fleet/vce/vce_api_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ def start_and_shutdown(
backend_config_json_stream=backend_config,
state_factory=state_factory,
app_dir=app_dir,
is_app=False,
f_stop=f_stop,
run=run,
existing_nodes_mapping=nodes_mapping,
Expand Down
162 changes: 148 additions & 14 deletions src/py/flwr/simulation/run_simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,19 @@
import asyncio
import json
import logging
import sys
import threading
import traceback
from argparse import Namespace
from logging import DEBUG, ERROR, INFO, WARNING
from pathlib import Path
from time import sleep
from typing import Dict, Optional
from typing import Dict, List, Optional

from flwr.cli.config_utils import load_and_validate
from flwr.client import ClientApp
from flwr.common import EventType, event, log
from flwr.common.config import get_fused_config_from_dir, parse_config_args
from flwr.common.constant import RUN_ID_NUM_BYTES
from flwr.common.logger import set_logger_propagation, update_console_handler
from flwr.common.typing import Run
Expand All @@ -41,28 +46,129 @@
)


def _check_args_do_not_interfere(args: Namespace) -> bool:
"""Ensure decoupling of flags for different ways to start the simulation."""
mode_one_args = ["app", "run_config"]
mode_two_args = ["client_app", "server_app"]

def _resolve_message(conflict_keys: List[str]) -> str:
return ",".join([f"`--{key}`".replace("_", "-") for key in conflict_keys])

# When passing `--app`, `--app-dir` is ignored
if args.app and args.app_dir:
log(ERROR, "Either `--app` or `--app-dir` can be set, but not both.")
return False

if any(getattr(args, key) for key in mode_one_args):
if any(getattr(args, key) for key in mode_two_args):
log(
ERROR,
"Passing any of {%s} alongside with any of {%s}",
_resolve_message(mode_one_args),
_resolve_message(mode_two_args),
)
return False

if not args.app:
log(ERROR, "You need to pass --app")
return False

return True

# Ensure all args are set (required for the non-FAB mode of execution)
if not all(getattr(args, key) for key in mode_two_args):
log(
ERROR,
"Passing all of %s keys are required.",
_resolve_message(mode_two_args),
)
return False

return True


# Entry point from CLI
# pylint: disable=too-many-locals
def run_simulation_from_cli() -> None:
    """Run Simulation Engine from the CLI.

    Two launch modes are supported:

    1. `--app`: run an app directory containing a `pyproject.toml`. The
       ClientApp and ServerApp references are read from the
       `tool.flwr.app.components` table of that file, and the parsed
       `--run-config` overrides are passed to `get_fused_config_from_dir`
       together with the app path.
    2. `--client-app` and `--server-app`: run the referenced apps without a
       `pyproject.toml` being present. `--app-dir` is forwarded unchanged.

    The process exits via `sys.exit` when the combination of flags is rejected
    or when `--app` does not point to a directory.

    Raises
    ------
    ValueError
        If validating the app's `pyproject.toml` reports errors, or if no
        config could be extracted from it.
    """
    args = _parse_args_run_simulation().parse_args()

    # We are supporting two modes for the CLI entrypoint:
    # 1) Running an app dir containing a `pyproject.toml`
    # 2) Running any ClientApp and ServerApp w/o pyproject.toml being present
    # For 2), some CLI args are compulsory, but they are not required for 1)
    # We first do these checks
    args_check_pass = _check_args_do_not_interfere(args)
    if not args_check_pass:
        sys.exit("Simulation Engine cannot start.")

    # Use the run id given on the command line, otherwise generate a random one
    run_id = (
        generate_rand_int_from_bytes(RUN_ID_NUM_BYTES)
        if args.run_id is None
        else args.run_id
    )
    if args.app:
        # Mode 1
        app_path = Path(args.app)
        if not app_path.is_dir():
            log(ERROR, "--app is not a directory")
            sys.exit("Simulation Engine cannot start.")

        # Load pyproject.toml
        config, errors, warnings = load_and_validate(
            app_path / "pyproject.toml", check_module=False
        )
        if errors:
            raise ValueError(errors)

        if warnings:
            log(WARNING, warnings)

        if config is None:
            raise ValueError("Config extracted from FAB's pyproject.toml is not valid")

        # Get ClientApp and ServerApp components
        app_components = config["tool"]["flwr"]["app"]["components"]
        client_app_attr = app_components["clientapp"]
        server_app_attr = app_components["serverapp"]

        override_config = parse_config_args(args.run_config)
        fused_config = get_fused_config_from_dir(app_path, override_config)
        app_dir = args.app
        is_app = True

    else:
        # Mode 2
        client_app_attr = args.client_app
        server_app_attr = args.server_app
        override_config = {}
        fused_config = None
        app_dir = args.app_dir
        is_app = False

    # Create run
    run = Run(
        run_id=run_id,
        fab_id="",
        fab_version="",
        override_config=override_config,
    )

    # Load JSON config
    backend_config_dict = json.loads(args.backend_config)

    # Each keyword is passed exactly once, using the values resolved above for
    # the selected mode (a repeated keyword argument is a SyntaxError).
    _run_simulation(
        server_app_attr=server_app_attr,
        client_app_attr=client_app_attr,
        num_supernodes=args.num_supernodes,
        backend_name=args.backend,
        backend_config=backend_config_dict,
        app_dir=app_dir,
        run=run,
        enable_tf_gpu_growth=args.enable_tf_gpu_growth,
        verbose_logging=args.verbose,
        server_app_run_config=fused_config,
        is_app=is_app,
    )


Expand Down Expand Up @@ -205,13 +311,15 @@ def _main_loop(
backend_name: str,
backend_config_stream: str,
app_dir: str,
is_app: bool,
enable_tf_gpu_growth: bool,
run: Run,
flwr_dir: Optional[str] = None,
client_app: Optional[ClientApp] = None,
client_app_attr: Optional[str] = None,
server_app: Optional[ServerApp] = None,
server_app_attr: Optional[str] = None,
server_app_run_config: Optional[Dict[str, str]] = None,
) -> None:
"""Launch SuperLink with Simulation Engine, then ServerApp on a separate thread."""
# Initialize StateFactory
Expand All @@ -225,7 +333,9 @@ def _main_loop(
# Register run
log(DEBUG, "Pre-registering run with id %s", run.run_id)
state_factory.state().run_ids[run.run_id] = run # type: ignore
server_app_run_config: Dict[str, str] = {}

if server_app_run_config is None:
server_app_run_config = {}

# Initialize Driver
driver = InMemoryDriver(run_id=run.run_id, state_factory=state_factory)
Expand All @@ -251,6 +361,7 @@ def _main_loop(
backend_name=backend_name,
backend_config_json_stream=backend_config_stream,
app_dir=app_dir,
is_app=is_app,
state_factory=state_factory,
f_stop=f_stop,
run=run,
Expand Down Expand Up @@ -284,11 +395,13 @@ def _run_simulation(
backend_config: Optional[BackendConfig] = None,
client_app_attr: Optional[str] = None,
server_app_attr: Optional[str] = None,
server_app_run_config: Optional[Dict[str, str]] = None,
app_dir: str = "",
flwr_dir: Optional[str] = None,
run: Optional[Run] = None,
enable_tf_gpu_growth: bool = False,
verbose_logging: bool = False,
is_app: bool = False,
) -> None:
r"""Launch the Simulation Engine.
Expand Down Expand Up @@ -317,14 +430,18 @@ def _run_simulation(
parameters. Values supported in <value> are those included by
`flwr.common.typing.ConfigsRecordValues`.
client_app_attr : str
client_app_attr : Optional[str]
A path to a `ClientApp` module to be loaded: For example: `client:app` or
`project.package.module:wrapper.app`."
server_app_attr : str
server_app_attr : Optional[str]
A path to a `ServerApp` module to be loaded: For example: `server:app` or
`project.package.module:wrapper.app`."
server_app_run_config : Optional[Dict[str, str]]
Config dictionary that parameterizes the run config. It will be made accessible
to the ServerApp.
app_dir : str
Add specified directory to the PYTHONPATH and load `ClientApp` from there.
(Default: current working directory.)
Expand All @@ -346,6 +463,11 @@ def _run_simulation(
verbose_logging : bool (default: False)
When disabled, only INFO, WARNING and ERROR log messages will be shown. If
enabled, DEBUG-level logs will be displayed.
is_app : bool (default: False)
A flag that indicates whether the simulation is running an app or not. This is
needed in order to attempt loading an app's pyproject.toml when nodes register
a context object.
"""
if backend_config is None:
backend_config = {}
Expand Down Expand Up @@ -381,13 +503,15 @@ def _run_simulation(
backend_name,
backend_config_stream,
app_dir,
is_app,
enable_tf_gpu_growth,
run,
flwr_dir,
client_app,
client_app_attr,
server_app,
server_app_attr,
server_app_run_config,
)
# Detect if there is an Asyncio event loop already running.
# If yes, disable logger propagation. In environments
Expand Down Expand Up @@ -419,12 +543,10 @@ def _parse_args_run_simulation() -> argparse.ArgumentParser:
)
parser.add_argument(
"--server-app",
required=True,
help="For example: `server:app` or `project.package.module:wrapper.app`",
)
parser.add_argument(
"--client-app",
required=True,
help="For example: `client:app` or `project.package.module:wrapper.app`",
)
parser.add_argument(
Expand All @@ -433,6 +555,18 @@ def _parse_args_run_simulation() -> argparse.ArgumentParser:
required=True,
help="Number of simulated SuperNodes.",
)
parser.add_argument(
"--app",
type=str,
default=None,
help="Path to a directory containing a FAB-like structure with a "
"pyproject.toml.",
)
parser.add_argument(
"--run-config",
default=None,
help="Override configuration key-value pairs.",
)
parser.add_argument(
"--backend",
default="ray",
Expand Down

0 comments on commit 87de8ae

Please sign in to comment.