Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(framework) Support running app directories directly via flower-simulation #3810

Merged
merged 24 commits into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
679a783
init
jafermarq Jul 15, 2024
29ae904
Merge branch 'main' into support-fab-in-flower-simulation
jafermarq Jul 15, 2024
6b7e88d
fix args checks
jafermarq Jul 15, 2024
4d4b049
basic functionality; no override; no from .fab
jafermarq Jul 15, 2024
6d99d90
feat(framework:skip) Add config function for fusing dicts
charlesbvll Jul 16, 2024
8430e28
simplify
jafermarq Jul 16, 2024
97c0614
Merge branch 'add-config-function-overrides' into support-fab-in-flow…
jafermarq Jul 16, 2024
11086ba
using fused config -- but not in `ClientApp` yet
jafermarq Jul 16, 2024
e709b08
Merge branch 'main' into support-fab-in-flower-simulation
jafermarq Jul 16, 2024
46244de
updates
jafermarq Jul 16, 2024
da0d8f1
init
jafermarq Jul 16, 2024
e2d2347
fix
jafermarq Jul 16, 2024
122cb31
Apply suggestions from code review
jafermarq Jul 16, 2024
307c29e
updates
jafermarq Jul 16, 2024
952572c
Merge branch 'main' into register-context-using-fab-dir
jafermarq Jul 16, 2024
3ec626d
updates
jafermarq Jul 16, 2024
f65dcbf
Merge branch 'register-context-using-fab-dir' into support-fab-in-flo…
jafermarq Jul 16, 2024
909d410
clientapp gets runconfig from `pyproject.toml`
jafermarq Jul 16, 2024
07e4d3e
Merge branch 'main' into support-fab-in-flower-simulation
danieljanes Jul 16, 2024
7d1c429
Merge branch 'main' into support-fab-in-flower-simulation
danieljanes Jul 16, 2024
f4a10ed
updates from review
jafermarq Jul 16, 2024
28791b1
Apply suggestions from code review
danieljanes Jul 16, 2024
8f89927
Apply suggestions from code review
jafermarq Jul 16, 2024
a7a68f6
update
jafermarq Jul 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions src/py/flwr/server/superlink/fleet/vce/vce_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ def _register_nodes(


def _register_node_states(
nodes_mapping: NodeToPartitionMapping, run: Run
nodes_mapping: NodeToPartitionMapping,
run: Run,
app_dir: Optional[str] = None,
) -> Dict[int, NodeState]:
"""Create NodeState objects and pre-register the context for the run."""
node_states: Dict[int, NodeState] = {}
Expand All @@ -76,7 +78,9 @@ def _register_node_states(
)

# Pre-register Context objects
node_states[node_id].register_context(run_id=run.run_id, run=run)
node_states[node_id].register_context(
run_id=run.run_id, run=run, app_dir=app_dir
)

return node_states

Expand Down Expand Up @@ -256,6 +260,7 @@ def start_vce(
backend_name: str,
backend_config_json_stream: str,
app_dir: str,
is_app: bool,
f_stop: threading.Event,
run: Run,
flwr_dir: Optional[str] = None,
Expand Down Expand Up @@ -309,7 +314,9 @@ def start_vce(
)

# Construct mapping of NodeStates
node_states = _register_node_states(nodes_mapping=nodes_mapping, run=run)
node_states = _register_node_states(
nodes_mapping=nodes_mapping, run=run, app_dir=app_dir if is_app else None
)

# Load backend config
log(DEBUG, "Supported backends: %s", list(supported_backends.keys()))
Expand Down
1 change: 1 addition & 0 deletions src/py/flwr/server/superlink/fleet/vce/vce_api_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ def start_and_shutdown(
backend_config_json_stream=backend_config,
state_factory=state_factory,
app_dir=app_dir,
is_app=False,
f_stop=f_stop,
run=run,
existing_nodes_mapping=nodes_mapping,
Expand Down
163 changes: 149 additions & 14 deletions src/py/flwr/simulation/run_simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,19 @@
import asyncio
import json
import logging
import sys
import threading
import traceback
from argparse import Namespace
from logging import DEBUG, ERROR, INFO, WARNING
from pathlib import Path
from time import sleep
from typing import Dict, Optional
from typing import Dict, List, Optional

from flwr.cli.config_utils import load_and_validate
from flwr.client import ClientApp
from flwr.common import EventType, event, log
from flwr.common.config import get_fused_config_from_dir, parse_config_args
from flwr.common.constant import RUN_ID_NUM_BYTES
from flwr.common.logger import set_logger_propagation, update_console_handler
from flwr.common.typing import Run
Expand All @@ -41,28 +46,130 @@
)


def _check_args_do_not_interfere(args: Namespace) -> bool:
    """Ensure decoupling of flags for different ways to start the simulation.

    Two mutually exclusive modes are accepted:

    1. App mode: `--app` is set (optionally together with `--run-config`).
    2. Standalone mode: both `--client-app` and `--server-app` are set
       (optionally together with `--app-dir`).

    Parameters
    ----------
    args : Namespace
        Parsed command-line arguments. An attribute that is missing from the
        namespace is treated the same as an unset flag.

    Returns
    -------
    bool
        `True` when the combination of flags is valid. `False` otherwise, in
        which case an ERROR-level message describing the problem is logged.
    """
    mode_one_args = ["app", "run_config"]
    mode_two_args = ["client_app", "server_app"]

    def _is_set(key: str) -> bool:
        # A missing attribute counts as "not passed" instead of raising
        return bool(getattr(args, key, None))

    def _resolve_message(conflict_keys: List[str]) -> str:
        # Render attribute names back into their CLI spelling, e.g. `--run-config`
        return ",".join([f"`--{key}`".replace("_", "-") for key in conflict_keys])

    # `--app` and `--app-dir` are mutually exclusive
    if _is_set("app") and _is_set("app_dir"):
        log(ERROR, "Either `--app` or `--app-dir` can be set, but not both.")
        return False

    if any(_is_set(key) for key in mode_one_args):
        # Flags from the two modes must not be mixed
        if any(_is_set(key) for key in mode_two_args):
            log(
                ERROR,
                "Passing any of {%s} alongside with any of {%s} is not supported.",
                _resolve_message(mode_one_args),
                _resolve_message(mode_two_args),
            )
            return False

        # `--run-config` on its own is not enough: `--app` is mandatory here
        if not _is_set("app"):
            log(ERROR, "You need to pass --app")
            return False

        return True

    # Ensure all args are set (required for the non-FAB mode of execution)
    if not all(_is_set(key) for key in mode_two_args):
        log(
            ERROR,
            "Passing all of %s keys are required.",
            _resolve_message(mode_two_args),
        )
        return False

    return True


# Entry point from CLI
# pylint: disable=too-many-locals
def run_simulation_from_cli() -> None:
    """Run Simulation Engine from the CLI."""
    args = _parse_args_run_simulation().parse_args()

    # Two modes are supported by the CLI entrypoint:
    # 1) Running a FAB or FAB-like dir containing a pyproject.toml
    # 2) Running any ClientApp and ServerApp w/o pyproject.toml being present
    # For 2) some CLI args are compulsory but these aren't for 1)
    # Validate the flag combination before doing anything else
    if not _check_args_do_not_interfere(args):
        sys.exit("Simulation Engine cannot start.")

    # Use the run id given on the command line, otherwise draw a random one
    if args.run_id is None:
        run_id = generate_rand_int_from_bytes(RUN_ID_NUM_BYTES)
    else:
        run_id = args.run_id

    if args.app:
        # Mode 1: components and run config come from the app's pyproject.toml
        app_path = Path(args.app)
        if not app_path.is_dir():
            log(ERROR, "--app is not a directory")
            sys.exit("Simulation Engine cannot start.")

        # Load pyproject.toml
        config, errors, warnings = load_and_validate(app_path / "pyproject.toml")
        if errors:
            raise ValueError(errors)

        if warnings:
            log(WARNING, warnings)

        if config is None:
            raise ValueError(
                "Config extracted from FAB's pyproject.toml is not valid"
            )

        # Get ClientApp and ServerApp components
        components = config["tool"]["flwr"]["app"]["components"]
        client_app_attr = components["clientapp"]
        server_app_attr = components["serverapp"]

        # Overrides passed via `--run-config` are fused with the app's config
        override_config = parse_config_args(args.run_config)
        fused_config = get_fused_config_from_dir(app_path, override_config)
        app_dir = args.app
        is_app = True
    else:
        # Mode 2: components are taken directly from the CLI arguments
        client_app_attr = args.client_app
        server_app_attr = args.server_app
        override_config = {}
        fused_config = None
        app_dir = args.app_dir
        is_app = False

    # Create run
    run = Run(
        run_id=run_id,
        fab_id="",
        fab_version="",
        override_config=override_config,
    )

    # Load JSON config
    backend_config_dict = json.loads(args.backend_config)

    _run_simulation(
        server_app_attr=server_app_attr,
        client_app_attr=client_app_attr,
        num_supernodes=args.num_supernodes,
        backend_name=args.backend,
        backend_config=backend_config_dict,
        app_dir=app_dir,
        run=run,
        enable_tf_gpu_growth=args.enable_tf_gpu_growth,
        verbose_logging=args.verbose,
        server_app_run_config=fused_config,
        is_app=is_app,
    )


Expand Down Expand Up @@ -205,13 +312,15 @@ def _main_loop(
backend_name: str,
backend_config_stream: str,
app_dir: str,
is_app: bool,
enable_tf_gpu_growth: bool,
run: Run,
flwr_dir: Optional[str] = None,
client_app: Optional[ClientApp] = None,
client_app_attr: Optional[str] = None,
server_app: Optional[ServerApp] = None,
server_app_attr: Optional[str] = None,
server_app_run_config: Optional[Dict[str, str]] = None,
) -> None:
"""Launch SuperLink with Simulation Engine, then ServerApp on a separate thread."""
# Initialize StateFactory
Expand All @@ -225,7 +334,9 @@ def _main_loop(
# Register run
log(DEBUG, "Pre-registering run with id %s", run.run_id)
state_factory.state().run_ids[run.run_id] = run # type: ignore
server_app_run_config: Dict[str, str] = {}

if server_app_run_config is None:
server_app_run_config = {}

# Initialize Driver
driver = InMemoryDriver(run_id=run.run_id, state_factory=state_factory)
Expand All @@ -251,6 +362,7 @@ def _main_loop(
backend_name=backend_name,
backend_config_json_stream=backend_config_stream,
app_dir=app_dir,
is_app=is_app,
state_factory=state_factory,
f_stop=f_stop,
run=run,
Expand Down Expand Up @@ -284,11 +396,13 @@ def _run_simulation(
backend_config: Optional[BackendConfig] = None,
client_app_attr: Optional[str] = None,
server_app_attr: Optional[str] = None,
server_app_run_config: Optional[Dict[str, str]] = None,
app_dir: str = "",
flwr_dir: Optional[str] = None,
run: Optional[Run] = None,
enable_tf_gpu_growth: bool = False,
verbose_logging: bool = False,
is_app: bool = False,
) -> None:
r"""Launch the Simulation Engine.

Expand Down Expand Up @@ -317,14 +431,18 @@ def _run_simulation(
parameters. Values supported in <value> are those included by
`flwr.common.typing.ConfigsRecordValues`.

client_app_attr : str
client_app_attr : Optional[str]
A path to a `ClientApp` module to be loaded: For example: `client:app` or
`project.package.module:wrapper.app`.

server_app_attr : str
server_app_attr : Optional[str]
A path to a `ServerApp` module to be loaded: For example: `server:app` or
`project.package.module:wrapper.app`.

server_app_run_config : Optional[Dict[str, str]]
Config dictionary that parameterizes the run config. It will be made accessible
to the ServerApp.

app_dir : str
Add specified directory to the PYTHONPATH and load `ClientApp` from there.
(Default: current working directory.)
Expand All @@ -346,6 +464,11 @@ def _run_simulation(
verbose_logging : bool (default: False)
When disabled, only INFO, WARNING and ERROR log messages will be shown. If
enabled, DEBUG-level logs will be displayed.

is_app : bool (default: False)
A flag that indicates whether the simulation is running an app or not. This is
needed in order to attempt loading an app's pyproject.toml when nodes register
a context object.
"""
if backend_config is None:
backend_config = {}
Expand Down Expand Up @@ -381,13 +504,15 @@ def _run_simulation(
backend_name,
backend_config_stream,
app_dir,
is_app,
enable_tf_gpu_growth,
run,
flwr_dir,
client_app,
client_app_attr,
server_app,
server_app_attr,
server_app_run_config,
)
# Detect if there is an Asyncio event loop already running.
# If yes, disable logger propagation. In environments
Expand Down Expand Up @@ -419,12 +544,10 @@ def _parse_args_run_simulation() -> argparse.ArgumentParser:
)
parser.add_argument(
"--server-app",
required=True,
help="For example: `server:app` or `project.package.module:wrapper.app`",
)
parser.add_argument(
"--client-app",
required=True,
help="For example: `client:app` or `project.package.module:wrapper.app`",
)
parser.add_argument(
Expand All @@ -433,6 +556,18 @@ def _parse_args_run_simulation() -> argparse.ArgumentParser:
required=True,
jafermarq marked this conversation as resolved.
Show resolved Hide resolved
help="Number of simulated SuperNodes.",
)
parser.add_argument(
"--app",
danieljanes marked this conversation as resolved.
Show resolved Hide resolved
type=str,
default=None,
help="Path to a directory containing a FAB-like structure with a "
"pyproject.toml.",
)
parser.add_argument(
"--run-config",
default=None,
help="Override configuration key-value pairs.",
)
parser.add_argument(
"--backend",
default="ray",
Expand Down