Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(framework) Add SuperExec servicer #3606

Merged
merged 12 commits into from
Jun 14, 2024
51 changes: 51 additions & 0 deletions src/py/flwr/superexec/exec_grpc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# Copyright 2024 Flower Labs GmbH. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""SuperExec gRPC API."""

from logging import INFO
from typing import Optional, Tuple

import grpc

from flwr.common import GRPC_MAX_MESSAGE_LENGTH
from flwr.common.logger import log
from flwr.proto.exec_pb2_grpc import add_ExecServicer_to_server
from flwr.server.superlink.fleet.grpc_bidi.grpc_server import generic_create_grpc_server

from .exec_servicer import ExecServicer
from .executor import Executor


def run_superexec_api_grpc(
address: str,
executor: Executor,
certificates: Optional[Tuple[bytes, bytes, bytes]],
) -> grpc.Server:
"""Run SuperExec API (gRPC, request-response)."""
exec_servicer: grpc.Server = ExecServicer(
executor=executor,
)
superexec_add_servicer_to_server_fn = add_ExecServicer_to_server
superexec_grpc_server = generic_create_grpc_server(
servicer_and_add_fn=(exec_servicer, superexec_add_servicer_to_server_fn),
server_address=address,
max_message_length=GRPC_MAX_MESSAGE_LENGTH,
certificates=certificates,
)

log(INFO, "Flower ECE: Starting SuperExec API (gRPC-rere) on %s", address)
superexec_grpc_server.start()

return superexec_grpc_server
54 changes: 54 additions & 0 deletions src/py/flwr/superexec/exec_servicer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Copyright 2024 Flower Labs GmbH. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""SuperExec API servicer."""


from logging import ERROR, INFO
from typing import Dict

import grpc

from flwr.common.logger import log
from flwr.proto import exec_pb2_grpc # pylint: disable=E0611
from flwr.proto.exec_pb2 import ( # pylint: disable=E0611
StartRunRequest,
StartRunResponse,
)

from .executor import Executor, RunTracker


class ExecServicer(exec_pb2_grpc.ExecServicer):
"""SuperExec API servicer."""

def __init__(self, executor: Executor) -> None:
self.executor = executor
self.runs: Dict[int, RunTracker] = {}

def StartRun(
self, request: StartRunRequest, context: grpc.ServicerContext
) -> StartRunResponse:
"""Create run ID."""
log(INFO, "ExecServicer.StartRun")

run = self.executor.start_run(request.fab_file)

if run is None:
log(ERROR, "Executor failed to start run")
return StartRunResponse()

self.runs[run.run_id] = run

return StartRunResponse(run_id=run.run_id)
52 changes: 52 additions & 0 deletions src/py/flwr/superexec/exec_servicer_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Copyright 2024 Flower Labs GmbH. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Test the SuperExec API servicer."""


import subprocess
from unittest.mock import MagicMock

from flwr.proto.exec_pb2 import StartRunRequest # pylint: disable=E0611

from .exec_servicer import ExecServicer


def test_start_run() -> None:
"""Test StartRun method of ExecServicer."""
run_res = MagicMock()
run_res.run_id = 10
with subprocess.Popen(
["echo", "success"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
) as proc:
run_res.proc = proc

executor = MagicMock()
executor.start_run = lambda _: run_res

context_mock = MagicMock()

request = StartRunRequest()
request.fab_file = b"test"

# Create a instance of FlowerServiceServicer
servicer = ExecServicer(executor=executor)

# Execute
response = servicer.StartRun(request, context_mock)

assert response.run_id == 10
54 changes: 54 additions & 0 deletions src/py/flwr/superexec/executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Copyright 2024 Flower Labs GmbH. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Execute and monitor a Flower run."""

from abc import ABC, abstractmethod
from dataclasses import dataclass
from subprocess import Popen
from typing import Optional


@dataclass
class RunTracker:
"""Track a Flower run (composed of a run_id and the associated process)."""

run_id: int
proc: Popen # type: ignore


class Executor(ABC):
"""Execute and monitor a Flower run."""

@abstractmethod
def start_run(
self,
fab_file: bytes,
) -> Optional[RunTracker]:
"""Start a run using the given Flower FAB ID and version.

This method creates a new run on the SuperLink, returns its run_id
and also starts the run execution.

Parameters
----------
fab_file : bytes
The Flower App Bundle file bytes.

Returns
-------
run_id : Optional[RunTracker]
The run_id and the associated process of the run created by the SuperLink,
or `None` if it fails.
"""