From 17d44bae96f3ca31103cf97cc5225a64cbae1b15 Mon Sep 17 00:00:00 2001 From: "Daniel J. Beutel" Date: Sun, 18 Aug 2024 22:45:37 +0200 Subject: [PATCH 1/4] Support different modes --- src/py/flwr/client/app.py | 62 ++++++++++++++++++----------- src/py/flwr/client/supernode/app.py | 26 +++++++++--- 2 files changed, 59 insertions(+), 29 deletions(-) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index 65e1e1b4d36..6efb5f2e501 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -62,6 +62,9 @@ ADDRESS_CLIENTAPPIO_API_GRPC_RERE = "0.0.0.0:9094" +ISOLATION_MODE_SUBPROCESS = "subprocess" +ISOLATION_MODE_PROCESS = "process" + def _check_actionable_client( client: Optional[Client], client_fn: Optional[ClientFnExt] @@ -207,7 +210,7 @@ def start_client_internal( max_retries: Optional[int] = None, max_wait_time: Optional[float] = None, flwr_path: Optional[Path] = None, - isolate: Optional[bool] = False, + isolation: Optional[str] = None, supernode_address: Optional[str] = ADDRESS_CLIENTAPPIO_API_GRPC_RERE, ) -> None: """Start a Flower client node which connects to a Flower server. @@ -256,11 +259,13 @@ class `flwr.client.Client` (default: None) If set to None, there is no limit to the total time. flwr_path: Optional[Path] (default: None) The fully resolved path containing installed Flower Apps. - isolate : Optional[bool] (default: False) - Whether to run `ClientApp` in a separate process. By default, this value is - `False`, and the `ClientApp` runs in the same process as the SuperNode. If - `True`, the `ClientApp` runs in an isolated process and communicates using - gRPC at the address `supernode_address`. + isolation : Optional[str] (default: None) + Isolation mode for `ClientApp`. Possible values are `subprocess` and + `process`. Defaults to `None`, which runs the `ClientApp` in the same process + as the SuperNode. If `subprocess`, the `ClientApp` runs in a subprocess started + by the SueprNode and communicates using gRPC at the address + `supernode_address`. If `process`, the `ClientApp` runs in a separate isolated + process and communicates using gRPC at the address `supernode_address`. supernode_address : Optional[str] (default: `ADDRESS_CLIENTAPPIO_API_GRPC_RERE`) The SuperNode gRPC server address. """ @@ -288,9 +293,12 @@ def _load_client_app(_1: str, _2: str) -> ClientApp: load_client_app_fn = _load_client_app - if isolate: + if isolation: if supernode_address is None: - raise ValueError("`supernode_address` required when `isolate` is set") + raise ValueError( + f"`supernode_address` required when `isolation` is " + f"{ISOLATION_MODE_SUBPROCESS} or {ISOLATION_MODE_PROCESS}", + ) _clientappio_grpc_server, clientappio_servicer = run_clientappio_api_grpc( address=supernode_address ) @@ -455,7 +463,14 @@ def _on_backoff(retry_state: RetryState) -> None: # Handle app loading and task message try: - if isolate: + if isolation: + # Two isolation modes: + # 1. `subprocess`: SuperNode is starting the ClientApp + # process as a subprocess. + # 2. `process`: ClientApp process gets started separately + # (via `flwr-clientapp`), for example, in a separate + # Docker container. + # Generate SuperNode token token: int = generate_rand_int_from_bytes(RUN_ID_NUM_BYTES) @@ -470,20 +485,21 @@ def _on_backoff(retry_state: RetryState) -> None: token_returned=True, ) - # Run `ClientApp` in subprocess - command = [ - "flwr-clientapp", - "--supernode", - supernode_address, - "--token", - str(token), - ] - subprocess.run( - command, - stdout=None, - stderr=None, - check=True, - ) + if isolation == ISOLATION_MODE_SUBPROCESS: + # Start ClientApp subprocess + command = [ + "flwr-clientapp", + "--supernode", + supernode_address, + "--token", + str(token), + ] + subprocess.run( + command, + stdout=None, + stderr=None, + check=True, + ) # Wait for output to become available while not clientappio_servicer.has_outputs(): diff --git a/src/py/flwr/client/supernode/app.py b/src/py/flwr/client/supernode/app.py index ef17b9375d7..2e6b942b5d2 100644 --- a/src/py/flwr/client/supernode/app.py +++ b/src/py/flwr/client/supernode/app.py @@ -37,7 +37,11 @@ from flwr.common.exit_handlers import register_exit_handlers from flwr.common.logger import log, warn_deprecated_feature -from ..app import start_client_internal +from ..app import ( + ISOLATION_MODE_PROCESS, + ISOLATION_MODE_SUBPROCESS, + start_client_internal, +) from ..clientapp.utils import get_load_client_app_fn ADDRESS_FLEET_API_GRPC_RERE = "0.0.0.0:9092" @@ -62,6 +66,8 @@ def run_supernode() -> None: ) authentication_keys = _try_setup_client_authentication(args) + log(DEBUG, "Isolation mode: %s", args.isolation) + start_client_internal( server_address=args.superlink, load_client_app_fn=load_fn, @@ -72,7 +78,7 @@ def run_supernode() -> None: max_retries=args.max_retries, max_wait_time=args.max_wait_time, node_config=parse_config_args([args.node_config]), - isolate=args.isolate, + isolation=args.isolation, supernode_address=args.supernode_address, ) @@ -199,10 +205,18 @@ def _parse_args_run_supernode() -> argparse.ArgumentParser: """, ) parser.add_argument( - "--isolate", - action="store_true", - help="Run `ClientApp` in an isolated subprocess. By default, `ClientApp` " - "runs in the same process that executes the SuperNode.", + "--isolation", + default=None, + required=False, + choices=[ + ISOLATION_MODE_SUBPROCESS, + ISOLATION_MODE_PROCESS, + ], + help="Isolation mode when running `ClientApp` (optional, possible values: " + "`subprocess`, `process`). By default, `ClientApp` runs in the same process " + "that executes the SuperNode. Use `subprocess` to configure SuperNode to run " + "`ClientApp` in a subprocess. Use `process` to indicate that a separate " + "independent process gets created outside of SuperNode.", ) parser.add_argument( "--supernode-address", From 0eea8365576604f352c027836981c716bc2b6d16 Mon Sep 17 00:00:00 2001 From: "Daniel J. Beutel" Date: Mon, 19 Aug 2024 11:46:51 +0200 Subject: [PATCH 2/4] Enable long-running flwr-clientapp --- src/py/flwr/client/app.py | 15 +- src/py/flwr/client/clientapp/app.py | 141 +++++++++++------- .../client/clientapp/clientappio_servicer.py | 1 + 3 files changed, 99 insertions(+), 58 deletions(-) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index 6efb5f2e501..1365ecb48cf 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -474,6 +474,9 @@ def _on_backoff(retry_state: RetryState) -> None: # Generate SuperNode token token: int = generate_rand_int_from_bytes(RUN_ID_NUM_BYTES) + # Mode 1: SuperNode starts ClientApp as subprocess + start_subprocess = isolation == ISOLATION_MODE_SUBPROCESS + # Share Message and Context with servicer clientappio_servicer.set_inputs( clientapp_input=ClientAppIoInputs( @@ -482,10 +485,10 @@ def _on_backoff(retry_state: RetryState) -> None: run=run, token=token, ), - token_returned=True, + token_returned=start_subprocess, ) - if isolation == ISOLATION_MODE_SUBPROCESS: + if start_subprocess: # Start ClientApp subprocess command = [ "flwr-clientapp", @@ -500,10 +503,10 @@ def _on_backoff(retry_state: RetryState) -> None: stderr=None, check=True, ) - - # Wait for output to become available - while not clientappio_servicer.has_outputs(): - time.sleep(0.1) + else: + # Wait for output to become available + while not clientappio_servicer.has_outputs(): + time.sleep(0.1) outputs = clientappio_servicer.get_outputs() reply_message, context = outputs.message, outputs.context diff --git a/src/py/flwr/client/clientapp/app.py b/src/py/flwr/client/clientapp/app.py index c2365e6ed68..cdb61b14f28 100644 --- a/src/py/flwr/client/clientapp/app.py +++ b/src/py/flwr/client/clientapp/app.py @@ -15,6 +15,7 @@ """Flower ClientApp process.""" import argparse +import time from logging import DEBUG, ERROR, INFO from typing import Optional, Tuple @@ -68,10 +69,11 @@ def flwr_clientapp() -> None: help="Unique token generated by SuperNode for each ClientApp execution", ) args = parser.parse_args() + log( DEBUG, "Staring isolated `ClientApp` connected to SuperNode ClientAppIo at %s " - "with the token %s", + "with token %s", args.supernode, args.token, ) @@ -105,46 +107,62 @@ def run_clientapp( # pylint: disable=R0914 try: stub = ClientAppIoStub(channel) - # If token is not set, loop until token is received from SuperNode - while token is None: - token = get_token(stub) - - # Pull Message, Context, and Run from SuperNode - message, context, run = pull_message(stub=stub, token=token) - - load_client_app_fn = get_load_client_app_fn( - default_app_ref="", - app_path=None, - multi_app=True, - flwr_dir=None, - ) - - try: - # Load ClientApp - client_app: ClientApp = load_client_app_fn(run.fab_id, run.fab_version) - - # Execute ClientApp - reply_message = client_app(message=message, context=context) - except Exception as ex: # pylint: disable=broad-exception-caught - # Don't update/change NodeState - - e_code = ErrorCode.CLIENT_APP_RAISED_EXCEPTION - # Ex fmt: ":<'division by zero'>" - reason = str(type(ex)) + ":<'" + str(ex) + "'>" - exc_entity = "ClientApp" - if isinstance(ex, LoadClientAppError): - reason = "An exception was raised when attempting to load `ClientApp`" - e_code = ErrorCode.LOAD_CLIENT_APP_EXCEPTION - - log(ERROR, "%s raised an exception", exc_entity, exc_info=ex) + only_once = token is not None + while True: + # If token is not set, loop until token is received from SuperNode + while token is None: + token = get_token(stub) + time.sleep(1) + + # Pull Message, Context, and Run from SuperNode + message, context, run = pull_message(stub=stub, token=token) + + load_client_app_fn = get_load_client_app_fn( + default_app_ref="", + app_path=None, + multi_app=True, + flwr_dir=None, + ) - # Create error message - reply_message = message.create_error_reply( - error=Error(code=e_code, reason=reason) + try: + # Load ClientApp + client_app: ClientApp = load_client_app_fn(run.fab_id, run.fab_version) + + # Execute ClientApp + reply_message = client_app(message=message, context=context) + except Exception as ex: # pylint: disable=broad-exception-caught + # Don't update/change NodeState + + e_code = ErrorCode.CLIENT_APP_RAISED_EXCEPTION + # Ex fmt: ":<'division by zero'>" + reason = str(type(ex)) + ":<'" + str(ex) + "'>" + exc_entity = "ClientApp" + if isinstance(ex, LoadClientAppError): + reason = ( + "An exception was raised when attempting to load `ClientApp`" + ) + e_code = ErrorCode.LOAD_CLIENT_APP_EXCEPTION + + log(ERROR, "%s raised an exception", exc_entity, exc_info=ex) + + # Create error message + reply_message = message.create_error_reply( + error=Error(code=e_code, reason=reason) + ) + + # Push Message and Context to SuperNode + _ = push_message( + stub=stub, token=token, message=reply_message, context=context ) - # Push Message and Context to SuperNode - _ = push_message(stub=stub, token=token, message=reply_message, context=context) + # Reset token to `None` to prevent flwr-clientapp from trying to pull the + # same inputs again + token = None + + # Stop the loop if `flwr-clientapp` is expected to process only a single + # message + if only_once: + break except KeyboardInterrupt: log(INFO, "Closing connection") @@ -157,30 +175,49 @@ def run_clientapp( # pylint: disable=R0914 def get_token(stub: grpc.Channel) -> Optional[int]: """Get a token from SuperNode.""" log(DEBUG, "Flower ClientApp process requests token") - res: GetTokenResponse = stub.GetToken(GetTokenRequest()) - return res.token + try: + res: GetTokenResponse = stub.GetToken(GetTokenRequest()) + log(DEBUG, "Received token: %s", res.token) + return res.token + except grpc.RpcError as e: + if e.code() == grpc.StatusCode.FAILED_PRECONDITION: + log(DEBUG, "No token available yet") + else: + log(ERROR, "[GetToken] gRPC error occurred: %s", str(e)) + return None def pull_message(stub: grpc.Channel, token: int) -> Tuple[Message, Context, Run]: """Pull message from SuperNode to ClientApp.""" - res: PullClientAppInputsResponse = stub.PullClientAppInputs( - PullClientAppInputsRequest(token=token) - ) - message = message_from_proto(res.message) - context = context_from_proto(res.context) - run = run_from_proto(res.run) - return message, context, run + log(INFO, "Pulling ClientAppIoInputs for token %s", token) + try: + res: PullClientAppInputsResponse = stub.PullClientAppInputs( + PullClientAppInputsRequest(token=token) + ) + message = message_from_proto(res.message) + context = context_from_proto(res.context) + run = run_from_proto(res.run) + return message, context, run + except grpc.RpcError as e: + log(ERROR, "[PullClientAppInputs] gRPC error occurred: %s", str(e)) + raise e def push_message( stub: grpc.Channel, token: int, message: Message, context: Context ) -> PushClientAppOutputsResponse: """Push message to SuperNode from ClientApp.""" + log(INFO, "Pushing ClientAppIoOutputs for token %s", token) proto_message = message_to_proto(message) proto_context = context_to_proto(context) - res: PushClientAppOutputsResponse = stub.PushClientAppOutputs( - PushClientAppOutputsRequest( - token=token, message=proto_message, context=proto_context + + try: + res: PushClientAppOutputsResponse = stub.PushClientAppOutputs( + PushClientAppOutputsRequest( + token=token, message=proto_message, context=proto_context + ) ) - ) - return res + return res + except grpc.RpcError as e: + log(ERROR, "[PushClientAppOutputs] gRPC error occurred: %s", str(e)) + raise e diff --git a/src/py/flwr/client/clientapp/clientappio_servicer.py b/src/py/flwr/client/clientapp/clientappio_servicer.py index 0dc08959112..1f420c0a46e 100644 --- a/src/py/flwr/client/clientapp/clientappio_servicer.py +++ b/src/py/flwr/client/clientapp/clientappio_servicer.py @@ -218,6 +218,7 @@ def set_inputs( "ClientAppIoInputs and ClientAppIoOutputs must not be set before " "calling `set_inputs`." ) + log(DEBUG, "ClientAppIoInputs set (token: %s)", clientapp_input.token) self.clientapp_input = clientapp_input self.token_returned = token_returned From bd6a33138b2ca480e83319b3010efd38644476a5 Mon Sep 17 00:00:00 2001 From: "Daniel J. Beutel" Date: Mon, 19 Aug 2024 13:13:27 +0200 Subject: [PATCH 3/4] Resolve lint issues --- src/py/flwr/client/app.py | 2 ++ src/py/flwr/client/clientapp/app.py | 6 +++--- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index 1365ecb48cf..8d2367e5c11 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -398,6 +398,7 @@ def _on_backoff(retry_state: RetryState) -> None: ) app_state_tracker.register_signal_handler() + # pylint: disable=too-many-nested-blocks while not app_state_tracker.interrupt: try: # Receive @@ -563,6 +564,7 @@ def _on_backoff(retry_state: RetryState) -> None: except StopIteration: sleep_duration = 0 break + # pylint: enable=too-many-nested-blocks # Unregister node if delete_node is not None and app_state_tracker.is_connected: diff --git a/src/py/flwr/client/clientapp/app.py b/src/py/flwr/client/clientapp/app.py index cdb61b14f28..09e3a0486ed 100644 --- a/src/py/flwr/client/clientapp/app.py +++ b/src/py/flwr/client/clientapp/app.py @@ -177,11 +177,11 @@ def get_token(stub: grpc.Channel) -> Optional[int]: log(DEBUG, "Flower ClientApp process requests token") try: res: GetTokenResponse = stub.GetToken(GetTokenRequest()) - log(DEBUG, "Received token: %s", res.token) + log(DEBUG, "[GetToken] Received token: %s", res.token) return res.token except grpc.RpcError as e: - if e.code() == grpc.StatusCode.FAILED_PRECONDITION: - log(DEBUG, "No token available yet") + if e.code() == grpc.StatusCode.FAILED_PRECONDITION: # pylint: disable=no-member + log(DEBUG, "[GetToken] No token available yet") else: log(ERROR, "[GetToken] gRPC error occurred: %s", str(e)) return None From 6b13e7c02cedd4d5c02bc4cbbf7c9e81d8729717 Mon Sep 17 00:00:00 2001 From: jafermarq Date: Mon, 19 Aug 2024 13:25:10 +0100 Subject: [PATCH 4/4] init --- src/py/flwr/client/app.py | 4 +- src/py/flwr/client/clientapp/app.py | 4 +- .../client/clientapp/clientappio_servicer.py | 38 +++++++++---------- .../clientapp/clientappio_servicer_test.py | 28 ++++++-------- 4 files changed, 35 insertions(+), 39 deletions(-) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index 8d2367e5c11..26ecd71211c 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -52,7 +52,7 @@ from flwr.server.superlink.fleet.grpc_bidi.grpc_server import generic_create_grpc_server from flwr.server.superlink.state.utils import generate_rand_int_from_bytes -from .clientapp.clientappio_servicer import ClientAppIoInputs, ClientAppIoServicer +from .clientapp.clientappio_servicer import ClientAppInputs, ClientAppIoServicer from .grpc_adapter_client.connection import grpc_adapter from .grpc_client.connection import grpc_connection from .grpc_rere_client.connection import grpc_request_response @@ -480,7 +480,7 @@ def _on_backoff(retry_state: RetryState) -> None: # Share Message and Context with servicer clientappio_servicer.set_inputs( - clientapp_input=ClientAppIoInputs( + clientapp_input=ClientAppInputs( message=message, context=context, run=run, diff --git a/src/py/flwr/client/clientapp/app.py b/src/py/flwr/client/clientapp/app.py index 09e3a0486ed..f7262d510e4 100644 --- a/src/py/flwr/client/clientapp/app.py +++ b/src/py/flwr/client/clientapp/app.py @@ -189,7 +189,7 @@ def get_token(stub: grpc.Channel) -> Optional[int]: def pull_message(stub: grpc.Channel, token: int) -> Tuple[Message, Context, Run]: """Pull message from SuperNode to ClientApp.""" - log(INFO, "Pulling ClientAppIoInputs for token %s", token) + log(INFO, "Pulling ClientAppInputs for token %s", token) try: res: PullClientAppInputsResponse = stub.PullClientAppInputs( PullClientAppInputsRequest(token=token) @@ -207,7 +207,7 @@ def push_message( stub: grpc.Channel, token: int, message: Message, context: Context ) -> PushClientAppOutputsResponse: """Push message to SuperNode from ClientApp.""" - log(INFO, "Pushing ClientAppIoOutputs for token %s", token) + log(INFO, "Pushing ClientAppOutputs for token %s", token) proto_message = message_to_proto(message) proto_context = context_to_proto(context) diff --git a/src/py/flwr/client/clientapp/clientappio_servicer.py b/src/py/flwr/client/clientapp/clientappio_servicer.py index 1f420c0a46e..020470ee1f3 100644 --- a/src/py/flwr/client/clientapp/clientappio_servicer.py +++ b/src/py/flwr/client/clientapp/clientappio_servicer.py @@ -46,7 +46,7 @@ @dataclass -class ClientAppIoInputs: +class ClientAppInputs: """Specify the inputs to the ClientApp.""" message: Message @@ -56,7 +56,7 @@ class ClientAppIoInputs: @dataclass -class ClientAppIoOutputs: +class ClientAppOutputs: """Specify the outputs from the ClientApp.""" message: Message @@ -68,8 +68,8 @@ class ClientAppIoServicer(clientappio_pb2_grpc.ClientAppIoServicer): """ClientAppIo API servicer.""" def __init__(self) -> None: - self.clientapp_input: Optional[ClientAppIoInputs] = None - self.clientapp_output: Optional[ClientAppIoOutputs] = None + self.clientapp_input: Optional[ClientAppInputs] = None + self.clientapp_output: Optional[ClientAppOutputs] = None self.token_returned: bool = False self.inputs_returned: bool = False @@ -79,13 +79,13 @@ def GetToken( """Get token.""" log(DEBUG, "ClientAppIo.GetToken") - # Fail if no ClientAppIoInputs are available + # Fail if no ClientAppInputs are available if self.clientapp_input is None: context.abort( grpc.StatusCode.FAILED_PRECONDITION, "No inputs available.", ) - clientapp_input = cast(ClientAppIoInputs, self.clientapp_input) + clientapp_input = cast(ClientAppInputs, self.clientapp_input) # Fail if token was already returned in a previous call if self.token_returned: @@ -95,7 +95,7 @@ def GetToken( ) # If - # - ClientAppIoInputs is set, and + # - ClientAppInputs is set, and # - token hasn't been returned before, # return token self.token_returned = True @@ -107,13 +107,13 @@ def PullClientAppInputs( """Pull Message, Context, and Run.""" log(DEBUG, "ClientAppIo.PullClientAppInputs") - # Fail if no ClientAppIoInputs are available + # Fail if no ClientAppInputs are available if self.clientapp_input is None: context.abort( grpc.StatusCode.FAILED_PRECONDITION, "No inputs available.", ) - clientapp_input = cast(ClientAppIoInputs, self.clientapp_input) + clientapp_input = cast(ClientAppInputs, self.clientapp_input) # Fail if token wasn't returned in a previous call if not self.token_returned: @@ -144,13 +144,13 @@ def PushClientAppOutputs( """Push Message and Context.""" log(DEBUG, "ClientAppIo.PushClientAppOutputs") - # Fail if no ClientAppIoInputs are available + # Fail if no ClientAppInputs are available if not self.clientapp_input: context.abort( grpc.StatusCode.FAILED_PRECONDITION, "No inputs available.", ) - clientapp_input = cast(ClientAppIoInputs, self.clientapp_input) + clientapp_input = cast(ClientAppInputs, self.clientapp_input) # Fail if token wasn't returned in a previous call if not self.token_returned: @@ -178,7 +178,7 @@ def PushClientAppOutputs( # Preconditions met try: # Update Message and Context - self.clientapp_output = ClientAppIoOutputs( + self.clientapp_output = ClientAppOutputs( message=message_from_proto(request.message), context=context_from_proto(request.context), ) @@ -196,13 +196,13 @@ def PushClientAppOutputs( return PushClientAppOutputsResponse(status=proto_status) def set_inputs( - self, clientapp_input: ClientAppIoInputs, token_returned: bool + self, clientapp_input: ClientAppInputs, token_returned: bool ) -> None: """Set ClientApp inputs. Parameters ---------- - clientapp_input : ClientAppIoInputs + clientapp_input : ClientAppInputs The inputs to the ClientApp. token_returned : bool A boolean indicating if the token has been returned. @@ -215,10 +215,10 @@ def set_inputs( or self.token_returned ): raise ValueError( - "ClientAppIoInputs and ClientAppIoOutputs must not be set before " + "ClientAppInputs and ClientAppOutputs must not be set before " "calling `set_inputs`." ) - log(DEBUG, "ClientAppIoInputs set (token: %s)", clientapp_input.token) + log(DEBUG, "ClientAppInputs set (token: %s)", clientapp_input.token) self.clientapp_input = clientapp_input self.token_returned = token_returned @@ -226,13 +226,13 @@ def has_outputs(self) -> bool: """Check if ClientAppOutputs are available.""" return self.clientapp_output is not None - def get_outputs(self) -> ClientAppIoOutputs: + def get_outputs(self) -> ClientAppOutputs: """Get ClientApp outputs.""" if self.clientapp_output is None: - raise ValueError("ClientAppIoOutputs not set before calling `get_outputs`.") + raise ValueError("ClientAppOutputs not set before calling `get_outputs`.") # Set outputs to a local variable and clear state - output: ClientAppIoOutputs = self.clientapp_output + output: ClientAppOutputs = self.clientapp_output self.clientapp_input = None self.clientapp_output = None self.token_returned = False diff --git a/src/py/flwr/client/clientapp/clientappio_servicer_test.py b/src/py/flwr/client/clientapp/clientappio_servicer_test.py index 8011ae60519..cdffc698ef3 100644 --- a/src/py/flwr/client/clientapp/clientappio_servicer_test.py +++ b/src/py/flwr/client/clientapp/clientappio_servicer_test.py @@ -37,11 +37,7 @@ from flwr.proto.run_pb2 import Run as ProtoRun from flwr.server.superlink.state.utils import generate_rand_int_from_bytes -from .clientappio_servicer import ( - ClientAppIoInputs, - ClientAppIoOutputs, - ClientAppIoServicer, -) +from .clientappio_servicer import ClientAppInputs, ClientAppIoServicer, ClientAppOutputs class TestClientAppIoServicer(unittest.TestCase): @@ -81,32 +77,32 @@ def test_set_inputs(self) -> None: fab_hash="dolor", override_config=self.maker.user_config(), ) - client_input = ClientAppIoInputs(message, context, run, 1) - client_output = ClientAppIoOutputs(message, context) + client_input = ClientAppInputs(message, context, run, 1) + client_output = ClientAppOutputs(message, context) # Execute and assert - # - when ClientAppIoInputs is not None, ClientAppIoOutputs is None + # - when ClientAppInputs is not None, ClientAppOutputs is None with self.assertRaises(ValueError): self.servicer.clientapp_input = client_input self.servicer.clientapp_output = None self.servicer.set_inputs(client_input, token_returned=True) # Execute and assert - # - when ClientAppIoInputs is None, ClientAppIoOutputs is not None + # - when ClientAppInputs is None, ClientAppOutputs is not None with self.assertRaises(ValueError): self.servicer.clientapp_input = None self.servicer.clientapp_output = client_output self.servicer.set_inputs(client_input, token_returned=True) # Execute and assert - # - when ClientAppIoInputs and ClientAppIoOutputs is not None + # - when ClientAppInputs and ClientAppOutputs is not None with self.assertRaises(ValueError): self.servicer.clientapp_input = client_input self.servicer.clientapp_output = client_output self.servicer.set_inputs(client_input, token_returned=True) # Execute and assert - # - when ClientAppIoInputs is set at .clientapp_input + # - when ClientAppInputs is set at .clientapp_input self.servicer.clientapp_input = None self.servicer.clientapp_output = None self.servicer.set_inputs(client_input, token_returned=True) @@ -125,18 +121,18 @@ def test_get_outputs(self) -> None: state=self.maker.recordset(2, 2, 1), run_config={"runconfig1": 6.1}, ) - client_output = ClientAppIoOutputs(message, context) + client_output = ClientAppOutputs(message, context) - # Execute and assert - when `ClientAppIoOutputs` is None + # Execute and assert - when `ClientAppOutputs` is None self.servicer.clientapp_output = None with self.assertRaises(ValueError): - # `ClientAppIoOutputs` should not be None + # `ClientAppOutputs` should not be None _ = self.servicer.get_outputs() - # Execute and assert - when `ClientAppIoOutputs` is not None + # Execute and assert - when `ClientAppOutputs` is not None self.servicer.clientapp_output = client_output output = self.servicer.get_outputs() - assert isinstance(output, ClientAppIoOutputs) + assert isinstance(output, ClientAppOutputs) assert output == client_output assert self.servicer.clientapp_input is None assert self.servicer.clientapp_output is None