From 3124d912bc52f32ebebd9998a646c9e2905d1ab8 Mon Sep 17 00:00:00 2001 From: jafermarq Date: Sat, 17 Aug 2024 07:25:56 +0100 Subject: [PATCH 01/24] rename --- src/py/flwr/client/app.py | 2 +- src/py/flwr/client/{process => clientapp}/__init__.py | 0 .../client/{process => clientapp}/clientappio_servicer.py | 0 .../{process => clientapp}/clientappio_servicer_test.py | 0 src/py/flwr/client/{process => clientapp}/process.py | 0 src/py/flwr/client/{process => clientapp}/utils.py | 0 src/py/flwr/client/supernode/app.py | 4 ++-- src/py/flwr/server/superlink/fleet/vce/vce_api.py | 2 +- 8 files changed, 4 insertions(+), 4 deletions(-) rename src/py/flwr/client/{process => clientapp}/__init__.py (100%) rename src/py/flwr/client/{process => clientapp}/clientappio_servicer.py (100%) rename src/py/flwr/client/{process => clientapp}/clientappio_servicer_test.py (100%) rename src/py/flwr/client/{process => clientapp}/process.py (100%) rename src/py/flwr/client/{process => clientapp}/utils.py (100%) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index e324d6c2b82..66e86693107 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -58,7 +58,7 @@ from .message_handler.message_handler import handle_control_message from .node_state import NodeState from .numpy_client import NumPyClient -from .process.clientappio_servicer import ClientAppIoInputs, ClientAppIoServicer +from .clientapp.clientappio_servicer import ClientAppIoInputs, ClientAppIoServicer ADDRESS_CLIENTAPPIO_API_GRPC_RERE = "0.0.0.0:9094" diff --git a/src/py/flwr/client/process/__init__.py b/src/py/flwr/client/clientapp/__init__.py similarity index 100% rename from src/py/flwr/client/process/__init__.py rename to src/py/flwr/client/clientapp/__init__.py diff --git a/src/py/flwr/client/process/clientappio_servicer.py b/src/py/flwr/client/clientapp/clientappio_servicer.py similarity index 100% rename from src/py/flwr/client/process/clientappio_servicer.py rename to src/py/flwr/client/clientapp/clientappio_servicer.py diff --git a/src/py/flwr/client/process/clientappio_servicer_test.py b/src/py/flwr/client/clientapp/clientappio_servicer_test.py similarity index 100% rename from src/py/flwr/client/process/clientappio_servicer_test.py rename to src/py/flwr/client/clientapp/clientappio_servicer_test.py diff --git a/src/py/flwr/client/process/process.py b/src/py/flwr/client/clientapp/process.py similarity index 100% rename from src/py/flwr/client/process/process.py rename to src/py/flwr/client/clientapp/process.py diff --git a/src/py/flwr/client/process/utils.py b/src/py/flwr/client/clientapp/utils.py similarity index 100% rename from src/py/flwr/client/process/utils.py rename to src/py/flwr/client/clientapp/utils.py diff --git a/src/py/flwr/client/supernode/app.py b/src/py/flwr/client/supernode/app.py index b299aa46839..1feaf655f1f 100644 --- a/src/py/flwr/client/supernode/app.py +++ b/src/py/flwr/client/supernode/app.py @@ -38,8 +38,8 @@ from flwr.common.logger import log, warn_deprecated_feature from ..app import start_client_internal -from ..process.process import run_clientapp -from ..process.utils import get_load_client_app_fn +from ..clientapp.process import run_clientapp +from ..clientapp.utils import get_load_client_app_fn ADDRESS_FLEET_API_GRPC_RERE = "0.0.0.0:9092" diff --git a/src/py/flwr/server/superlink/fleet/vce/vce_api.py b/src/py/flwr/server/superlink/fleet/vce/vce_api.py index aa726f902d4..aceb3c9dc30 100644 --- a/src/py/flwr/server/superlink/fleet/vce/vce_api.py +++ b/src/py/flwr/server/superlink/fleet/vce/vce_api.py @@ -28,7 +28,7 @@ from flwr.client.client_app import ClientApp, ClientAppException, LoadClientAppError from flwr.client.node_state import NodeState -from flwr.client.process.utils import get_load_client_app_fn +from flwr.client.clientapp.utils import get_load_client_app_fn from flwr.common.constant import ( NUM_PARTITIONS_KEY, PARTITION_ID_KEY, From fb254e7e952431b651dec01cbee231f9d719f2cf Mon Sep 17 00:00:00 2001 From: jafermarq Date: Sat, 17 Aug 2024 07:27:19 +0100 Subject: [PATCH 02/24] format --- src/py/flwr/client/app.py | 2 +- src/py/flwr/client/clientapp/clientappio_servicer_test.py | 2 +- src/py/flwr/server/superlink/fleet/vce/vce_api.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index 66e86693107..8b7cab79ef5 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -52,13 +52,13 @@ from flwr.server.superlink.fleet.grpc_bidi.grpc_server import generic_create_grpc_server from flwr.server.superlink.state.utils import generate_rand_int_from_bytes +from .clientapp.clientappio_servicer import ClientAppIoInputs, ClientAppIoServicer from .grpc_adapter_client.connection import grpc_adapter from .grpc_client.connection import grpc_connection from .grpc_rere_client.connection import grpc_request_response from .message_handler.message_handler import handle_control_message from .node_state import NodeState from .numpy_client import NumPyClient -from .clientapp.clientappio_servicer import ClientAppIoInputs, ClientAppIoServicer ADDRESS_CLIENTAPPIO_API_GRPC_RERE = "0.0.0.0:9094" diff --git a/src/py/flwr/client/clientapp/clientappio_servicer_test.py b/src/py/flwr/client/clientapp/clientappio_servicer_test.py index 4831825acf3..b1083e60f49 100644 --- a/src/py/flwr/client/clientapp/clientappio_servicer_test.py +++ b/src/py/flwr/client/clientapp/clientappio_servicer_test.py @@ -17,7 +17,7 @@ import unittest from unittest.mock import Mock, patch -from flwr.client.process.process import pull_message, push_message +from flwr.client.clientapp.process import pull_message, push_message from flwr.common import Context, Message, typing from flwr.common.serde import ( clientappstatus_from_proto, diff --git a/src/py/flwr/server/superlink/fleet/vce/vce_api.py b/src/py/flwr/server/superlink/fleet/vce/vce_api.py index aceb3c9dc30..81eb6cb6569 100644 --- a/src/py/flwr/server/superlink/fleet/vce/vce_api.py +++ b/src/py/flwr/server/superlink/fleet/vce/vce_api.py @@ -27,8 +27,8 @@ from typing import Callable, Dict, Optional from flwr.client.client_app import ClientApp, ClientAppException, LoadClientAppError -from flwr.client.node_state import NodeState from flwr.client.clientapp.utils import get_load_client_app_fn +from flwr.client.node_state import NodeState from flwr.common.constant import ( NUM_PARTITIONS_KEY, PARTITION_ID_KEY, From aa51b956f91697265b8a731c08012ef37623c242 Mon Sep 17 00:00:00 2001 From: jafermarq Date: Sat, 17 Aug 2024 07:31:36 +0100 Subject: [PATCH 03/24] renamed process.py to app.py --- src/py/flwr/client/clientapp/{process.py => app.py} | 0 src/py/flwr/client/clientapp/clientappio_servicer_test.py | 2 +- src/py/flwr/client/supernode/app.py | 2 +- 3 files changed, 2 insertions(+), 2 deletions(-) rename src/py/flwr/client/clientapp/{process.py => app.py} (100%) diff --git a/src/py/flwr/client/clientapp/process.py b/src/py/flwr/client/clientapp/app.py similarity index 100% rename from src/py/flwr/client/clientapp/process.py rename to src/py/flwr/client/clientapp/app.py diff --git a/src/py/flwr/client/clientapp/clientappio_servicer_test.py b/src/py/flwr/client/clientapp/clientappio_servicer_test.py index b1083e60f49..71245f4a859 100644 --- a/src/py/flwr/client/clientapp/clientappio_servicer_test.py +++ b/src/py/flwr/client/clientapp/clientappio_servicer_test.py @@ -17,7 +17,7 @@ import unittest from unittest.mock import Mock, patch -from flwr.client.clientapp.process import pull_message, push_message +from flwr.client.clientapp.app import pull_message, push_message from flwr.common import Context, Message, typing from flwr.common.serde import ( clientappstatus_from_proto, diff --git a/src/py/flwr/client/supernode/app.py b/src/py/flwr/client/supernode/app.py index 1feaf655f1f..0f8d9ab675b 100644 --- a/src/py/flwr/client/supernode/app.py +++ b/src/py/flwr/client/supernode/app.py @@ -38,7 +38,7 @@ from flwr.common.logger import log, warn_deprecated_feature from ..app import start_client_internal -from ..clientapp.process import run_clientapp +from ..clientapp.app import run_clientapp from ..clientapp.utils import get_load_client_app_fn ADDRESS_FLEET_API_GRPC_RERE = "0.0.0.0:9092" From 2e0848c23c31b382510d849a1020a689eb6d1b62 Mon Sep 17 00:00:00 2001 From: jafermarq Date: Sat, 17 Aug 2024 07:37:45 +0100 Subject: [PATCH 04/24] init --- pyproject.toml | 2 +- src/py/flwr/client/clientapp/__init__.py | 7 ++++++ src/py/flwr/client/clientapp/app.py | 27 ++++++++++++++++++++++++ src/py/flwr/client/supernode/__init__.py | 2 -- src/py/flwr/client/supernode/app.py | 27 ------------------------ 5 files changed, 35 insertions(+), 30 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 1369ee4e968..0d0138a5689 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -59,7 +59,7 @@ flower-supernode = "flwr.client:run_supernode" flower-client-app = "flwr.client:run_client_app" flower-server-app = "flwr.server:run_server_app" flower-simulation = "flwr.simulation.run_simulation:run_simulation_from_cli" -flwr-clientapp = "flwr.client.supernode:flwr_clientapp" +flwr-clientapp = "flwr.client.clientapp:flwr_clientapp" [tool.poetry.dependencies] python = "^3.8" diff --git a/src/py/flwr/client/clientapp/__init__.py b/src/py/flwr/client/clientapp/__init__.py index 653cee434c1..e877ee22db1 100644 --- a/src/py/flwr/client/clientapp/__init__.py +++ b/src/py/flwr/client/clientapp/__init__.py @@ -13,3 +13,10 @@ # limitations under the License. # ============================================================================== """Flower AppIO service.""" + + +from .app import flwr_clientapp as flwr_clientapp + +__all__ = [ + "flwr_clientapp", +] diff --git a/src/py/flwr/client/clientapp/app.py b/src/py/flwr/client/clientapp/app.py index 18766d2dd08..18602a4bb7a 100644 --- a/src/py/flwr/client/clientapp/app.py +++ b/src/py/flwr/client/clientapp/app.py @@ -14,6 +14,7 @@ # ============================================================================== """Flower ClientApp process.""" +import argparse from logging import DEBUG, ERROR, INFO from typing import Tuple @@ -46,6 +47,32 @@ from .utils import get_load_client_app_fn +def flwr_clientapp() -> None: + """Run process-isolated Flower ClientApp.""" + log(INFO, "Starting Flower ClientApp") + + parser = argparse.ArgumentParser( + description="Run a Flower ClientApp", + ) + parser.add_argument( + "--supernode", + help="Address of SuperNode ClientAppIo gRPC servicer", + ) + parser.add_argument( + "--token", + help="Unique token generated by SuperNode for each ClientApp execution", + ) + args = parser.parse_args() + log( + DEBUG, + "Staring isolated `ClientApp` connected to SuperNode ClientAppIo at %s " + "with the token %s", + args.supernode, + args.token, + ) + run_clientapp(supernode=args.supernode, token=int(args.token)) + + def on_channel_state_change(channel_connectivity: str) -> None: """Log channel connectivity.""" log(DEBUG, channel_connectivity) diff --git a/src/py/flwr/client/supernode/__init__.py b/src/py/flwr/client/supernode/__init__.py index 128d0286d62..bc505f69387 100644 --- a/src/py/flwr/client/supernode/__init__.py +++ b/src/py/flwr/client/supernode/__init__.py @@ -15,12 +15,10 @@ """Flower SuperNode.""" -from .app import flwr_clientapp as flwr_clientapp from .app import run_client_app as run_client_app from .app import run_supernode as run_supernode __all__ = [ - "flwr_clientapp", "run_client_app", "run_supernode", ] diff --git a/src/py/flwr/client/supernode/app.py b/src/py/flwr/client/supernode/app.py index 0f8d9ab675b..ef17b9375d7 100644 --- a/src/py/flwr/client/supernode/app.py +++ b/src/py/flwr/client/supernode/app.py @@ -38,7 +38,6 @@ from flwr.common.logger import log, warn_deprecated_feature from ..app import start_client_internal -from ..clientapp.app import run_clientapp from ..clientapp.utils import get_load_client_app_fn ADDRESS_FLEET_API_GRPC_RERE = "0.0.0.0:9092" @@ -115,32 +114,6 @@ def run_client_app() -> None: register_exit_handlers(event_type=EventType.RUN_CLIENT_APP_LEAVE) -def flwr_clientapp() -> None: - """Run process-isolated Flower ClientApp.""" - log(INFO, "Starting Flower ClientApp") - - parser = argparse.ArgumentParser( - description="Run a Flower ClientApp", - ) - parser.add_argument( - "--supernode", - help="Address of SuperNode ClientAppIo gRPC servicer", - ) - parser.add_argument( - "--token", - help="Unique token generated by SuperNode for each ClientApp execution", - ) - args = parser.parse_args() - log( - DEBUG, - "Staring isolated `ClientApp` connected to SuperNode ClientAppIo at %s " - "with the token %s", - args.supernode, - args.token, - ) - run_clientapp(supernode=args.supernode, token=int(args.token)) - - def _warn_deprecated_server_arg(args: argparse.Namespace) -> None: """Warn about the deprecated argument `--server`.""" if args.server != ADDRESS_FLEET_API_GRPC_RERE: From 31ef0a615062ae69ce1eb46a4acba57fe661ef56 Mon Sep 17 00:00:00 2001 From: jafermarq Date: Sat, 17 Aug 2024 07:39:51 +0100 Subject: [PATCH 05/24] test fix --- src/py/flwr/client/clientapp/clientappio_servicer_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/py/flwr/client/clientapp/clientappio_servicer_test.py b/src/py/flwr/client/clientapp/clientappio_servicer_test.py index 71245f4a859..f2efcef14e8 100644 --- a/src/py/flwr/client/clientapp/clientappio_servicer_test.py +++ b/src/py/flwr/client/clientapp/clientappio_servicer_test.py @@ -50,7 +50,7 @@ def setUp(self) -> None: self.maker = RecordMaker() self.mock_stub = Mock() self.patcher = patch( - "flwr.client.process.process.ClientAppIoStub", return_value=self.mock_stub + "flwr.client.clientapp.app.ClientAppIoStub", return_value=self.mock_stub ) self.patcher.start() From 634b5488d59ca80a5f89b8e938c3c05a44eaece1 Mon Sep 17 00:00:00 2001 From: jafermarq Date: Sat, 17 Aug 2024 08:20:22 +0100 Subject: [PATCH 06/24] init --- src/proto/flwr/proto/clientappio.proto | 6 ++++ src/py/flwr/client/clientapp/app.py | 10 +++++- .../client/clientapp/clientappio_servicer.py | 16 +++++++++ .../clientapp/clientappio_servicer_test.py | 19 ++++++++++- src/py/flwr/proto/clientappio_pb2.py | 14 +++++--- src/py/flwr/proto/clientappio_pb2.pyi | 17 ++++++++++ src/py/flwr/proto/clientappio_pb2_grpc.py | 34 +++++++++++++++++++ src/py/flwr/proto/clientappio_pb2_grpc.pyi | 13 +++++++ 8 files changed, 122 insertions(+), 7 deletions(-) diff --git a/src/proto/flwr/proto/clientappio.proto b/src/proto/flwr/proto/clientappio.proto index d73ed086f40..a001a81b60c 100644 --- a/src/proto/flwr/proto/clientappio.proto +++ b/src/proto/flwr/proto/clientappio.proto @@ -14,6 +14,9 @@ service ClientAppIo { // Send updated Message and Context rpc PushClientAppOutputs(PushClientAppOutputsRequest) returns (PushClientAppOutputsResponse) {} + + // Get token + rpc GetToken(GetTokenRequest) returns (GetTokenResponse) {} } enum ClientAppOutputCode { @@ -38,3 +41,6 @@ message PushClientAppOutputsRequest { Context context = 3; } message PushClientAppOutputsResponse { ClientAppOutputStatus status = 1; } + +message GetTokenRequest {} +message GetTokenResponse { sint64 token = 1; } diff --git a/src/py/flwr/client/clientapp/app.py b/src/py/flwr/client/clientapp/app.py index 18602a4bb7a..03a3a84af03 100644 --- a/src/py/flwr/client/clientapp/app.py +++ b/src/py/flwr/client/clientapp/app.py @@ -16,7 +16,7 @@ import argparse from logging import DEBUG, ERROR, INFO -from typing import Tuple +from typing import Optional, Tuple import grpc @@ -37,6 +37,8 @@ # pylint: disable=E0611 from flwr.proto.clientappio_pb2 import ( + GetTokenRequest, + GetTokenResponse, PullClientAppInputsRequest, PullClientAppInputsResponse, PushClientAppOutputsRequest, @@ -145,6 +147,12 @@ def run_clientapp( # pylint: disable=R0914 channel.close() +def get_token(stub: grpc.Channel) -> Optional[int]: + """Get a token from SuperNode.""" + res: GetTokenResponse = stub.GetToken(GetTokenRequest()) + return res.token + + def pull_message(stub: grpc.Channel, token: int) -> Tuple[Message, Context, Run]: """Pull message from SuperNode to ClientApp.""" res: PullClientAppInputsResponse = stub.PullClientAppInputs( diff --git a/src/py/flwr/client/clientapp/clientappio_servicer.py b/src/py/flwr/client/clientapp/clientappio_servicer.py index 9f009f2c8a9..33601403a3c 100644 --- a/src/py/flwr/client/clientapp/clientappio_servicer.py +++ b/src/py/flwr/client/clientapp/clientappio_servicer.py @@ -36,6 +36,8 @@ # pylint: disable=E0611 from flwr.proto import clientappio_pb2_grpc from flwr.proto.clientappio_pb2 import ( # pylint: disable=E0401 + GetTokenRequest, + GetTokenResponse, PullClientAppInputsRequest, PullClientAppInputsResponse, PushClientAppOutputsRequest, @@ -68,6 +70,7 @@ class ClientAppIoServicer(clientappio_pb2_grpc.ClientAppIoServicer): def __init__(self) -> None: self.clientapp_input: Optional[ClientAppIoInputs] = None self.clientapp_output: Optional[ClientAppIoOutputs] = None + self.token: Optional[int] = None def PullClientAppInputs( self, request: PullClientAppInputsRequest, context: grpc.ServicerContext @@ -122,6 +125,19 @@ def PushClientAppOutputs( proto_status = clientappstatus_to_proto(status=status) return PushClientAppOutputsResponse(status=proto_status) + def GetToken( + self, request: GetTokenRequest, context: grpc.ServicerContext + ) -> GetTokenResponse: + """Get token.""" + log(DEBUG, "ClientAppIo.GetToken") + res = GetTokenResponse() + if self.token: + # If token is set, use it in response + res.token = self.token + # Resetting token + self.token = None + return res + def set_inputs(self, clientapp_input: ClientAppIoInputs) -> None: """Set ClientApp inputs.""" log(DEBUG, "ClientAppIo.SetInputs") diff --git a/src/py/flwr/client/clientapp/clientappio_servicer_test.py b/src/py/flwr/client/clientapp/clientappio_servicer_test.py index f2efcef14e8..94db8cf5aeb 100644 --- a/src/py/flwr/client/clientapp/clientappio_servicer_test.py +++ b/src/py/flwr/client/clientapp/clientappio_servicer_test.py @@ -17,8 +17,9 @@ import unittest from unittest.mock import Mock, patch -from flwr.client.clientapp.app import pull_message, push_message +from flwr.client.clientapp.app import get_token, pull_message, push_message from flwr.common import Context, Message, typing +from flwr.common.constant import RUN_ID_NUM_BYTES from flwr.common.serde import ( clientappstatus_from_proto, clientappstatus_to_proto, @@ -28,11 +29,13 @@ # pylint:disable=E0611 from flwr.proto.clientappio_pb2 import ( + GetTokenResponse, PullClientAppInputsResponse, PushClientAppOutputsResponse, ) from flwr.proto.message_pb2 import Context as ProtoContext from flwr.proto.run_pb2 import Run as ProtoRun +from flwr.server.superlink.state.utils import generate_rand_int_from_bytes from .clientappio_servicer import ( ClientAppIoInputs, @@ -194,3 +197,17 @@ def test_push_clientapp_outputs(self) -> None: # Assert self.mock_stub.PushClientAppOutputs.assert_called_once() self.assertEqual(status.message, "SUCCESS") + + def test_get_token(self) -> None: + """Test getting a token from SuperNode.""" + # Prepare + token: int = generate_rand_int_from_bytes(RUN_ID_NUM_BYTES) + mock_response = GetTokenResponse(token=token) + self.mock_stub.GetToken.return_value = mock_response + + # Execute + res = get_token(stub=self.mock_stub) + + # Assert + self.mock_stub.GetToken.assert_called_once() + self.assertEqual(res, token) diff --git a/src/py/flwr/proto/clientappio_pb2.py b/src/py/flwr/proto/clientappio_pb2.py index 2234e3c2a8a..7679bcb3618 100644 --- a/src/py/flwr/proto/clientappio_pb2.py +++ b/src/py/flwr/proto/clientappio_pb2.py @@ -17,15 +17,15 @@ from flwr.proto import message_pb2 as flwr_dot_proto_dot_message__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1c\x66lwr/proto/clientappio.proto\x12\nflwr.proto\x1a\x14\x66lwr/proto/fab.proto\x1a\x14\x66lwr/proto/run.proto\x1a\x18\x66lwr/proto/message.proto\"W\n\x15\x43lientAppOutputStatus\x12-\n\x04\x63ode\x18\x01 \x01(\x0e\x32\x1f.flwr.proto.ClientAppOutputCode\x12\x0f\n\x07message\x18\x02 \x01(\t\"+\n\x1aPullClientAppInputsRequest\x12\r\n\x05token\x18\x01 \x01(\x12\"\x87\x01\n\x1bPullClientAppInputsResponse\x12$\n\x07message\x18\x01 \x01(\x0b\x32\x13.flwr.proto.Message\x12$\n\x07\x63ontext\x18\x02 \x01(\x0b\x32\x13.flwr.proto.Context\x12\x1c\n\x03run\x18\x03 \x01(\x0b\x32\x0f.flwr.proto.Run\"x\n\x1bPushClientAppOutputsRequest\x12\r\n\x05token\x18\x01 \x01(\x12\x12$\n\x07message\x18\x02 \x01(\x0b\x32\x13.flwr.proto.Message\x12$\n\x07\x63ontext\x18\x03 \x01(\x0b\x32\x13.flwr.proto.Context\"Q\n\x1cPushClientAppOutputsResponse\x12\x31\n\x06status\x18\x01 \x01(\x0b\x32!.flwr.proto.ClientAppOutputStatus*L\n\x13\x43lientAppOutputCode\x12\x0b\n\x07SUCCESS\x10\x00\x12\x15\n\x11\x44\x45\x41\x44LINE_EXCEEDED\x10\x01\x12\x11\n\rUNKNOWN_ERROR\x10\x02\x32\xe4\x01\n\x0b\x43lientAppIo\x12h\n\x13PullClientAppInputs\x12&.flwr.proto.PullClientAppInputsRequest\x1a\'.flwr.proto.PullClientAppInputsResponse\"\x00\x12k\n\x14PushClientAppOutputs\x12\'.flwr.proto.PushClientAppOutputsRequest\x1a(.flwr.proto.PushClientAppOutputsResponse\"\x00\x62\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1c\x66lwr/proto/clientappio.proto\x12\nflwr.proto\x1a\x14\x66lwr/proto/fab.proto\x1a\x14\x66lwr/proto/run.proto\x1a\x18\x66lwr/proto/message.proto\"W\n\x15\x43lientAppOutputStatus\x12-\n\x04\x63ode\x18\x01 \x01(\x0e\x32\x1f.flwr.proto.ClientAppOutputCode\x12\x0f\n\x07message\x18\x02 \x01(\t\"+\n\x1aPullClientAppInputsRequest\x12\r\n\x05token\x18\x01 \x01(\x12\"\x87\x01\n\x1bPullClientAppInputsResponse\x12$\n\x07message\x18\x01 \x01(\x0b\x32\x13.flwr.proto.Message\x12$\n\x07\x63ontext\x18\x02 \x01(\x0b\x32\x13.flwr.proto.Context\x12\x1c\n\x03run\x18\x03 \x01(\x0b\x32\x0f.flwr.proto.Run\"x\n\x1bPushClientAppOutputsRequest\x12\r\n\x05token\x18\x01 \x01(\x12\x12$\n\x07message\x18\x02 \x01(\x0b\x32\x13.flwr.proto.Message\x12$\n\x07\x63ontext\x18\x03 \x01(\x0b\x32\x13.flwr.proto.Context\"Q\n\x1cPushClientAppOutputsResponse\x12\x31\n\x06status\x18\x01 \x01(\x0b\x32!.flwr.proto.ClientAppOutputStatus\"\x11\n\x0fGetTokenRequest\"!\n\x10GetTokenResponse\x12\r\n\x05token\x18\x01 \x01(\x12*L\n\x13\x43lientAppOutputCode\x12\x0b\n\x07SUCCESS\x10\x00\x12\x15\n\x11\x44\x45\x41\x44LINE_EXCEEDED\x10\x01\x12\x11\n\rUNKNOWN_ERROR\x10\x02\x32\xad\x02\n\x0b\x43lientAppIo\x12h\n\x13PullClientAppInputs\x12&.flwr.proto.PullClientAppInputsRequest\x1a\'.flwr.proto.PullClientAppInputsResponse\"\x00\x12k\n\x14PushClientAppOutputs\x12\'.flwr.proto.PushClientAppOutputsRequest\x1a(.flwr.proto.PushClientAppOutputsResponse\"\x00\x12G\n\x08GetToken\x12\x1b.flwr.proto.GetTokenRequest\x1a\x1c.flwr.proto.GetTokenResponse\"\x00\x62\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'flwr.proto.clientappio_pb2', _globals) if _descriptor._USE_C_DESCRIPTORS == False: DESCRIPTOR._options = None - _globals['_CLIENTAPPOUTPUTCODE']._serialized_start=591 - _globals['_CLIENTAPPOUTPUTCODE']._serialized_end=667 + _globals['_CLIENTAPPOUTPUTCODE']._serialized_start=645 + _globals['_CLIENTAPPOUTPUTCODE']._serialized_end=721 _globals['_CLIENTAPPOUTPUTSTATUS']._serialized_start=114 _globals['_CLIENTAPPOUTPUTSTATUS']._serialized_end=201 _globals['_PULLCLIENTAPPINPUTSREQUEST']._serialized_start=203 @@ -36,6 +36,10 @@ _globals['_PUSHCLIENTAPPOUTPUTSREQUEST']._serialized_end=506 _globals['_PUSHCLIENTAPPOUTPUTSRESPONSE']._serialized_start=508 _globals['_PUSHCLIENTAPPOUTPUTSRESPONSE']._serialized_end=589 - _globals['_CLIENTAPPIO']._serialized_start=670 - _globals['_CLIENTAPPIO']._serialized_end=898 + _globals['_GETTOKENREQUEST']._serialized_start=591 + _globals['_GETTOKENREQUEST']._serialized_end=608 + _globals['_GETTOKENRESPONSE']._serialized_start=610 + _globals['_GETTOKENRESPONSE']._serialized_end=643 + _globals['_CLIENTAPPIO']._serialized_start=724 + _globals['_CLIENTAPPIO']._serialized_end=1025 # @@protoc_insertion_point(module_scope) diff --git a/src/py/flwr/proto/clientappio_pb2.pyi b/src/py/flwr/proto/clientappio_pb2.pyi index 31c9dc4c6d1..aacf3c6a232 100644 --- a/src/py/flwr/proto/clientappio_pb2.pyi +++ b/src/py/flwr/proto/clientappio_pb2.pyi @@ -108,3 +108,20 @@ class PushClientAppOutputsResponse(google.protobuf.message.Message): def HasField(self, field_name: typing_extensions.Literal["status",b"status"]) -> builtins.bool: ... def ClearField(self, field_name: typing_extensions.Literal["status",b"status"]) -> None: ... global___PushClientAppOutputsResponse = PushClientAppOutputsResponse + +class GetTokenRequest(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + def __init__(self, + ) -> None: ... +global___GetTokenRequest = GetTokenRequest + +class GetTokenResponse(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + TOKEN_FIELD_NUMBER: builtins.int + token: builtins.int + def __init__(self, + *, + token: builtins.int = ..., + ) -> None: ... + def ClearField(self, field_name: typing_extensions.Literal["token",b"token"]) -> None: ... +global___GetTokenResponse = GetTokenResponse diff --git a/src/py/flwr/proto/clientappio_pb2_grpc.py b/src/py/flwr/proto/clientappio_pb2_grpc.py index b244ef4a5b1..0780a208e3a 100644 --- a/src/py/flwr/proto/clientappio_pb2_grpc.py +++ b/src/py/flwr/proto/clientappio_pb2_grpc.py @@ -24,6 +24,11 @@ def __init__(self, channel): request_serializer=flwr_dot_proto_dot_clientappio__pb2.PushClientAppOutputsRequest.SerializeToString, response_deserializer=flwr_dot_proto_dot_clientappio__pb2.PushClientAppOutputsResponse.FromString, ) + self.GetToken = channel.unary_unary( + '/flwr.proto.ClientAppIo/GetToken', + request_serializer=flwr_dot_proto_dot_clientappio__pb2.GetTokenRequest.SerializeToString, + response_deserializer=flwr_dot_proto_dot_clientappio__pb2.GetTokenResponse.FromString, + ) class ClientAppIoServicer(object): @@ -43,6 +48,13 @@ def PushClientAppOutputs(self, request, context): context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') + def GetToken(self, request, context): + """Get token + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + def add_ClientAppIoServicer_to_server(servicer, server): rpc_method_handlers = { @@ -56,6 +68,11 @@ def add_ClientAppIoServicer_to_server(servicer, server): request_deserializer=flwr_dot_proto_dot_clientappio__pb2.PushClientAppOutputsRequest.FromString, response_serializer=flwr_dot_proto_dot_clientappio__pb2.PushClientAppOutputsResponse.SerializeToString, ), + 'GetToken': grpc.unary_unary_rpc_method_handler( + servicer.GetToken, + request_deserializer=flwr_dot_proto_dot_clientappio__pb2.GetTokenRequest.FromString, + response_serializer=flwr_dot_proto_dot_clientappio__pb2.GetTokenResponse.SerializeToString, + ), } generic_handler = grpc.method_handlers_generic_handler( 'flwr.proto.ClientAppIo', rpc_method_handlers) @@ -99,3 +116,20 @@ def PushClientAppOutputs(request, flwr_dot_proto_dot_clientappio__pb2.PushClientAppOutputsResponse.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def GetToken(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/flwr.proto.ClientAppIo/GetToken', + flwr_dot_proto_dot_clientappio__pb2.GetTokenRequest.SerializeToString, + flwr_dot_proto_dot_clientappio__pb2.GetTokenResponse.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) diff --git a/src/py/flwr/proto/clientappio_pb2_grpc.pyi b/src/py/flwr/proto/clientappio_pb2_grpc.pyi index 4503e11f17a..9e34759a7b2 100644 --- a/src/py/flwr/proto/clientappio_pb2_grpc.pyi +++ b/src/py/flwr/proto/clientappio_pb2_grpc.pyi @@ -18,6 +18,11 @@ class ClientAppIoStub: flwr.proto.clientappio_pb2.PushClientAppOutputsResponse] """Send updated Message and Context""" + GetToken: grpc.UnaryUnaryMultiCallable[ + flwr.proto.clientappio_pb2.GetTokenRequest, + flwr.proto.clientappio_pb2.GetTokenResponse] + """Get token""" + class ClientAppIoServicer(metaclass=abc.ABCMeta): @abc.abstractmethod @@ -36,5 +41,13 @@ class ClientAppIoServicer(metaclass=abc.ABCMeta): """Send updated Message and Context""" pass + @abc.abstractmethod + def GetToken(self, + request: flwr.proto.clientappio_pb2.GetTokenRequest, + context: grpc.ServicerContext, + ) -> flwr.proto.clientappio_pb2.GetTokenResponse: + """Get token""" + pass + def add_ClientAppIoServicer_to_server(servicer: ClientAppIoServicer, server: grpc.Server) -> None: ... From f9474f4ba29cbc5f3d6672cc535b7a78e71a61e4 Mon Sep 17 00:00:00 2001 From: jafermarq Date: Sat, 17 Aug 2024 08:40:59 +0100 Subject: [PATCH 07/24] set_token() method --- src/py/flwr/client/clientapp/clientappio_servicer.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/py/flwr/client/clientapp/clientappio_servicer.py b/src/py/flwr/client/clientapp/clientappio_servicer.py index 33601403a3c..b8de9c246fe 100644 --- a/src/py/flwr/client/clientapp/clientappio_servicer.py +++ b/src/py/flwr/client/clientapp/clientappio_servicer.py @@ -138,6 +138,11 @@ def GetToken( self.token = None return res + def set_token(self, token: int) -> None: + """Set the token.""" + log(DEBUG, "ClientAppIo.set_token") + self.token = token + def set_inputs(self, clientapp_input: ClientAppIoInputs) -> None: """Set ClientApp inputs.""" log(DEBUG, "ClientAppIo.SetInputs") From 08e27087528310f8d4da1ddc2549b6b787c2290b Mon Sep 17 00:00:00 2001 From: jafermarq Date: Sat, 17 Aug 2024 08:43:39 +0100 Subject: [PATCH 08/24] using `get_token` functionality --- src/py/flwr/client/app.py | 5 +++-- src/py/flwr/client/clientapp/app.py | 13 ++++++++++--- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index 8b7cab79ef5..ab144127ab6 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -458,6 +458,9 @@ def _on_backoff(retry_state: RetryState) -> None: # Generate SuperNode token token: int = generate_rand_int_from_bytes(RUN_ID_NUM_BYTES) + # Set token + clientappio_servicer.set_token(token) + # Share Message and Context with servicer clientappio_servicer.set_inputs( ClientAppIoInputs( @@ -473,8 +476,6 @@ def _on_backoff(retry_state: RetryState) -> None: "flwr-clientapp", "--supernode", supernode_address, - "--token", - str(token), ] subprocess.run( command, diff --git a/src/py/flwr/client/clientapp/app.py b/src/py/flwr/client/clientapp/app.py index 03a3a84af03..c7047b57931 100644 --- a/src/py/flwr/client/clientapp/app.py +++ b/src/py/flwr/client/clientapp/app.py @@ -62,6 +62,7 @@ def flwr_clientapp() -> None: ) parser.add_argument( "--token", + required=False, help="Unique token generated by SuperNode for each ClientApp execution", ) args = parser.parse_args() @@ -72,7 +73,7 @@ def flwr_clientapp() -> None: args.supernode, args.token, ) - run_clientapp(supernode=args.supernode, token=int(args.token)) + run_clientapp(supernode=args.supernode, token=args.token) def on_channel_state_change(channel_connectivity: str) -> None: @@ -82,7 +83,7 @@ def on_channel_state_change(channel_connectivity: str) -> None: def run_clientapp( # pylint: disable=R0914 supernode: str, - token: int, + token: Optional[int] = None, ) -> None: """Run Flower ClientApp process. @@ -90,7 +91,7 @@ def run_clientapp( # pylint: disable=R0914 ---------- supernode : str Address of SuperNode - token : int + token : Optional[int] (default: None) Unique SuperNode token for ClientApp-SuperNode authentication """ channel = create_channel( @@ -98,10 +99,15 @@ def run_clientapp( # pylint: disable=R0914 insecure=True, ) channel.subscribe(on_channel_state_change) + print(f"{token = }") try: stub = ClientAppIoStub(channel) + while token is None: + token = get_token(stub) + print(f"{token = }") + # Pull Message, Context, and Run from SuperNode message, context, run = pull_message(stub=stub, token=token) @@ -149,6 +155,7 @@ def run_clientapp( # pylint: disable=R0914 def get_token(stub: grpc.Channel) -> Optional[int]: """Get a token from SuperNode.""" + log(DEBUG, "Flower ClientApp requests token.") res: GetTokenResponse = stub.GetToken(GetTokenRequest()) return res.token From 4fbf3ffe4da3b54c26c8d9f96d54098dba383144 Mon Sep 17 00:00:00 2001 From: jafermarq Date: Sat, 17 Aug 2024 08:45:09 +0100 Subject: [PATCH 09/24] tweakw --- src/py/flwr/client/clientapp/app.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/py/flwr/client/clientapp/app.py b/src/py/flwr/client/clientapp/app.py index c7047b57931..ef9ab1cfe73 100644 --- a/src/py/flwr/client/clientapp/app.py +++ b/src/py/flwr/client/clientapp/app.py @@ -99,14 +99,13 @@ def run_clientapp( # pylint: disable=R0914 insecure=True, ) channel.subscribe(on_channel_state_change) - print(f"{token = }") try: stub = ClientAppIoStub(channel) + # If token is not set, loop until token is received from SuperNode while token is None: token = get_token(stub) - print(f"{token = }") # Pull Message, Context, and Run from SuperNode message, context, run = pull_message(stub=stub, token=token) From 8ac40bd1013c7f487c0677d62c357088aefb6d75 Mon Sep 17 00:00:00 2001 From: jafermarq Date: Sat, 17 Aug 2024 10:28:19 +0100 Subject: [PATCH 10/24] init --- src/proto/flwr/proto/clientappio.proto | 5 +++ src/py/flwr/client/clientapp/app.py | 8 +++++ .../client/clientapp/clientappio_servicer.py | 9 +++++ .../clientapp/clientappio_servicer_test.py | 18 +++++++++- src/py/flwr/proto/clientappio_pb2.py | 12 ++++--- src/py/flwr/proto/clientappio_pb2.pyi | 11 ++++++ src/py/flwr/proto/clientappio_pb2_grpc.py | 35 +++++++++++++++++++ src/py/flwr/proto/clientappio_pb2_grpc.pyi | 14 ++++++++ 8 files changed, 106 insertions(+), 6 deletions(-) diff --git a/src/proto/flwr/proto/clientappio.proto b/src/proto/flwr/proto/clientappio.proto index a001a81b60c..22afd6ebe85 100644 --- a/src/proto/flwr/proto/clientappio.proto +++ b/src/proto/flwr/proto/clientappio.proto @@ -17,6 +17,9 @@ service ClientAppIo { // Get token rpc GetToken(GetTokenRequest) returns (GetTokenResponse) {} + + // Get the FAB + rpc GetFab(GetFabRequestWithToken) returns (GetFabResponse) {} } enum ClientAppOutputCode { @@ -44,3 +47,5 @@ message PushClientAppOutputsResponse { ClientAppOutputStatus status = 1; } message GetTokenRequest {} message GetTokenResponse { sint64 token = 1; } + +message GetFabRequestWithToken { sint64 token = 1; } diff --git a/src/py/flwr/client/clientapp/app.py b/src/py/flwr/client/clientapp/app.py index ef9ab1cfe73..eb22b2994c6 100644 --- a/src/py/flwr/client/clientapp/app.py +++ b/src/py/flwr/client/clientapp/app.py @@ -45,6 +45,7 @@ PushClientAppOutputsResponse, ) from flwr.proto.clientappio_pb2_grpc import ClientAppIoStub +from flwr.proto.fab_pb2 import Fab, GetFabResponse from .utils import get_load_client_app_fn @@ -159,6 +160,13 @@ def get_token(stub: grpc.Channel) -> Optional[int]: return res.token +def get_fab(stub: grpc.Channel, token: int) -> Fab: + """Get FAB from SuperNode passing the token.""" + log(DEBUG, "Flower ClientApp requests FAB using token.") + res: GetFabResponse = stub.GetFab(token) + return res.fab + + def pull_message(stub: grpc.Channel, token: int) -> Tuple[Message, Context, Run]: """Pull message from SuperNode to ClientApp.""" res: PullClientAppInputsResponse = stub.PullClientAppInputs( diff --git a/src/py/flwr/client/clientapp/clientappio_servicer.py b/src/py/flwr/client/clientapp/clientappio_servicer.py index b8de9c246fe..673411376b0 100644 --- a/src/py/flwr/client/clientapp/clientappio_servicer.py +++ b/src/py/flwr/client/clientapp/clientappio_servicer.py @@ -36,6 +36,7 @@ # pylint: disable=E0611 from flwr.proto import clientappio_pb2_grpc from flwr.proto.clientappio_pb2 import ( # pylint: disable=E0401 + GetFabRequestWithToken, GetTokenRequest, GetTokenResponse, PullClientAppInputsRequest, @@ -43,6 +44,7 @@ PushClientAppOutputsRequest, PushClientAppOutputsResponse, ) +from flwr.proto.fab_pb2 import GetFabResponse @dataclass @@ -138,6 +140,13 @@ def GetToken( self.token = None return res + def GetFab( + self, request: GetFabRequestWithToken, context: grpc.ServicerContext + ) -> GetFabResponse: + """Get Fab.""" + log(DEBUG, "ClientAppIo.GetFab") + return GetFabResponse() + def set_token(self, token: int) -> None: """Set the token.""" log(DEBUG, "ClientAppIo.set_token") diff --git a/src/py/flwr/client/clientapp/clientappio_servicer_test.py b/src/py/flwr/client/clientapp/clientappio_servicer_test.py index 94db8cf5aeb..8211320e1a3 100644 --- a/src/py/flwr/client/clientapp/clientappio_servicer_test.py +++ b/src/py/flwr/client/clientapp/clientappio_servicer_test.py @@ -17,7 +17,7 @@ import unittest from unittest.mock import Mock, patch -from flwr.client.clientapp.app import get_token, pull_message, push_message +from flwr.client.clientapp.app import get_fab, get_token, pull_message, push_message from flwr.common import Context, Message, typing from flwr.common.constant import RUN_ID_NUM_BYTES from flwr.common.serde import ( @@ -33,6 +33,7 @@ PullClientAppInputsResponse, PushClientAppOutputsResponse, ) +from flwr.proto.fab_pb2 import Fab, GetFabResponse from flwr.proto.message_pb2 import Context as ProtoContext from flwr.proto.run_pb2 import Run as ProtoRun from flwr.server.superlink.state.utils import generate_rand_int_from_bytes @@ -211,3 +212,18 @@ def test_get_token(self) -> None: # Assert self.mock_stub.GetToken.assert_called_once() self.assertEqual(res, token) + + def test_get_fab(self) -> None: + """Test fetching of FAB from SuperNode.""" + # Prepare + token: int = generate_rand_int_from_bytes(RUN_ID_NUM_BYTES) + fab = Fab(hash_str="1234", content=b"\xf3\xf5\xf8\x98") + mock_response = GetFabResponse(fab=fab) + self.mock_stub.GetFab.return_value = mock_response + + # Execute + fab_returned = get_fab(stub=self.mock_stub, token=token) + + # Assert + self.mock_stub.GetFab.assert_called_once() + self.assertEqual(fab_returned, fab) diff --git a/src/py/flwr/proto/clientappio_pb2.py b/src/py/flwr/proto/clientappio_pb2.py index 7679bcb3618..daf6e7f49fe 100644 --- a/src/py/flwr/proto/clientappio_pb2.py +++ b/src/py/flwr/proto/clientappio_pb2.py @@ -17,15 +17,15 @@ from flwr.proto import message_pb2 as flwr_dot_proto_dot_message__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1c\x66lwr/proto/clientappio.proto\x12\nflwr.proto\x1a\x14\x66lwr/proto/fab.proto\x1a\x14\x66lwr/proto/run.proto\x1a\x18\x66lwr/proto/message.proto\"W\n\x15\x43lientAppOutputStatus\x12-\n\x04\x63ode\x18\x01 \x01(\x0e\x32\x1f.flwr.proto.ClientAppOutputCode\x12\x0f\n\x07message\x18\x02 \x01(\t\"+\n\x1aPullClientAppInputsRequest\x12\r\n\x05token\x18\x01 \x01(\x12\"\x87\x01\n\x1bPullClientAppInputsResponse\x12$\n\x07message\x18\x01 \x01(\x0b\x32\x13.flwr.proto.Message\x12$\n\x07\x63ontext\x18\x02 \x01(\x0b\x32\x13.flwr.proto.Context\x12\x1c\n\x03run\x18\x03 \x01(\x0b\x32\x0f.flwr.proto.Run\"x\n\x1bPushClientAppOutputsRequest\x12\r\n\x05token\x18\x01 \x01(\x12\x12$\n\x07message\x18\x02 \x01(\x0b\x32\x13.flwr.proto.Message\x12$\n\x07\x63ontext\x18\x03 \x01(\x0b\x32\x13.flwr.proto.Context\"Q\n\x1cPushClientAppOutputsResponse\x12\x31\n\x06status\x18\x01 \x01(\x0b\x32!.flwr.proto.ClientAppOutputStatus\"\x11\n\x0fGetTokenRequest\"!\n\x10GetTokenResponse\x12\r\n\x05token\x18\x01 \x01(\x12*L\n\x13\x43lientAppOutputCode\x12\x0b\n\x07SUCCESS\x10\x00\x12\x15\n\x11\x44\x45\x41\x44LINE_EXCEEDED\x10\x01\x12\x11\n\rUNKNOWN_ERROR\x10\x02\x32\xad\x02\n\x0b\x43lientAppIo\x12h\n\x13PullClientAppInputs\x12&.flwr.proto.PullClientAppInputsRequest\x1a\'.flwr.proto.PullClientAppInputsResponse\"\x00\x12k\n\x14PushClientAppOutputs\x12\'.flwr.proto.PushClientAppOutputsRequest\x1a(.flwr.proto.PushClientAppOutputsResponse\"\x00\x12G\n\x08GetToken\x12\x1b.flwr.proto.GetTokenRequest\x1a\x1c.flwr.proto.GetTokenResponse\"\x00\x62\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1c\x66lwr/proto/clientappio.proto\x12\nflwr.proto\x1a\x14\x66lwr/proto/fab.proto\x1a\x14\x66lwr/proto/run.proto\x1a\x18\x66lwr/proto/message.proto\"W\n\x15\x43lientAppOutputStatus\x12-\n\x04\x63ode\x18\x01 \x01(\x0e\x32\x1f.flwr.proto.ClientAppOutputCode\x12\x0f\n\x07message\x18\x02 \x01(\t\"+\n\x1aPullClientAppInputsRequest\x12\r\n\x05token\x18\x01 \x01(\x12\"\x87\x01\n\x1bPullClientAppInputsResponse\x12$\n\x07message\x18\x01 \x01(\x0b\x32\x13.flwr.proto.Message\x12$\n\x07\x63ontext\x18\x02 \x01(\x0b\x32\x13.flwr.proto.Context\x12\x1c\n\x03run\x18\x03 \x01(\x0b\x32\x0f.flwr.proto.Run\"x\n\x1bPushClientAppOutputsRequest\x12\r\n\x05token\x18\x01 \x01(\x12\x12$\n\x07message\x18\x02 \x01(\x0b\x32\x13.flwr.proto.Message\x12$\n\x07\x63ontext\x18\x03 \x01(\x0b\x32\x13.flwr.proto.Context\"Q\n\x1cPushClientAppOutputsResponse\x12\x31\n\x06status\x18\x01 \x01(\x0b\x32!.flwr.proto.ClientAppOutputStatus\"\x11\n\x0fGetTokenRequest\"!\n\x10GetTokenResponse\x12\r\n\x05token\x18\x01 \x01(\x12\"\'\n\x16GetFabRequestWithToken\x12\r\n\x05token\x18\x01 \x01(\x12*L\n\x13\x43lientAppOutputCode\x12\x0b\n\x07SUCCESS\x10\x00\x12\x15\n\x11\x44\x45\x41\x44LINE_EXCEEDED\x10\x01\x12\x11\n\rUNKNOWN_ERROR\x10\x02\x32\xf9\x02\n\x0b\x43lientAppIo\x12h\n\x13PullClientAppInputs\x12&.flwr.proto.PullClientAppInputsRequest\x1a\'.flwr.proto.PullClientAppInputsResponse\"\x00\x12k\n\x14PushClientAppOutputs\x12\'.flwr.proto.PushClientAppOutputsRequest\x1a(.flwr.proto.PushClientAppOutputsResponse\"\x00\x12G\n\x08GetToken\x12\x1b.flwr.proto.GetTokenRequest\x1a\x1c.flwr.proto.GetTokenResponse\"\x00\x12J\n\x06GetFab\x12\".flwr.proto.GetFabRequestWithToken\x1a\x1a.flwr.proto.GetFabResponse\"\x00\x62\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'flwr.proto.clientappio_pb2', _globals) if _descriptor._USE_C_DESCRIPTORS == False: DESCRIPTOR._options = None - _globals['_CLIENTAPPOUTPUTCODE']._serialized_start=645 - _globals['_CLIENTAPPOUTPUTCODE']._serialized_end=721 + _globals['_CLIENTAPPOUTPUTCODE']._serialized_start=686 + _globals['_CLIENTAPPOUTPUTCODE']._serialized_end=762 _globals['_CLIENTAPPOUTPUTSTATUS']._serialized_start=114 _globals['_CLIENTAPPOUTPUTSTATUS']._serialized_end=201 _globals['_PULLCLIENTAPPINPUTSREQUEST']._serialized_start=203 @@ -40,6 +40,8 @@ _globals['_GETTOKENREQUEST']._serialized_end=608 _globals['_GETTOKENRESPONSE']._serialized_start=610 _globals['_GETTOKENRESPONSE']._serialized_end=643 - _globals['_CLIENTAPPIO']._serialized_start=724 - _globals['_CLIENTAPPIO']._serialized_end=1025 + _globals['_GETFABREQUESTWITHTOKEN']._serialized_start=645 + _globals['_GETFABREQUESTWITHTOKEN']._serialized_end=684 + _globals['_CLIENTAPPIO']._serialized_start=765 + _globals['_CLIENTAPPIO']._serialized_end=1142 # @@protoc_insertion_point(module_scope) diff --git a/src/py/flwr/proto/clientappio_pb2.pyi b/src/py/flwr/proto/clientappio_pb2.pyi index aacf3c6a232..6a46a102896 100644 --- a/src/py/flwr/proto/clientappio_pb2.pyi +++ b/src/py/flwr/proto/clientappio_pb2.pyi @@ -125,3 +125,14 @@ class GetTokenResponse(google.protobuf.message.Message): ) -> None: ... def ClearField(self, field_name: typing_extensions.Literal["token",b"token"]) -> None: ... global___GetTokenResponse = GetTokenResponse + +class GetFabRequestWithToken(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + TOKEN_FIELD_NUMBER: builtins.int + token: builtins.int + def __init__(self, + *, + token: builtins.int = ..., + ) -> None: ... + def ClearField(self, field_name: typing_extensions.Literal["token",b"token"]) -> None: ... +global___GetFabRequestWithToken = GetFabRequestWithToken diff --git a/src/py/flwr/proto/clientappio_pb2_grpc.py b/src/py/flwr/proto/clientappio_pb2_grpc.py index 0780a208e3a..a4a9b9623ab 100644 --- a/src/py/flwr/proto/clientappio_pb2_grpc.py +++ b/src/py/flwr/proto/clientappio_pb2_grpc.py @@ -3,6 +3,7 @@ import grpc from flwr.proto import clientappio_pb2 as flwr_dot_proto_dot_clientappio__pb2 +from flwr.proto import fab_pb2 as flwr_dot_proto_dot_fab__pb2 class ClientAppIoStub(object): @@ -29,6 +30,11 @@ def __init__(self, channel): request_serializer=flwr_dot_proto_dot_clientappio__pb2.GetTokenRequest.SerializeToString, response_deserializer=flwr_dot_proto_dot_clientappio__pb2.GetTokenResponse.FromString, ) + self.GetFab = channel.unary_unary( + '/flwr.proto.ClientAppIo/GetFab', + request_serializer=flwr_dot_proto_dot_clientappio__pb2.GetFabRequestWithToken.SerializeToString, + response_deserializer=flwr_dot_proto_dot_fab__pb2.GetFabResponse.FromString, + ) class ClientAppIoServicer(object): @@ -55,6 +61,13 @@ def GetToken(self, request, context): context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') + def GetFab(self, request, context): + """Get the FAB + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + def add_ClientAppIoServicer_to_server(servicer, server): rpc_method_handlers = { @@ -73,6 +86,11 @@ def add_ClientAppIoServicer_to_server(servicer, server): request_deserializer=flwr_dot_proto_dot_clientappio__pb2.GetTokenRequest.FromString, response_serializer=flwr_dot_proto_dot_clientappio__pb2.GetTokenResponse.SerializeToString, ), + 'GetFab': grpc.unary_unary_rpc_method_handler( + servicer.GetFab, + request_deserializer=flwr_dot_proto_dot_clientappio__pb2.GetFabRequestWithToken.FromString, + response_serializer=flwr_dot_proto_dot_fab__pb2.GetFabResponse.SerializeToString, + ), } generic_handler = grpc.method_handlers_generic_handler( 'flwr.proto.ClientAppIo', rpc_method_handlers) @@ -133,3 +151,20 @@ def GetToken(request, flwr_dot_proto_dot_clientappio__pb2.GetTokenResponse.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def GetFab(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/flwr.proto.ClientAppIo/GetFab', + flwr_dot_proto_dot_clientappio__pb2.GetFabRequestWithToken.SerializeToString, + flwr_dot_proto_dot_fab__pb2.GetFabResponse.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) diff --git a/src/py/flwr/proto/clientappio_pb2_grpc.pyi b/src/py/flwr/proto/clientappio_pb2_grpc.pyi index 9e34759a7b2..6142db4701c 100644 --- a/src/py/flwr/proto/clientappio_pb2_grpc.pyi +++ b/src/py/flwr/proto/clientappio_pb2_grpc.pyi @@ -4,6 +4,7 @@ isort:skip_file """ import abc import flwr.proto.clientappio_pb2 +import flwr.proto.fab_pb2 import grpc class ClientAppIoStub: @@ -23,6 +24,11 @@ class ClientAppIoStub: flwr.proto.clientappio_pb2.GetTokenResponse] """Get token""" + GetFab: grpc.UnaryUnaryMultiCallable[ + flwr.proto.clientappio_pb2.GetFabRequestWithToken, + flwr.proto.fab_pb2.GetFabResponse] + """Get the FAB""" + class ClientAppIoServicer(metaclass=abc.ABCMeta): @abc.abstractmethod @@ -49,5 +55,13 @@ class ClientAppIoServicer(metaclass=abc.ABCMeta): """Get token""" pass + @abc.abstractmethod + def GetFab(self, + request: flwr.proto.clientappio_pb2.GetFabRequestWithToken, + context: grpc.ServicerContext, + ) -> flwr.proto.fab_pb2.GetFabResponse: + """Get the FAB""" + pass + def add_ClientAppIoServicer_to_server(servicer: ClientAppIoServicer, server: grpc.Server) -> None: ... From 003b41523edb35be2ca1c6d65be67ba1f6f3d6ad Mon Sep 17 00:00:00 2001 From: jafermarq Date: Sat, 17 Aug 2024 11:22:26 +0100 Subject: [PATCH 11/24] removed set_tooken --- .../client/clientapp/clientappio_servicer.py | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/src/py/flwr/client/clientapp/clientappio_servicer.py b/src/py/flwr/client/clientapp/clientappio_servicer.py index b8de9c246fe..2aa381b03c9 100644 --- a/src/py/flwr/client/clientapp/clientappio_servicer.py +++ b/src/py/flwr/client/clientapp/clientappio_servicer.py @@ -70,7 +70,7 @@ class ClientAppIoServicer(clientappio_pb2_grpc.ClientAppIoServicer): def __init__(self) -> None: self.clientapp_input: Optional[ClientAppIoInputs] = None self.clientapp_output: Optional[ClientAppIoOutputs] = None - self.token: Optional[int] = None + self.token_returned: bool = False def PullClientAppInputs( self, request: PullClientAppInputsRequest, context: grpc.ServicerContext @@ -131,18 +131,12 @@ def GetToken( """Get token.""" log(DEBUG, "ClientAppIo.GetToken") res = GetTokenResponse() - if self.token: - # If token is set, use it in response - res.token = self.token - # Resetting token - self.token = None + if self.clientapp_input: + # If ClientAppIoInputs is set, return token + res.token = self.clientapp_input.token + self.token_returned = True return res - def set_token(self, token: int) -> None: - """Set the token.""" - log(DEBUG, "ClientAppIo.set_token") - self.token = token - def set_inputs(self, clientapp_input: ClientAppIoInputs) -> None: """Set ClientApp inputs.""" log(DEBUG, "ClientAppIo.SetInputs") From 4ef17528601d50751ed0983e18cccf6b18f543c8 Mon Sep 17 00:00:00 2001 From: jafermarq Date: Sat, 17 Aug 2024 11:25:44 +0100 Subject: [PATCH 12/24] removed set_token func --- src/py/flwr/client/app.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index ab144127ab6..64e103a3c75 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -458,9 +458,6 @@ def _on_backoff(retry_state: RetryState) -> None: # Generate SuperNode token token: int = generate_rand_int_from_bytes(RUN_ID_NUM_BYTES) - # Set token - clientappio_servicer.set_token(token) - # Share Message and Context with servicer clientappio_servicer.set_inputs( ClientAppIoInputs( From 540657f70109a1abc2e0d0f54e4f29f0dec5a67c Mon Sep 17 00:00:00 2001 From: jafermarq Date: Mon, 19 Aug 2024 10:33:12 +0100 Subject: [PATCH 13/24] w/ previous --- src/proto/flwr/proto/clientappio.proto | 6 ---- .../client/clientapp/clientappio_servicer.py | 4 +++ src/py/flwr/proto/clientappio_pb2.py | 26 +++++++------- src/py/flwr/proto/clientappio_pb2.pyi | 17 ---------- src/py/flwr/proto/clientappio_pb2_grpc.py | 34 ------------------- src/py/flwr/proto/clientappio_pb2_grpc.pyi | 13 ------- 6 files changed, 17 insertions(+), 83 deletions(-) diff --git a/src/proto/flwr/proto/clientappio.proto b/src/proto/flwr/proto/clientappio.proto index addcf87c0e7..83220b2efc6 100644 --- a/src/proto/flwr/proto/clientappio.proto +++ b/src/proto/flwr/proto/clientappio.proto @@ -18,9 +18,6 @@ service ClientAppIo { rpc PushClientAppOutputs(PushClientAppOutputsRequest) returns (PushClientAppOutputsResponse) {} - // Get token - rpc GetToken(GetTokenRequest) returns (GetTokenResponse) {} - // Get the FAB rpc GetFab(GetFabRequestWithToken) returns (GetFabResponse) {} } @@ -52,7 +49,4 @@ message PushClientAppOutputsRequest { } message PushClientAppOutputsResponse { ClientAppOutputStatus status = 1; } -message GetTokenRequest {} -message GetTokenResponse { sint64 token = 1; } - message GetFabRequestWithToken { sint64 token = 1; } diff --git a/src/py/flwr/client/clientapp/clientappio_servicer.py b/src/py/flwr/client/clientapp/clientappio_servicer.py index dee16b5ce04..2188a4c5ebf 100644 --- a/src/py/flwr/client/clientapp/clientappio_servicer.py +++ b/src/py/flwr/client/clientapp/clientappio_servicer.py @@ -164,6 +164,10 @@ def PushClientAppOutputs( code = typing.ClientAppOutputCode.UNKNOWN_ERROR status = typing.ClientAppOutputStatus(code=code, message="Unkonwn error") + # Return status to ClientApp process + proto_status = clientappstatus_to_proto(status=status) + return PushClientAppOutputsResponse(status=proto_status) + def GetToken( self, request: GetTokenRequest, context: grpc.ServicerContext ) -> GetTokenResponse: diff --git a/src/py/flwr/proto/clientappio_pb2.py b/src/py/flwr/proto/clientappio_pb2.py index daf6e7f49fe..c8f3093eed0 100644 --- a/src/py/flwr/proto/clientappio_pb2.py +++ b/src/py/flwr/proto/clientappio_pb2.py @@ -17,7 +17,7 @@ from flwr.proto import message_pb2 as flwr_dot_proto_dot_message__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1c\x66lwr/proto/clientappio.proto\x12\nflwr.proto\x1a\x14\x66lwr/proto/fab.proto\x1a\x14\x66lwr/proto/run.proto\x1a\x18\x66lwr/proto/message.proto\"W\n\x15\x43lientAppOutputStatus\x12-\n\x04\x63ode\x18\x01 \x01(\x0e\x32\x1f.flwr.proto.ClientAppOutputCode\x12\x0f\n\x07message\x18\x02 \x01(\t\"+\n\x1aPullClientAppInputsRequest\x12\r\n\x05token\x18\x01 \x01(\x12\"\x87\x01\n\x1bPullClientAppInputsResponse\x12$\n\x07message\x18\x01 \x01(\x0b\x32\x13.flwr.proto.Message\x12$\n\x07\x63ontext\x18\x02 \x01(\x0b\x32\x13.flwr.proto.Context\x12\x1c\n\x03run\x18\x03 \x01(\x0b\x32\x0f.flwr.proto.Run\"x\n\x1bPushClientAppOutputsRequest\x12\r\n\x05token\x18\x01 \x01(\x12\x12$\n\x07message\x18\x02 \x01(\x0b\x32\x13.flwr.proto.Message\x12$\n\x07\x63ontext\x18\x03 \x01(\x0b\x32\x13.flwr.proto.Context\"Q\n\x1cPushClientAppOutputsResponse\x12\x31\n\x06status\x18\x01 \x01(\x0b\x32!.flwr.proto.ClientAppOutputStatus\"\x11\n\x0fGetTokenRequest\"!\n\x10GetTokenResponse\x12\r\n\x05token\x18\x01 \x01(\x12\"\'\n\x16GetFabRequestWithToken\x12\r\n\x05token\x18\x01 \x01(\x12*L\n\x13\x43lientAppOutputCode\x12\x0b\n\x07SUCCESS\x10\x00\x12\x15\n\x11\x44\x45\x41\x44LINE_EXCEEDED\x10\x01\x12\x11\n\rUNKNOWN_ERROR\x10\x02\x32\xf9\x02\n\x0b\x43lientAppIo\x12h\n\x13PullClientAppInputs\x12&.flwr.proto.PullClientAppInputsRequest\x1a\'.flwr.proto.PullClientAppInputsResponse\"\x00\x12k\n\x14PushClientAppOutputs\x12\'.flwr.proto.PushClientAppOutputsRequest\x1a(.flwr.proto.PushClientAppOutputsResponse\"\x00\x12G\n\x08GetToken\x12\x1b.flwr.proto.GetTokenRequest\x1a\x1c.flwr.proto.GetTokenResponse\"\x00\x12J\n\x06GetFab\x12\".flwr.proto.GetFabRequestWithToken\x1a\x1a.flwr.proto.GetFabResponse\"\x00\x62\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1c\x66lwr/proto/clientappio.proto\x12\nflwr.proto\x1a\x14\x66lwr/proto/fab.proto\x1a\x14\x66lwr/proto/run.proto\x1a\x18\x66lwr/proto/message.proto\"W\n\x15\x43lientAppOutputStatus\x12-\n\x04\x63ode\x18\x01 \x01(\x0e\x32\x1f.flwr.proto.ClientAppOutputCode\x12\x0f\n\x07message\x18\x02 \x01(\t\"\x11\n\x0fGetTokenRequest\"!\n\x10GetTokenResponse\x12\r\n\x05token\x18\x01 \x01(\x12\"+\n\x1aPullClientAppInputsRequest\x12\r\n\x05token\x18\x01 \x01(\x12\"\x87\x01\n\x1bPullClientAppInputsResponse\x12$\n\x07message\x18\x01 \x01(\x0b\x32\x13.flwr.proto.Message\x12$\n\x07\x63ontext\x18\x02 \x01(\x0b\x32\x13.flwr.proto.Context\x12\x1c\n\x03run\x18\x03 \x01(\x0b\x32\x0f.flwr.proto.Run\"x\n\x1bPushClientAppOutputsRequest\x12\r\n\x05token\x18\x01 \x01(\x12\x12$\n\x07message\x18\x02 \x01(\x0b\x32\x13.flwr.proto.Message\x12$\n\x07\x63ontext\x18\x03 \x01(\x0b\x32\x13.flwr.proto.Context\"Q\n\x1cPushClientAppOutputsResponse\x12\x31\n\x06status\x18\x01 \x01(\x0b\x32!.flwr.proto.ClientAppOutputStatus\"\'\n\x16GetFabRequestWithToken\x12\r\n\x05token\x18\x01 \x01(\x12*L\n\x13\x43lientAppOutputCode\x12\x0b\n\x07SUCCESS\x10\x00\x12\x15\n\x11\x44\x45\x41\x44LINE_EXCEEDED\x10\x01\x12\x11\n\rUNKNOWN_ERROR\x10\x02\x32\xf9\x02\n\x0b\x43lientAppIo\x12G\n\x08GetToken\x12\x1b.flwr.proto.GetTokenRequest\x1a\x1c.flwr.proto.GetTokenResponse\"\x00\x12h\n\x13PullClientAppInputs\x12&.flwr.proto.PullClientAppInputsRequest\x1a\'.flwr.proto.PullClientAppInputsResponse\"\x00\x12k\n\x14PushClientAppOutputs\x12\'.flwr.proto.PushClientAppOutputsRequest\x1a(.flwr.proto.PushClientAppOutputsResponse\"\x00\x12J\n\x06GetFab\x12\".flwr.proto.GetFabRequestWithToken\x1a\x1a.flwr.proto.GetFabResponse\"\x00\x62\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -28,18 +28,18 @@ _globals['_CLIENTAPPOUTPUTCODE']._serialized_end=762 _globals['_CLIENTAPPOUTPUTSTATUS']._serialized_start=114 _globals['_CLIENTAPPOUTPUTSTATUS']._serialized_end=201 - _globals['_PULLCLIENTAPPINPUTSREQUEST']._serialized_start=203 - _globals['_PULLCLIENTAPPINPUTSREQUEST']._serialized_end=246 - _globals['_PULLCLIENTAPPINPUTSRESPONSE']._serialized_start=249 - _globals['_PULLCLIENTAPPINPUTSRESPONSE']._serialized_end=384 - _globals['_PUSHCLIENTAPPOUTPUTSREQUEST']._serialized_start=386 - _globals['_PUSHCLIENTAPPOUTPUTSREQUEST']._serialized_end=506 - _globals['_PUSHCLIENTAPPOUTPUTSRESPONSE']._serialized_start=508 - _globals['_PUSHCLIENTAPPOUTPUTSRESPONSE']._serialized_end=589 - _globals['_GETTOKENREQUEST']._serialized_start=591 - _globals['_GETTOKENREQUEST']._serialized_end=608 - _globals['_GETTOKENRESPONSE']._serialized_start=610 - _globals['_GETTOKENRESPONSE']._serialized_end=643 + _globals['_GETTOKENREQUEST']._serialized_start=203 + _globals['_GETTOKENREQUEST']._serialized_end=220 + _globals['_GETTOKENRESPONSE']._serialized_start=222 + _globals['_GETTOKENRESPONSE']._serialized_end=255 + _globals['_PULLCLIENTAPPINPUTSREQUEST']._serialized_start=257 + _globals['_PULLCLIENTAPPINPUTSREQUEST']._serialized_end=300 + _globals['_PULLCLIENTAPPINPUTSRESPONSE']._serialized_start=303 + _globals['_PULLCLIENTAPPINPUTSRESPONSE']._serialized_end=438 + _globals['_PUSHCLIENTAPPOUTPUTSREQUEST']._serialized_start=440 + _globals['_PUSHCLIENTAPPOUTPUTSREQUEST']._serialized_end=560 + _globals['_PUSHCLIENTAPPOUTPUTSRESPONSE']._serialized_start=562 + _globals['_PUSHCLIENTAPPOUTPUTSRESPONSE']._serialized_end=643 _globals['_GETFABREQUESTWITHTOKEN']._serialized_start=645 _globals['_GETFABREQUESTWITHTOKEN']._serialized_end=684 _globals['_CLIENTAPPIO']._serialized_start=765 diff --git a/src/py/flwr/proto/clientappio_pb2.pyi b/src/py/flwr/proto/clientappio_pb2.pyi index fb10c57969f..db06ed24fc4 100644 --- a/src/py/flwr/proto/clientappio_pb2.pyi +++ b/src/py/flwr/proto/clientappio_pb2.pyi @@ -126,23 +126,6 @@ class PushClientAppOutputsResponse(google.protobuf.message.Message): def ClearField(self, field_name: typing_extensions.Literal["status",b"status"]) -> None: ... global___PushClientAppOutputsResponse = PushClientAppOutputsResponse -class GetTokenRequest(google.protobuf.message.Message): - DESCRIPTOR: google.protobuf.descriptor.Descriptor - def __init__(self, - ) -> None: ... -global___GetTokenRequest = GetTokenRequest - -class GetTokenResponse(google.protobuf.message.Message): - DESCRIPTOR: google.protobuf.descriptor.Descriptor - TOKEN_FIELD_NUMBER: builtins.int - token: builtins.int - def __init__(self, - *, - token: builtins.int = ..., - ) -> None: ... - def ClearField(self, field_name: typing_extensions.Literal["token",b"token"]) -> None: ... -global___GetTokenResponse = GetTokenResponse - class GetFabRequestWithToken(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor TOKEN_FIELD_NUMBER: builtins.int diff --git a/src/py/flwr/proto/clientappio_pb2_grpc.py b/src/py/flwr/proto/clientappio_pb2_grpc.py index 87909bba9fd..317321e3546 100644 --- a/src/py/flwr/proto/clientappio_pb2_grpc.py +++ b/src/py/flwr/proto/clientappio_pb2_grpc.py @@ -30,11 +30,6 @@ def __init__(self, channel): request_serializer=flwr_dot_proto_dot_clientappio__pb2.PushClientAppOutputsRequest.SerializeToString, response_deserializer=flwr_dot_proto_dot_clientappio__pb2.PushClientAppOutputsResponse.FromString, ) - self.GetToken = channel.unary_unary( - '/flwr.proto.ClientAppIo/GetToken', - request_serializer=flwr_dot_proto_dot_clientappio__pb2.GetTokenRequest.SerializeToString, - response_deserializer=flwr_dot_proto_dot_clientappio__pb2.GetTokenResponse.FromString, - ) self.GetFab = channel.unary_unary( '/flwr.proto.ClientAppIo/GetFab', request_serializer=flwr_dot_proto_dot_clientappio__pb2.GetFabRequestWithToken.SerializeToString, @@ -66,13 +61,6 @@ def PushClientAppOutputs(self, request, context): context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') - def GetToken(self, request, context): - """Get token - """ - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - def GetFab(self, request, context): """Get the FAB """ @@ -98,11 +86,6 @@ def add_ClientAppIoServicer_to_server(servicer, server): request_deserializer=flwr_dot_proto_dot_clientappio__pb2.PushClientAppOutputsRequest.FromString, response_serializer=flwr_dot_proto_dot_clientappio__pb2.PushClientAppOutputsResponse.SerializeToString, ), - 'GetToken': grpc.unary_unary_rpc_method_handler( - servicer.GetToken, - request_deserializer=flwr_dot_proto_dot_clientappio__pb2.GetTokenRequest.FromString, - response_serializer=flwr_dot_proto_dot_clientappio__pb2.GetTokenResponse.SerializeToString, - ), 'GetFab': grpc.unary_unary_rpc_method_handler( servicer.GetFab, request_deserializer=flwr_dot_proto_dot_clientappio__pb2.GetFabRequestWithToken.FromString, @@ -169,23 +152,6 @@ def PushClientAppOutputs(request, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata) - @staticmethod - def GetToken(request, - target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): - return grpc.experimental.unary_unary(request, target, '/flwr.proto.ClientAppIo/GetToken', - flwr_dot_proto_dot_clientappio__pb2.GetTokenRequest.SerializeToString, - flwr_dot_proto_dot_clientappio__pb2.GetTokenResponse.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, timeout, metadata) - @staticmethod def GetFab(request, target, diff --git a/src/py/flwr/proto/clientappio_pb2_grpc.pyi b/src/py/flwr/proto/clientappio_pb2_grpc.pyi index f7a6bd22329..9def338d6e6 100644 --- a/src/py/flwr/proto/clientappio_pb2_grpc.pyi +++ b/src/py/flwr/proto/clientappio_pb2_grpc.pyi @@ -24,11 +24,6 @@ class ClientAppIoStub: flwr.proto.clientappio_pb2.PushClientAppOutputsResponse] """Send updated Message and Context""" - GetToken: grpc.UnaryUnaryMultiCallable[ - flwr.proto.clientappio_pb2.GetTokenRequest, - flwr.proto.clientappio_pb2.GetTokenResponse] - """Get token""" - GetFab: grpc.UnaryUnaryMultiCallable[ flwr.proto.clientappio_pb2.GetFabRequestWithToken, flwr.proto.fab_pb2.GetFabResponse] @@ -60,14 +55,6 @@ class ClientAppIoServicer(metaclass=abc.ABCMeta): """Send updated Message and Context""" pass - @abc.abstractmethod - def GetToken(self, - request: flwr.proto.clientappio_pb2.GetTokenRequest, - context: grpc.ServicerContext, - ) -> flwr.proto.clientappio_pb2.GetTokenResponse: - """Get token""" - pass - @abc.abstractmethod def GetFab(self, request: flwr.proto.clientappio_pb2.GetFabRequestWithToken, From 0bec66866f9fc494908b2e059e4b12842caf8b8c Mon Sep 17 00:00:00 2001 From: jafermarq Date: Mon, 19 Aug 2024 10:37:17 +0100 Subject: [PATCH 14/24] fix to merge --- .../client/clientapp/clientappio_servicer.py | 29 +++++++++++++++---- 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/src/py/flwr/client/clientapp/clientappio_servicer.py b/src/py/flwr/client/clientapp/clientappio_servicer.py index 2188a4c5ebf..0e949926454 100644 --- a/src/py/flwr/client/clientapp/clientappio_servicer.py +++ b/src/py/flwr/client/clientapp/clientappio_servicer.py @@ -73,6 +73,7 @@ def __init__(self) -> None: self.clientapp_input: Optional[ClientAppIoInputs] = None self.clientapp_output: Optional[ClientAppIoOutputs] = None self.token_returned: bool = False + self.inputs_returned: bool = False def PullClientAppInputs( self, request: PullClientAppInputsRequest, context: grpc.ServicerContext @@ -173,12 +174,28 @@ def GetToken( ) -> GetTokenResponse: """Get token.""" log(DEBUG, "ClientAppIo.GetToken") - res = GetTokenResponse() - if self.clientapp_input: - # If ClientAppIoInputs is set, return token - res.token = self.clientapp_input.token - self.token_returned = True - return res + + # Fail if no ClientAppIoInputs are available + if self.clientapp_input is None: + context.abort( + grpc.StatusCode.FAILED_PRECONDITION, + "No inputs available.", + ) + clientapp_input = cast(ClientAppIoInputs, self.clientapp_input) + + # Fail if token was already returned in a previous call + if self.token_returned: + context.abort( + grpc.StatusCode.FAILED_PRECONDITION, + "Token already returned. A token can be returned only once.", + ) + + # If + # - ClientAppIoInputs is set, and + # - token hasn't been returned before, + # return token + self.token_returned = True + return GetTokenResponse(token=clientapp_input.token) def GetFab( self, request: GetFabRequestWithToken, context: grpc.ServicerContext From df0760eefdf0ad7c60bfe4c3192705ba2701c63a Mon Sep 17 00:00:00 2001 From: jafermarq Date: Mon, 19 Aug 2024 10:38:45 +0100 Subject: [PATCH 15/24] format --- .../client/clientapp/clientappio_servicer.py | 56 +++++++++---------- 1 file changed, 28 insertions(+), 28 deletions(-) diff --git a/src/py/flwr/client/clientapp/clientappio_servicer.py b/src/py/flwr/client/clientapp/clientappio_servicer.py index 0e949926454..1b43a576ad2 100644 --- a/src/py/flwr/client/clientapp/clientappio_servicer.py +++ b/src/py/flwr/client/clientapp/clientappio_servicer.py @@ -75,6 +75,34 @@ def __init__(self) -> None: self.token_returned: bool = False self.inputs_returned: bool = False + def GetToken( + self, request: GetTokenRequest, context: grpc.ServicerContext + ) -> GetTokenResponse: + """Get token.""" + log(DEBUG, "ClientAppIo.GetToken") + + # Fail if no ClientAppIoInputs are available + if self.clientapp_input is None: + context.abort( + grpc.StatusCode.FAILED_PRECONDITION, + "No inputs available.", + ) + clientapp_input = cast(ClientAppIoInputs, self.clientapp_input) + + # Fail if token was already returned in a previous call + if self.token_returned: + context.abort( + grpc.StatusCode.FAILED_PRECONDITION, + "Token already returned. A token can be returned only once.", + ) + + # If + # - ClientAppIoInputs is set, and + # - token hasn't been returned before, + # return token + self.token_returned = True + return GetTokenResponse(token=clientapp_input.token) + def PullClientAppInputs( self, request: PullClientAppInputsRequest, context: grpc.ServicerContext ) -> PullClientAppInputsResponse: @@ -169,34 +197,6 @@ def PushClientAppOutputs( proto_status = clientappstatus_to_proto(status=status) return PushClientAppOutputsResponse(status=proto_status) - def GetToken( - self, request: GetTokenRequest, context: grpc.ServicerContext - ) -> GetTokenResponse: - """Get token.""" - log(DEBUG, "ClientAppIo.GetToken") - - # Fail if no ClientAppIoInputs are available - if self.clientapp_input is None: - context.abort( - grpc.StatusCode.FAILED_PRECONDITION, - "No inputs available.", - ) - clientapp_input = cast(ClientAppIoInputs, self.clientapp_input) - - # Fail if token was already returned in a previous call - if self.token_returned: - context.abort( - grpc.StatusCode.FAILED_PRECONDITION, - "Token already returned. A token can be returned only once.", - ) - - # If - # - ClientAppIoInputs is set, and - # - token hasn't been returned before, - # return token - self.token_returned = True - return GetTokenResponse(token=clientapp_input.token) - def GetFab( self, request: GetFabRequestWithToken, context: grpc.ServicerContext ) -> GetFabResponse: From eba5e7a7181578346d5bb075a942e7c71cd4edf0 Mon Sep 17 00:00:00 2001 From: jafermarq Date: Mon, 19 Aug 2024 14:32:57 +0100 Subject: [PATCH 16/24] comm fab via clientappinputs dataclass --- src/proto/flwr/proto/clientappio.proto | 6 +--- src/py/flwr/client/app.py | 8 +++++ src/py/flwr/client/clientapp/app.py | 18 ++++------ .../client/clientapp/clientappio_servicer.py | 14 +++----- .../clientapp/clientappio_servicer_test.py | 36 +++++++++---------- src/py/flwr/proto/clientappio_pb2.py | 22 ++++++------ src/py/flwr/proto/clientappio_pb2.pyi | 20 ++++------- src/py/flwr/proto/clientappio_pb2_grpc.py | 35 ------------------ src/py/flwr/proto/clientappio_pb2_grpc.pyi | 14 -------- 9 files changed, 53 insertions(+), 120 deletions(-) diff --git a/src/proto/flwr/proto/clientappio.proto b/src/proto/flwr/proto/clientappio.proto index 83220b2efc6..898cb04c5b5 100644 --- a/src/proto/flwr/proto/clientappio.proto +++ b/src/proto/flwr/proto/clientappio.proto @@ -17,9 +17,6 @@ service ClientAppIo { // Send updated Message and Context rpc PushClientAppOutputs(PushClientAppOutputsRequest) returns (PushClientAppOutputsResponse) {} - - // Get the FAB - rpc GetFab(GetFabRequestWithToken) returns (GetFabResponse) {} } enum ClientAppOutputCode { @@ -40,6 +37,7 @@ message PullClientAppInputsResponse { Message message = 1; Context context = 2; Run run = 3; + Fab fab = 4; } message PushClientAppOutputsRequest { @@ -48,5 +46,3 @@ message PushClientAppOutputsRequest { Context context = 3; } message PushClientAppOutputsResponse { ClientAppOutputStatus status = 1; } - -message GetFabRequestWithToken { sint64 token = 1; } diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index 26ecd71211c..62521946a07 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -475,6 +475,13 @@ def _on_backoff(retry_state: RetryState) -> None: # Generate SuperNode token token: int = generate_rand_int_from_bytes(RUN_ID_NUM_BYTES) + # Ensure FAB exists + if fab is None: + raise ValueError( + "SuperNode with `--isolation` can only operate " + "when a FAB has been pulled from the SuperLink." + ) + # Mode 1: SuperNode starts ClientApp as subprocess start_subprocess = isolation == ISOLATION_MODE_SUBPROCESS @@ -484,6 +491,7 @@ def _on_backoff(retry_state: RetryState) -> None: message=message, context=context, run=run, + fab=fab, token=token, ), token_returned=start_subprocess, diff --git a/src/py/flwr/client/clientapp/app.py b/src/py/flwr/client/clientapp/app.py index 5c7b22e4dba..3115c5c5509 100644 --- a/src/py/flwr/client/clientapp/app.py +++ b/src/py/flwr/client/clientapp/app.py @@ -30,11 +30,12 @@ from flwr.common.serde import ( context_from_proto, context_to_proto, + fab_from_proto, message_from_proto, message_to_proto, run_from_proto, ) -from flwr.common.typing import Run +from flwr.common.typing import Fab, Run # pylint: disable=E0611 from flwr.proto.clientappio_pb2 import ( @@ -46,7 +47,6 @@ PushClientAppOutputsResponse, ) from flwr.proto.clientappio_pb2_grpc import ClientAppIoStub -from flwr.proto.fab_pb2 import Fab, GetFabResponse from .utils import get_load_client_app_fn @@ -116,7 +116,7 @@ def run_clientapp( # pylint: disable=R0914 time.sleep(1) # Pull Message, Context, and Run from SuperNode - message, context, run = pull_message(stub=stub, token=token) + message, context, run, _ = pull_message(stub=stub, token=token) load_client_app_fn = get_load_client_app_fn( default_app_ref="", @@ -188,14 +188,7 @@ def get_token(stub: grpc.Channel) -> Optional[int]: return None -def get_fab(stub: grpc.Channel, token: int) -> Fab: - """Get FAB from SuperNode passing the token.""" - log(DEBUG, "Flower ClientApp requests FAB using token.") - res: GetFabResponse = stub.GetFab(token) - return res.fab - - -def pull_message(stub: grpc.Channel, token: int) -> Tuple[Message, Context, Run]: +def pull_message(stub: grpc.Channel, token: int) -> Tuple[Message, Context, Run, Fab]: """Pull message from SuperNode to ClientApp.""" log(INFO, "Pulling ClientAppInputs for token %s", token) try: @@ -205,7 +198,8 @@ def pull_message(stub: grpc.Channel, token: int) -> Tuple[Message, Context, Run] message = message_from_proto(res.message) context = context_from_proto(res.context) run = run_from_proto(res.run) - return message, context, run + fab = fab_from_proto(res.fab) + return message, context, run, fab except grpc.RpcError as e: log(ERROR, "[PullClientAppInputs] gRPC error occurred: %s", str(e)) raise e diff --git a/src/py/flwr/client/clientapp/clientappio_servicer.py b/src/py/flwr/client/clientapp/clientappio_servicer.py index 1bdfce8d36d..c08c9e16099 100644 --- a/src/py/flwr/client/clientapp/clientappio_servicer.py +++ b/src/py/flwr/client/clientapp/clientappio_servicer.py @@ -27,16 +27,16 @@ clientappstatus_to_proto, context_from_proto, context_to_proto, + fab_to_proto, message_from_proto, message_to_proto, run_to_proto, ) -from flwr.common.typing import Run +from flwr.common.typing import Fab, Run # pylint: disable=E0611 from flwr.proto import clientappio_pb2_grpc from flwr.proto.clientappio_pb2 import ( # pylint: disable=E0401 - GetFabRequestWithToken, GetTokenRequest, GetTokenResponse, PullClientAppInputsRequest, @@ -44,7 +44,6 @@ PushClientAppOutputsRequest, PushClientAppOutputsResponse, ) -from flwr.proto.fab_pb2 import GetFabResponse @dataclass @@ -54,6 +53,7 @@ class ClientAppInputs: message: Message context: Context run: Run + fab: Fab token: int @@ -138,6 +138,7 @@ def PullClientAppInputs( message=message_to_proto(clientapp_input.message), context=context_to_proto(clientapp_input.context), run=run_to_proto(clientapp_input.run), + fab=fab_to_proto(clientapp_input.fab), ) def PushClientAppOutputs( @@ -197,13 +198,6 @@ def PushClientAppOutputs( proto_status = clientappstatus_to_proto(status=status) return PushClientAppOutputsResponse(status=proto_status) - def GetFab( - self, request: GetFabRequestWithToken, context: grpc.ServicerContext - ) -> GetFabResponse: - """Get Fab.""" - log(DEBUG, "ClientAppIo.GetFab") - return GetFabResponse() - def set_inputs( self, clientapp_input: ClientAppInputs, token_returned: bool ) -> None: diff --git a/src/py/flwr/client/clientapp/clientappio_servicer_test.py b/src/py/flwr/client/clientapp/clientappio_servicer_test.py index 92f28d04fc0..c3f6edf36fd 100644 --- a/src/py/flwr/client/clientapp/clientappio_servicer_test.py +++ b/src/py/flwr/client/clientapp/clientappio_servicer_test.py @@ -17,12 +17,13 @@ import unittest from unittest.mock import Mock, patch -from flwr.client.clientapp.app import get_fab, get_token, pull_message, push_message +from flwr.client.clientapp.app import get_token, pull_message, push_message from flwr.common import Context, Message, typing from flwr.common.constant import RUN_ID_NUM_BYTES from flwr.common.serde import ( clientappstatus_from_proto, clientappstatus_to_proto, + fab_to_proto, message_to_proto, ) from flwr.common.serde_test import RecordMaker @@ -33,7 +34,6 @@ PullClientAppInputsResponse, PushClientAppOutputsResponse, ) -from flwr.proto.fab_pb2 import Fab, GetFabResponse from flwr.proto.message_pb2 import Context as ProtoContext from flwr.proto.run_pb2 import Run as ProtoRun from flwr.server.superlink.state.utils import generate_rand_int_from_bytes @@ -78,7 +78,13 @@ def test_set_inputs(self) -> None: fab_hash="dolor", override_config=self.maker.user_config(), ) - client_input = ClientAppInputs(message, context, run, 1) + + fab = typing.Fab( + hash_str="abc123#$%", + content=b"\xf3\xf5\xf8\x98", + ) + + client_input = ClientAppInputs(message, context, run, fab, 1) client_output = ClientAppOutputs(message, context) # Execute and assert @@ -145,15 +151,20 @@ def test_pull_clientapp_inputs(self) -> None: metadata=self.maker.metadata(), content=self.maker.recordset(3, 2, 1), ) + mock_fab = typing.Fab( + hash_str="abc123#$%", + content=b"\xf3\xf5\xf8\x98", + ) mock_response = PullClientAppInputsResponse( message=message_to_proto(mock_message), context=ProtoContext(node_id=123), run=ProtoRun(run_id=61016, fab_id="mock/mock", fab_version="v1.0.0"), + fab=fab_to_proto(mock_fab), ) self.mock_stub.PullClientAppInputs.return_value = mock_response # Execute - message, context, run = pull_message(self.mock_stub, token=456) + message, context, run, fab = pull_message(self.mock_stub, token=456) # Assert self.mock_stub.PullClientAppInputs.assert_called_once() @@ -164,6 +175,8 @@ def test_pull_clientapp_inputs(self) -> None: self.assertEqual(run.run_id, 61016) self.assertEqual(run.fab_id, "mock/mock") self.assertEqual(run.fab_version, "v1.0.0") + self.assertEqual(fab.hash_str, mock_fab.hash_str) + self.assertEqual(fab.content, mock_fab.content) def test_push_clientapp_outputs(self) -> None: """Test pushing messages to SuperNode.""" @@ -208,18 +221,3 @@ def test_get_token(self) -> None: # Assert self.mock_stub.GetToken.assert_called_once() self.assertEqual(res, token) - - def test_get_fab(self) -> None: - """Test fetching of FAB from SuperNode.""" - # Prepare - token: int = generate_rand_int_from_bytes(RUN_ID_NUM_BYTES) - fab = Fab(hash_str="1234", content=b"\xf3\xf5\xf8\x98") - mock_response = GetFabResponse(fab=fab) - self.mock_stub.GetFab.return_value = mock_response - - # Execute - fab_returned = get_fab(stub=self.mock_stub, token=token) - - # Assert - self.mock_stub.GetFab.assert_called_once() - self.assertEqual(fab_returned, fab) diff --git a/src/py/flwr/proto/clientappio_pb2.py b/src/py/flwr/proto/clientappio_pb2.py index c8f3093eed0..9fd5302fe6c 100644 --- a/src/py/flwr/proto/clientappio_pb2.py +++ b/src/py/flwr/proto/clientappio_pb2.py @@ -17,15 +17,15 @@ from flwr.proto import message_pb2 as flwr_dot_proto_dot_message__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1c\x66lwr/proto/clientappio.proto\x12\nflwr.proto\x1a\x14\x66lwr/proto/fab.proto\x1a\x14\x66lwr/proto/run.proto\x1a\x18\x66lwr/proto/message.proto\"W\n\x15\x43lientAppOutputStatus\x12-\n\x04\x63ode\x18\x01 \x01(\x0e\x32\x1f.flwr.proto.ClientAppOutputCode\x12\x0f\n\x07message\x18\x02 \x01(\t\"\x11\n\x0fGetTokenRequest\"!\n\x10GetTokenResponse\x12\r\n\x05token\x18\x01 \x01(\x12\"+\n\x1aPullClientAppInputsRequest\x12\r\n\x05token\x18\x01 \x01(\x12\"\x87\x01\n\x1bPullClientAppInputsResponse\x12$\n\x07message\x18\x01 \x01(\x0b\x32\x13.flwr.proto.Message\x12$\n\x07\x63ontext\x18\x02 \x01(\x0b\x32\x13.flwr.proto.Context\x12\x1c\n\x03run\x18\x03 \x01(\x0b\x32\x0f.flwr.proto.Run\"x\n\x1bPushClientAppOutputsRequest\x12\r\n\x05token\x18\x01 \x01(\x12\x12$\n\x07message\x18\x02 \x01(\x0b\x32\x13.flwr.proto.Message\x12$\n\x07\x63ontext\x18\x03 \x01(\x0b\x32\x13.flwr.proto.Context\"Q\n\x1cPushClientAppOutputsResponse\x12\x31\n\x06status\x18\x01 \x01(\x0b\x32!.flwr.proto.ClientAppOutputStatus\"\'\n\x16GetFabRequestWithToken\x12\r\n\x05token\x18\x01 \x01(\x12*L\n\x13\x43lientAppOutputCode\x12\x0b\n\x07SUCCESS\x10\x00\x12\x15\n\x11\x44\x45\x41\x44LINE_EXCEEDED\x10\x01\x12\x11\n\rUNKNOWN_ERROR\x10\x02\x32\xf9\x02\n\x0b\x43lientAppIo\x12G\n\x08GetToken\x12\x1b.flwr.proto.GetTokenRequest\x1a\x1c.flwr.proto.GetTokenResponse\"\x00\x12h\n\x13PullClientAppInputs\x12&.flwr.proto.PullClientAppInputsRequest\x1a\'.flwr.proto.PullClientAppInputsResponse\"\x00\x12k\n\x14PushClientAppOutputs\x12\'.flwr.proto.PushClientAppOutputsRequest\x1a(.flwr.proto.PushClientAppOutputsResponse\"\x00\x12J\n\x06GetFab\x12\".flwr.proto.GetFabRequestWithToken\x1a\x1a.flwr.proto.GetFabResponse\"\x00\x62\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1c\x66lwr/proto/clientappio.proto\x12\nflwr.proto\x1a\x14\x66lwr/proto/fab.proto\x1a\x14\x66lwr/proto/run.proto\x1a\x18\x66lwr/proto/message.proto\"W\n\x15\x43lientAppOutputStatus\x12-\n\x04\x63ode\x18\x01 \x01(\x0e\x32\x1f.flwr.proto.ClientAppOutputCode\x12\x0f\n\x07message\x18\x02 \x01(\t\"\x11\n\x0fGetTokenRequest\"!\n\x10GetTokenResponse\x12\r\n\x05token\x18\x01 \x01(\x12\"+\n\x1aPullClientAppInputsRequest\x12\r\n\x05token\x18\x01 \x01(\x12\"\xa5\x01\n\x1bPullClientAppInputsResponse\x12$\n\x07message\x18\x01 \x01(\x0b\x32\x13.flwr.proto.Message\x12$\n\x07\x63ontext\x18\x02 \x01(\x0b\x32\x13.flwr.proto.Context\x12\x1c\n\x03run\x18\x03 \x01(\x0b\x32\x0f.flwr.proto.Run\x12\x1c\n\x03\x66\x61\x62\x18\x04 \x01(\x0b\x32\x0f.flwr.proto.Fab\"x\n\x1bPushClientAppOutputsRequest\x12\r\n\x05token\x18\x01 \x01(\x12\x12$\n\x07message\x18\x02 \x01(\x0b\x32\x13.flwr.proto.Message\x12$\n\x07\x63ontext\x18\x03 \x01(\x0b\x32\x13.flwr.proto.Context\"Q\n\x1cPushClientAppOutputsResponse\x12\x31\n\x06status\x18\x01 \x01(\x0b\x32!.flwr.proto.ClientAppOutputStatus*L\n\x13\x43lientAppOutputCode\x12\x0b\n\x07SUCCESS\x10\x00\x12\x15\n\x11\x44\x45\x41\x44LINE_EXCEEDED\x10\x01\x12\x11\n\rUNKNOWN_ERROR\x10\x02\x32\xad\x02\n\x0b\x43lientAppIo\x12G\n\x08GetToken\x12\x1b.flwr.proto.GetTokenRequest\x1a\x1c.flwr.proto.GetTokenResponse\"\x00\x12h\n\x13PullClientAppInputs\x12&.flwr.proto.PullClientAppInputsRequest\x1a\'.flwr.proto.PullClientAppInputsResponse\"\x00\x12k\n\x14PushClientAppOutputs\x12\'.flwr.proto.PushClientAppOutputsRequest\x1a(.flwr.proto.PushClientAppOutputsResponse\"\x00\x62\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'flwr.proto.clientappio_pb2', _globals) if _descriptor._USE_C_DESCRIPTORS == False: DESCRIPTOR._options = None - _globals['_CLIENTAPPOUTPUTCODE']._serialized_start=686 - _globals['_CLIENTAPPOUTPUTCODE']._serialized_end=762 + _globals['_CLIENTAPPOUTPUTCODE']._serialized_start=675 + _globals['_CLIENTAPPOUTPUTCODE']._serialized_end=751 _globals['_CLIENTAPPOUTPUTSTATUS']._serialized_start=114 _globals['_CLIENTAPPOUTPUTSTATUS']._serialized_end=201 _globals['_GETTOKENREQUEST']._serialized_start=203 @@ -35,13 +35,11 @@ _globals['_PULLCLIENTAPPINPUTSREQUEST']._serialized_start=257 _globals['_PULLCLIENTAPPINPUTSREQUEST']._serialized_end=300 _globals['_PULLCLIENTAPPINPUTSRESPONSE']._serialized_start=303 - _globals['_PULLCLIENTAPPINPUTSRESPONSE']._serialized_end=438 - _globals['_PUSHCLIENTAPPOUTPUTSREQUEST']._serialized_start=440 - _globals['_PUSHCLIENTAPPOUTPUTSREQUEST']._serialized_end=560 - _globals['_PUSHCLIENTAPPOUTPUTSRESPONSE']._serialized_start=562 - _globals['_PUSHCLIENTAPPOUTPUTSRESPONSE']._serialized_end=643 - _globals['_GETFABREQUESTWITHTOKEN']._serialized_start=645 - _globals['_GETFABREQUESTWITHTOKEN']._serialized_end=684 - _globals['_CLIENTAPPIO']._serialized_start=765 - _globals['_CLIENTAPPIO']._serialized_end=1142 + _globals['_PULLCLIENTAPPINPUTSRESPONSE']._serialized_end=468 + _globals['_PUSHCLIENTAPPOUTPUTSREQUEST']._serialized_start=470 + _globals['_PUSHCLIENTAPPOUTPUTSREQUEST']._serialized_end=590 + _globals['_PUSHCLIENTAPPOUTPUTSRESPONSE']._serialized_start=592 + _globals['_PUSHCLIENTAPPOUTPUTSRESPONSE']._serialized_end=673 + _globals['_CLIENTAPPIO']._serialized_start=754 + _globals['_CLIENTAPPIO']._serialized_end=1055 # @@protoc_insertion_point(module_scope) diff --git a/src/py/flwr/proto/clientappio_pb2.pyi b/src/py/flwr/proto/clientappio_pb2.pyi index db06ed24fc4..53d376d5810 100644 --- a/src/py/flwr/proto/clientappio_pb2.pyi +++ b/src/py/flwr/proto/clientappio_pb2.pyi @@ -3,6 +3,7 @@ isort:skip_file """ import builtins +import flwr.proto.fab_pb2 import flwr.proto.message_pb2 import flwr.proto.run_pb2 import google.protobuf.descriptor @@ -77,20 +78,24 @@ class PullClientAppInputsResponse(google.protobuf.message.Message): MESSAGE_FIELD_NUMBER: builtins.int CONTEXT_FIELD_NUMBER: builtins.int RUN_FIELD_NUMBER: builtins.int + FAB_FIELD_NUMBER: builtins.int @property def message(self) -> flwr.proto.message_pb2.Message: ... @property def context(self) -> flwr.proto.message_pb2.Context: ... @property def run(self) -> flwr.proto.run_pb2.Run: ... + @property + def fab(self) -> flwr.proto.fab_pb2.Fab: ... def __init__(self, *, message: typing.Optional[flwr.proto.message_pb2.Message] = ..., context: typing.Optional[flwr.proto.message_pb2.Context] = ..., run: typing.Optional[flwr.proto.run_pb2.Run] = ..., + fab: typing.Optional[flwr.proto.fab_pb2.Fab] = ..., ) -> None: ... - def HasField(self, field_name: typing_extensions.Literal["context",b"context","message",b"message","run",b"run"]) -> builtins.bool: ... - def ClearField(self, field_name: typing_extensions.Literal["context",b"context","message",b"message","run",b"run"]) -> None: ... + def HasField(self, field_name: typing_extensions.Literal["context",b"context","fab",b"fab","message",b"message","run",b"run"]) -> builtins.bool: ... + def ClearField(self, field_name: typing_extensions.Literal["context",b"context","fab",b"fab","message",b"message","run",b"run"]) -> None: ... global___PullClientAppInputsResponse = PullClientAppInputsResponse class PushClientAppOutputsRequest(google.protobuf.message.Message): @@ -125,14 +130,3 @@ class PushClientAppOutputsResponse(google.protobuf.message.Message): def HasField(self, field_name: typing_extensions.Literal["status",b"status"]) -> builtins.bool: ... def ClearField(self, field_name: typing_extensions.Literal["status",b"status"]) -> None: ... global___PushClientAppOutputsResponse = PushClientAppOutputsResponse - -class GetFabRequestWithToken(google.protobuf.message.Message): - DESCRIPTOR: google.protobuf.descriptor.Descriptor - TOKEN_FIELD_NUMBER: builtins.int - token: builtins.int - def __init__(self, - *, - token: builtins.int = ..., - ) -> None: ... - def ClearField(self, field_name: typing_extensions.Literal["token",b"token"]) -> None: ... -global___GetFabRequestWithToken = GetFabRequestWithToken diff --git a/src/py/flwr/proto/clientappio_pb2_grpc.py b/src/py/flwr/proto/clientappio_pb2_grpc.py index 317321e3546..653d49fc1ea 100644 --- a/src/py/flwr/proto/clientappio_pb2_grpc.py +++ b/src/py/flwr/proto/clientappio_pb2_grpc.py @@ -3,7 +3,6 @@ import grpc from flwr.proto import clientappio_pb2 as flwr_dot_proto_dot_clientappio__pb2 -from flwr.proto import fab_pb2 as flwr_dot_proto_dot_fab__pb2 class ClientAppIoStub(object): @@ -30,11 +29,6 @@ def __init__(self, channel): request_serializer=flwr_dot_proto_dot_clientappio__pb2.PushClientAppOutputsRequest.SerializeToString, response_deserializer=flwr_dot_proto_dot_clientappio__pb2.PushClientAppOutputsResponse.FromString, ) - self.GetFab = channel.unary_unary( - '/flwr.proto.ClientAppIo/GetFab', - request_serializer=flwr_dot_proto_dot_clientappio__pb2.GetFabRequestWithToken.SerializeToString, - response_deserializer=flwr_dot_proto_dot_fab__pb2.GetFabResponse.FromString, - ) class ClientAppIoServicer(object): @@ -61,13 +55,6 @@ def PushClientAppOutputs(self, request, context): context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') - def GetFab(self, request, context): - """Get the FAB - """ - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - def add_ClientAppIoServicer_to_server(servicer, server): rpc_method_handlers = { @@ -86,11 +73,6 @@ def add_ClientAppIoServicer_to_server(servicer, server): request_deserializer=flwr_dot_proto_dot_clientappio__pb2.PushClientAppOutputsRequest.FromString, response_serializer=flwr_dot_proto_dot_clientappio__pb2.PushClientAppOutputsResponse.SerializeToString, ), - 'GetFab': grpc.unary_unary_rpc_method_handler( - servicer.GetFab, - request_deserializer=flwr_dot_proto_dot_clientappio__pb2.GetFabRequestWithToken.FromString, - response_serializer=flwr_dot_proto_dot_fab__pb2.GetFabResponse.SerializeToString, - ), } generic_handler = grpc.method_handlers_generic_handler( 'flwr.proto.ClientAppIo', rpc_method_handlers) @@ -151,20 +133,3 @@ def PushClientAppOutputs(request, flwr_dot_proto_dot_clientappio__pb2.PushClientAppOutputsResponse.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata) - - @staticmethod - def GetFab(request, - target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): - return grpc.experimental.unary_unary(request, target, '/flwr.proto.ClientAppIo/GetFab', - flwr_dot_proto_dot_clientappio__pb2.GetFabRequestWithToken.SerializeToString, - flwr_dot_proto_dot_fab__pb2.GetFabResponse.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, timeout, metadata) diff --git a/src/py/flwr/proto/clientappio_pb2_grpc.pyi b/src/py/flwr/proto/clientappio_pb2_grpc.pyi index 9def338d6e6..3cddc769f74 100644 --- a/src/py/flwr/proto/clientappio_pb2_grpc.pyi +++ b/src/py/flwr/proto/clientappio_pb2_grpc.pyi @@ -4,7 +4,6 @@ isort:skip_file """ import abc import flwr.proto.clientappio_pb2 -import flwr.proto.fab_pb2 import grpc class ClientAppIoStub: @@ -24,11 +23,6 @@ class ClientAppIoStub: flwr.proto.clientappio_pb2.PushClientAppOutputsResponse] """Send updated Message and Context""" - GetFab: grpc.UnaryUnaryMultiCallable[ - flwr.proto.clientappio_pb2.GetFabRequestWithToken, - flwr.proto.fab_pb2.GetFabResponse] - """Get the FAB""" - class ClientAppIoServicer(metaclass=abc.ABCMeta): @abc.abstractmethod @@ -55,13 +49,5 @@ class ClientAppIoServicer(metaclass=abc.ABCMeta): """Send updated Message and Context""" pass - @abc.abstractmethod - def GetFab(self, - request: flwr.proto.clientappio_pb2.GetFabRequestWithToken, - context: grpc.ServicerContext, - ) -> flwr.proto.fab_pb2.GetFabResponse: - """Get the FAB""" - pass - def add_ClientAppIoServicer_to_server(servicer: ClientAppIoServicer, server: grpc.Server) -> None: ... From a0bef7a00018cdbb0e9d0201abf9336068f7d0f5 Mon Sep 17 00:00:00 2001 From: jafermarq Date: Mon, 19 Aug 2024 14:36:08 +0100 Subject: [PATCH 17/24] format --- src/py/flwr/client/clientapp/clientappio_servicer_test.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/py/flwr/client/clientapp/clientappio_servicer_test.py b/src/py/flwr/client/clientapp/clientappio_servicer_test.py index c3f6edf36fd..89d429c9280 100644 --- a/src/py/flwr/client/clientapp/clientappio_servicer_test.py +++ b/src/py/flwr/client/clientapp/clientappio_servicer_test.py @@ -78,7 +78,6 @@ def test_set_inputs(self) -> None: fab_hash="dolor", override_config=self.maker.user_config(), ) - fab = typing.Fab( hash_str="abc123#$%", content=b"\xf3\xf5\xf8\x98", From 60fd3443a79c03db5b33cc3918ae22d2881ead08 Mon Sep 17 00:00:00 2001 From: jafermarq Date: Mon, 19 Aug 2024 14:48:54 +0100 Subject: [PATCH 18/24] fab delivery optional --- src/py/flwr/client/app.py | 7 ------- src/py/flwr/client/clientapp/app.py | 6 ++++-- src/py/flwr/client/clientapp/clientappio_servicer.py | 4 ++-- src/py/flwr/client/clientapp/clientappio_servicer_test.py | 5 +++-- 4 files changed, 9 insertions(+), 13 deletions(-) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index 62521946a07..7b2a9d16b88 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -475,13 +475,6 @@ def _on_backoff(retry_state: RetryState) -> None: # Generate SuperNode token token: int = generate_rand_int_from_bytes(RUN_ID_NUM_BYTES) - # Ensure FAB exists - if fab is None: - raise ValueError( - "SuperNode with `--isolation` can only operate " - "when a FAB has been pulled from the SuperLink." - ) - # Mode 1: SuperNode starts ClientApp as subprocess start_subprocess = isolation == ISOLATION_MODE_SUBPROCESS diff --git a/src/py/flwr/client/clientapp/app.py b/src/py/flwr/client/clientapp/app.py index 3115c5c5509..83e3f75a31b 100644 --- a/src/py/flwr/client/clientapp/app.py +++ b/src/py/flwr/client/clientapp/app.py @@ -188,7 +188,9 @@ def get_token(stub: grpc.Channel) -> Optional[int]: return None -def pull_message(stub: grpc.Channel, token: int) -> Tuple[Message, Context, Run, Fab]: +def pull_message( + stub: grpc.Channel, token: int +) -> Tuple[Message, Context, Run, Optional[Fab]]: """Pull message from SuperNode to ClientApp.""" log(INFO, "Pulling ClientAppInputs for token %s", token) try: @@ -198,7 +200,7 @@ def pull_message(stub: grpc.Channel, token: int) -> Tuple[Message, Context, Run, message = message_from_proto(res.message) context = context_from_proto(res.context) run = run_from_proto(res.run) - fab = fab_from_proto(res.fab) + fab = fab_from_proto(res.fab) if res.fab else None return message, context, run, fab except grpc.RpcError as e: log(ERROR, "[PullClientAppInputs] gRPC error occurred: %s", str(e)) diff --git a/src/py/flwr/client/clientapp/clientappio_servicer.py b/src/py/flwr/client/clientapp/clientappio_servicer.py index c08c9e16099..fe7ccd6e908 100644 --- a/src/py/flwr/client/clientapp/clientappio_servicer.py +++ b/src/py/flwr/client/clientapp/clientappio_servicer.py @@ -53,7 +53,7 @@ class ClientAppInputs: message: Message context: Context run: Run - fab: Fab + fab: Optional[Fab] token: int @@ -138,7 +138,7 @@ def PullClientAppInputs( message=message_to_proto(clientapp_input.message), context=context_to_proto(clientapp_input.context), run=run_to_proto(clientapp_input.run), - fab=fab_to_proto(clientapp_input.fab), + fab=fab_to_proto(clientapp_input.fab) if clientapp_input.fab else None, ) def PushClientAppOutputs( diff --git a/src/py/flwr/client/clientapp/clientappio_servicer_test.py b/src/py/flwr/client/clientapp/clientappio_servicer_test.py index 89d429c9280..a03400c12a8 100644 --- a/src/py/flwr/client/clientapp/clientappio_servicer_test.py +++ b/src/py/flwr/client/clientapp/clientappio_servicer_test.py @@ -174,8 +174,9 @@ def test_pull_clientapp_inputs(self) -> None: self.assertEqual(run.run_id, 61016) self.assertEqual(run.fab_id, "mock/mock") self.assertEqual(run.fab_version, "v1.0.0") - self.assertEqual(fab.hash_str, mock_fab.hash_str) - self.assertEqual(fab.content, mock_fab.content) + if fab: + self.assertEqual(fab.hash_str, mock_fab.hash_str) + self.assertEqual(fab.content, mock_fab.content) def test_push_clientapp_outputs(self) -> None: """Test pushing messages to SuperNode.""" From 68e26f79379b1d10e9f5bed03eacb064ca375d09 Mon Sep 17 00:00:00 2001 From: jafermarq Date: Mon, 19 Aug 2024 16:01:44 +0100 Subject: [PATCH 19/24] init --- src/py/flwr/client/app.py | 4 +++- src/py/flwr/client/clientapp/app.py | 7 ++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index 7b2a9d16b88..268319c6c53 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -440,7 +440,9 @@ def _on_backoff(retry_state: RetryState) -> None: run: Run = runs[run_id] if get_fab is not None and run.fab_hash: fab = get_fab(run.fab_hash) - install_from_fab(fab.content, flwr_path, True) + if not isolation: + # If ClientApp runs in the same process, install the FAB + install_from_fab(fab.content, flwr_path, True) fab_id, fab_version = get_fab_metadata(fab.content) else: fab = None diff --git a/src/py/flwr/client/clientapp/app.py b/src/py/flwr/client/clientapp/app.py index 83e3f75a31b..9c062555521 100644 --- a/src/py/flwr/client/clientapp/app.py +++ b/src/py/flwr/client/clientapp/app.py @@ -21,6 +21,7 @@ import grpc +from flwr.cli.install import install_from_fab from flwr.client.client_app import ClientApp, LoadClientAppError from flwr.common import Context, Message from flwr.common.constant import ErrorCode @@ -116,7 +117,11 @@ def run_clientapp( # pylint: disable=R0914 time.sleep(1) # Pull Message, Context, and Run from SuperNode - message, context, run, _ = pull_message(stub=stub, token=token) + message, context, run, fab = pull_message(stub=stub, token=token) + + if fab: + log(DEBUG, "Flower ClientApp starts FAB installation.") + install_from_fab(fab.content, flwr_dir=None, skip_prompt=True) load_client_app_fn = get_load_client_app_fn( default_app_ref="", From 44f34b3bea2d1a10eb2815092ab2a9434446481d Mon Sep 17 00:00:00 2001 From: "Daniel J. Beutel" Date: Mon, 19 Aug 2024 22:26:21 +0200 Subject: [PATCH 20/24] Update src/py/flwr/client/clientapp/app.py --- src/py/flwr/client/clientapp/app.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/py/flwr/client/clientapp/app.py b/src/py/flwr/client/clientapp/app.py index 8650e958f2a..69d334fead1 100644 --- a/src/py/flwr/client/clientapp/app.py +++ b/src/py/flwr/client/clientapp/app.py @@ -119,7 +119,7 @@ def run_clientapp( # pylint: disable=R0914 # Pull Message, Context, Run and (optional) FAB from SuperNode message, context, run, fab = pull_message(stub=stub, token=token) - # Install FAB, if FAB was provided + # Install FAB, if provided if fab: log(DEBUG, "Flower ClientApp starts FAB installation.") install_from_fab(fab.content, flwr_dir=None, skip_prompt=True) From 0f7bc39b12da0b39d07c46f66e79b385042922a9 Mon Sep 17 00:00:00 2001 From: "Daniel J. Beutel" Date: Mon, 19 Aug 2024 22:28:56 +0200 Subject: [PATCH 21/24] Update src/py/flwr/client/app.py --- src/py/flwr/client/app.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index 268319c6c53..0be61181169 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -440,7 +440,7 @@ def _on_backoff(retry_state: RetryState) -> None: run: Run = runs[run_id] if get_fab is not None and run.fab_hash: fab = get_fab(run.fab_hash) - if not isolation: + if isolation is None or isolation == ISOLATION_MODE_SUBPROCESS: # If ClientApp runs in the same process, install the FAB install_from_fab(fab.content, flwr_path, True) fab_id, fab_version = get_fab_metadata(fab.content) From e8383ce76999e68dfd5a2b51da72cba5b850a064 Mon Sep 17 00:00:00 2001 From: "Daniel J. Beutel" Date: Mon, 19 Aug 2024 22:30:28 +0200 Subject: [PATCH 22/24] Update src/py/flwr/client/app.py --- src/py/flwr/client/app.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index 0be61181169..48c2559d8ba 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -441,7 +441,8 @@ def _on_backoff(retry_state: RetryState) -> None: if get_fab is not None and run.fab_hash: fab = get_fab(run.fab_hash) if isolation is None or isolation == ISOLATION_MODE_SUBPROCESS: - # If ClientApp runs in the same process, install the FAB + # If `ClientApp` runs in the same process, or a + # subprocess, install the FAB install_from_fab(fab.content, flwr_path, True) fab_id, fab_version = get_fab_metadata(fab.content) else: From 581ee203a226ff21df635687f189608cfed3bae9 Mon Sep 17 00:00:00 2001 From: "Daniel J. Beutel" Date: Mon, 19 Aug 2024 22:31:28 +0200 Subject: [PATCH 23/24] Update src/py/flwr/client/app.py --- src/py/flwr/client/app.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index 48c2559d8ba..7c49173600c 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -441,8 +441,8 @@ def _on_backoff(retry_state: RetryState) -> None: if get_fab is not None and run.fab_hash: fab = get_fab(run.fab_hash) if isolation is None or isolation == ISOLATION_MODE_SUBPROCESS: - # If `ClientApp` runs in the same process, or a - # subprocess, install the FAB + # If `ClientApp` runs in the same process, or a + # subprocess, install the FAB install_from_fab(fab.content, flwr_path, True) fab_id, fab_version = get_fab_metadata(fab.content) else: From 307fc5f8f7a069fd977e04dd19bfe3448784c4ad Mon Sep 17 00:00:00 2001 From: "Daniel J. Beutel" Date: Mon, 19 Aug 2024 22:35:18 +0200 Subject: [PATCH 24/24] Update src/py/flwr/client/app.py --- src/py/flwr/client/app.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index 7c49173600c..1aed5d5241d 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -440,9 +440,8 @@ def _on_backoff(retry_state: RetryState) -> None: run: Run = runs[run_id] if get_fab is not None and run.fab_hash: fab = get_fab(run.fab_hash) - if isolation is None or isolation == ISOLATION_MODE_SUBPROCESS: - # If `ClientApp` runs in the same process, or a - # subprocess, install the FAB + if not isolation: + # If `ClientApp` runs in the same process, install the FAB install_from_fab(fab.content, flwr_path, True) fab_id, fab_version = get_fab_metadata(fab.content) else: