diff --git a/src/py/flwr/superexec/exec_grpc.py b/src/py/flwr/superexec/exec_grpc.py new file mode 100644 index 00000000000..127d5615dd8 --- /dev/null +++ b/src/py/flwr/superexec/exec_grpc.py @@ -0,0 +1,51 @@ +# Copyright 2024 Flower Labs GmbH. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""SuperExec gRPC API.""" + +from logging import INFO +from typing import Optional, Tuple + +import grpc + +from flwr.common import GRPC_MAX_MESSAGE_LENGTH +from flwr.common.logger import log +from flwr.proto.exec_pb2_grpc import add_ExecServicer_to_server +from flwr.server.superlink.fleet.grpc_bidi.grpc_server import generic_create_grpc_server + +from .exec_servicer import ExecServicer +from .executor import Executor + + +def run_superexec_api_grpc( + address: str, + executor: Executor, + certificates: Optional[Tuple[bytes, bytes, bytes]], +) -> grpc.Server: + """Run SuperExec API (gRPC, request-response).""" + exec_servicer: grpc.Server = ExecServicer( + executor=executor, + ) + superexec_add_servicer_to_server_fn = add_ExecServicer_to_server + superexec_grpc_server = generic_create_grpc_server( + servicer_and_add_fn=(exec_servicer, superexec_add_servicer_to_server_fn), + server_address=address, + max_message_length=GRPC_MAX_MESSAGE_LENGTH, + certificates=certificates, + ) + + log(INFO, "Flower ECE: Starting SuperExec API (gRPC-rere) on %s", address) + superexec_grpc_server.start() + + return superexec_grpc_server diff --git a/src/py/flwr/superexec/exec_servicer.py b/src/py/flwr/superexec/exec_servicer.py new file mode 100644 index 00000000000..aa8172c1870 --- /dev/null +++ b/src/py/flwr/superexec/exec_servicer.py @@ -0,0 +1,54 @@ +# Copyright 2024 Flower Labs GmbH. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""SuperExec API servicer.""" + + +from logging import ERROR, INFO +from typing import Dict + +import grpc + +from flwr.common.logger import log +from flwr.proto import exec_pb2_grpc # pylint: disable=E0611 +from flwr.proto.exec_pb2 import ( # pylint: disable=E0611 + StartRunRequest, + StartRunResponse, +) + +from .executor import Executor, RunTracker + + +class ExecServicer(exec_pb2_grpc.ExecServicer): + """SuperExec API servicer.""" + + def __init__(self, executor: Executor) -> None: + self.executor = executor + self.runs: Dict[int, RunTracker] = {} + + def StartRun( + self, request: StartRunRequest, context: grpc.ServicerContext + ) -> StartRunResponse: + """Create run ID.""" + log(INFO, "ExecServicer.StartRun") + + run = self.executor.start_run(request.fab_file) + + if run is None: + log(ERROR, "Executor failed to start run") + return StartRunResponse() + + self.runs[run.run_id] = run + + return StartRunResponse(run_id=run.run_id) diff --git a/src/py/flwr/superexec/exec_servicer_test.py b/src/py/flwr/superexec/exec_servicer_test.py new file mode 100644 index 00000000000..41f67b74c48 --- /dev/null +++ b/src/py/flwr/superexec/exec_servicer_test.py @@ -0,0 +1,52 @@ +# Copyright 2024 Flower Labs GmbH. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""Test the SuperExec API servicer.""" + + +import subprocess +from unittest.mock import MagicMock + +from flwr.proto.exec_pb2 import StartRunRequest # pylint: disable=E0611 + +from .exec_servicer import ExecServicer + + +def test_start_run() -> None: + """Test StartRun method of ExecServicer.""" + run_res = MagicMock() + run_res.run_id = 10 + with subprocess.Popen( + ["echo", "success"], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) as proc: + run_res.proc = proc + + executor = MagicMock() + executor.start_run = lambda _: run_res + + context_mock = MagicMock() + + request = StartRunRequest() + request.fab_file = b"test" + + # Create a instance of FlowerServiceServicer + servicer = ExecServicer(executor=executor) + + # Execute + response = servicer.StartRun(request, context_mock) + + assert response.run_id == 10 diff --git a/src/py/flwr/superexec/executor.py b/src/py/flwr/superexec/executor.py new file mode 100644 index 00000000000..f85ac4c157f --- /dev/null +++ b/src/py/flwr/superexec/executor.py @@ -0,0 +1,54 @@ +# Copyright 2024 Flower Labs GmbH. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""Execute and monitor a Flower run.""" + +from abc import ABC, abstractmethod +from dataclasses import dataclass +from subprocess import Popen +from typing import Optional + + +@dataclass +class RunTracker: + """Track a Flower run (composed of a run_id and the associated process).""" + + run_id: int + proc: Popen # type: ignore + + +class Executor(ABC): + """Execute and monitor a Flower run.""" + + @abstractmethod + def start_run( + self, + fab_file: bytes, + ) -> Optional[RunTracker]: + """Start a run using the given Flower FAB ID and version. + + This method creates a new run on the SuperLink, returns its run_id + and also starts the run execution. + + Parameters + ---------- + fab_file : bytes + The Flower App Bundle file bytes. + + Returns + ------- + run_id : Optional[RunTracker] + The run_id and the associated process of the run created by the SuperLink, + or `None` if it fails. + """