Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Bulyan (#1817) #1891

Merged
merged 47 commits into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
643abc3
Implement Bulyan (#1817)
edogab33 May 23, 2023
a9390ae
Fix tests
adam-narozniak May 23, 2023
21b4829
Remove redundant warning
adam-narozniak May 23, 2023
1ef78a0
Reference the arxiv page not the pdf directly
adam-narozniak May 23, 2023
0cdd39b
Add Bulyan strategy to changelog
adam-narozniak May 23, 2023
23dcf0a
Fix mypy tests
adam-narozniak May 23, 2023
3e5d3f9
Fix mypy tests
adam-narozniak May 23, 2023
4aaddfb
Fix formatting
adam-narozniak May 23, 2023
f41abd7
Fix spacing in changelog
adam-narozniak May 23, 2023
4822748
Merge branch 'main' into bulyan-strategy
danieljanes May 24, 2023
d121e82
Remove redundant test, fix mocking
adam-narozniak May 25, 2023
f185487
Merge remote-tracking branch 'origin/bulyan-strategy' into bulyan-str…
adam-narozniak May 25, 2023
3eb4c0a
Remove redundant tests
adam-narozniak May 25, 2023
db8d167
Fix weights comparison for determining Byzantine-resilient model
adam-narozniak May 26, 2023
fe54b0f
Add length check in _check_weights_equality
adam-narozniak May 26, 2023
0732031
Add tests for _check_weights_equality correctness
adam-narozniak May 26, 2023
5c420d3
Add tests for _find_reference_weights
adam-narozniak May 26, 2023
4eaccd1
Add correct beta closest averaging
adam-narozniak May 26, 2023
38c6306
Add test for beta closest averaging
adam-narozniak May 26, 2023
ce01dc7
Simplify aggregate_bulyan
adam-narozniak May 26, 2023
3f30afa
Add support for different Byzantine–resilient first step aggregations
adam-narozniak Jun 9, 2023
18e5d1c
Add the check for assumptions needed to use the aggregation strategy
adam-narozniak Jun 9, 2023
68712ff
Fix formatting
adam-narozniak Jun 9, 2023
7de9616
Fix formatting and tests
adam-narozniak Jun 9, 2023
71175d8
Merge branch 'main' into bulyan-strategy
adam-narozniak Jun 9, 2023
c9c9f7b
Fix formatting
adam-narozniak Jun 9, 2023
6dc6261
Apply suggestions
adam-narozniak Jun 9, 2023
330058b
Move list constants to aggregate_bulyan
adam-narozniak Jun 9, 2023
aa7e735
Fix mypy tests
adam-narozniak Jun 9, 2023
23680ea
Apply suggestions
adam-narozniak Jun 12, 2023
7067e79
Merge branch 'main' into bulyan-strategy
adam-narozniak Jun 14, 2023
08d021f
Merge branch 'main' into bulyan-strategy
adam-narozniak Aug 1, 2023
2be7e9d
Fix tests in aggregate
adam-narozniak Aug 1, 2023
a1171a0
Fix pylint errors in aggregate
adam-narozniak Aug 2, 2023
bd23813
Fix pylint errors in aggregate
adam-narozniak Aug 2, 2023
27e2417
Merge branch 'main' into bulyan-strategy
adam-narozniak Sep 6, 2023
8f578e1
Merge branch 'main' into bulyan-strategy
adam-narozniak Sep 12, 2023
2f0a75d
Fix the ruff linter errors
adam-narozniak Sep 12, 2023
52b6eb8
Merge remote-tracking branch 'origin/bulyan-strategy' into bulyan-str…
adam-narozniak Sep 12, 2023
5ff58f2
Merge branch 'main' into bulyan-strategy
adam-narozniak Oct 2, 2023
e5a4ad8
Add Bulyan to the docs
adam-narozniak Oct 2, 2023
1df361a
Enable import from strategy
adam-narozniak Oct 2, 2023
6b85484
Merge branch 'main' into bulyan-strategy
jafermarq Nov 14, 2023
cee220b
Merge branch 'main' into bulyan-strategy
jafermarq Nov 16, 2023
e0c30cc
updated copyright note
jafermarq Nov 16, 2023
dfb3d5b
url clickable in docs
jafermarq Nov 16, 2023
2d9fde8
minor fix
jafermarq Nov 16, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions doc/source/ref-api-flwr.rst
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,16 @@ server.strategy.Krum

.. automethod:: __init__

.. _flwr-server-strategy-Bulyan-apiref:

server.strategy.Bulyan
^^^^^^^^^^^^^^^^^^^^^^

.. autoclass:: flwr.server.strategy.Bulyan
:members:

.. automethod:: __init__


.. _flwr-server-strategy-FedXgbNnAvg-apiref:

Expand Down
4 changes: 4 additions & 0 deletions doc/source/ref-changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@

Flower received many improvements under the hood, too many to list here.

- **Add new** `Bulyan` **strategy** ([#1817](https:/adap/flower/pull/1817), [#1891](https:/adap/flower/pull/1891))

The new `Bulyan` strategy implements Bulyan by [El Mhamdi et al., 2018](https://arxiv.org/abs/1802.07927)

### Incompatible changes

- **Remove support for Python 3.7** ([#2280](https:/adap/flower/pull/2280), [#2299](https:/adap/flower/pull/2299), [#2304](https:/adap/flower/pull/2304), [#2306](https:/adap/flower/pull/2306), [#2355](https:/adap/flower/pull/2355), [#2356](https:/adap/flower/pull/2356))
Expand Down
2 changes: 2 additions & 0 deletions src/py/flwr/server/strategy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"""Contains the strategy abstraction and different implementations."""


from .bulyan import Bulyan as Bulyan
from .dpfedavg_adaptive import DPFedAvgAdaptive as DPFedAvgAdaptive
from .dpfedavg_fixed import DPFedAvgFixed as DPFedAvgFixed
from .fault_tolerant_fedavg import FaultTolerantFedAvg as FaultTolerantFedAvg
Expand Down Expand Up @@ -48,6 +49,7 @@
"FedMedian",
"FedTrimmedAvg",
"Krum",
"Bulyan",
"DPFedAvgAdaptive",
"DPFedAvgFixed",
"Strategy",
Expand Down
176 changes: 173 additions & 3 deletions src/py/flwr/server/strategy/aggregate.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@
# limitations under the License.
# ==============================================================================
"""Aggregation functions for strategy implementations."""

# mypy: disallow_untyped_calls=False

from functools import reduce
from typing import List, Tuple
from typing import Any, Callable, List, Tuple

import numpy as np

Expand Down Expand Up @@ -56,7 +56,7 @@ def aggregate_median(results: List[Tuple[NDArrays, int]]) -> NDArrays:
def aggregate_krum(
results: List[Tuple[NDArrays, int]], num_malicious: int, to_keep: int
) -> NDArrays:
"""Choose one parameter vector according to the Krum fucntion.
"""Choose one parameter vector according to the Krum function.

If to_keep is not None, then MultiKrum is applied.
"""
Expand Down Expand Up @@ -91,6 +91,89 @@ def aggregate_krum(
return weights[np.argmin(scores)]


# pylint: disable=too-many-locals
def aggregate_bulyan(
results: List[Tuple[NDArrays, int]],
num_malicious: int,
aggregation_rule: Callable, # type: ignore
**aggregation_rule_kwargs: Any,
) -> NDArrays:
"""Perform Bulyan aggregation.

Parameters
----------
results: List[Tuple[NDArrays, int]]
Weights and number of samples for each of the client.
num_malicious: int
The maximum number of malicious clients.
aggregation_rule: Callable
Byzantine resilient aggregation rule used as the first step of the Bulyan
aggregation_rule_kwargs: Any
The arguments to the aggregation rule.

Returns
-------
aggregated_parameters: NDArrays
Aggregated parameters according to the Bulyan strategy.
"""
byzantine_resilient_single_ret_model_aggregation = [aggregate_krum]
# also GeoMed (but not implemented yet)
byzantine_resilient_many_return_models_aggregation = [] # type: ignore
# Brute, Medoid (but not implemented yet)

num_clients = len(results)
if num_clients < 4 * num_malicious + 3:
raise ValueError(
"The Bulyan aggregation requires then number of clients to be greater or "
"equal to the 4 * num_malicious + 3. This is the assumption of this method."
"It is needed to ensure that the method reduces the attacker's leeway to "
"the one proved in the paper."
)
selected_models_set: List[Tuple[NDArrays, int]] = []

theta = len(results) - 2 * num_malicious
beta = theta - 2 * num_malicious

for _ in range(theta):
best_model = aggregation_rule(
results=results, num_malicious=num_malicious, **aggregation_rule_kwargs
)
list_of_weights = [weights for weights, num_samples in results]
# This group gives exact result
if aggregation_rule in byzantine_resilient_single_ret_model_aggregation:
best_idx = _find_reference_weights(best_model, list_of_weights)
# This group requires finding the closest model to the returned one
# (weights distance wise)
elif aggregation_rule in byzantine_resilient_many_return_models_aggregation:
# when different aggregation strategies available
# write a function to find the closest model
raise NotImplementedError(
"aggregate_bulyan currently does not support the aggregation rules that"
" return many models as results. "
"Such aggregation rules are currently not available in Flower."
)
else:
raise ValueError(
"The given aggregation rule is not added as Byzantine resilient. "
"Please choose from Byzantine resilient rules."
)

selected_models_set.append(results[best_idx])

# remove idx from tracker and weights_results
results.pop(best_idx)

# Compute median parameter vector across selected_models_set
median_vect = aggregate_median(selected_models_set)

# Take the averaged beta parameters of the closest distance to the median
# (coordinate-wise)
parameters_aggregated = _aggregate_n_closest_weights(
median_vect, selected_models_set, beta_closest=beta
)
return parameters_aggregated


def weighted_loss_avg(results: List[Tuple[int, float]]) -> float:
"""Aggregate evaluation results obtained from multiple clients."""
num_total_evaluation_examples = sum([num_examples for num_examples, _ in results])
Expand Down Expand Up @@ -168,3 +251,90 @@ def aggregate_trimmed_avg(
]

return trimmed_w


def _check_weights_equality(weights1: NDArrays, weights2: NDArrays) -> bool:
"""Check if weights are the same."""
if len(weights1) != len(weights2):
return False
return all(
np.array_equal(layer_weights1, layer_weights2)
for layer_weights1, layer_weights2 in zip(weights1, weights2)
)


def _find_reference_weights(
reference_weights: NDArrays, list_of_weights: List[NDArrays]
) -> int:
"""Find the reference weights by looping through the `list_of_weights`.

Raise Error if the reference weights is not found.

Parameters
----------
reference_weights: NDArrays
Weights that will be searched for.
list_of_weights: List[NDArrays]
List of weights that will be searched through.

Returns
-------
index: int
The index of `reference_weights` in the `list_of_weights`.

Raises
------
ValueError
If `reference_weights` is not found in `list_of_weights`.
"""
for idx, weights in enumerate(list_of_weights):
if _check_weights_equality(reference_weights, weights):
return idx
raise ValueError("The reference weights not found in list_of_weights.")


def _aggregate_n_closest_weights(
reference_weights: NDArrays, results: List[Tuple[NDArrays, int]], beta_closest: int
) -> NDArrays:
"""Calculate element-wise mean of the `N` closest values.

Note, each i-th coordinate of the result weight is the average of the beta_closest
-ith coordinates to the reference weights


Parameters
----------
reference_weights: NDArrays
The weights from which the distances will be computed
results: List[Tuple[NDArrays, int]]
The weights from models
beta_closest: int
The number of the closest distance weights that will be averaged

Returns
-------
aggregated_weights: NDArrays
Averaged (element-wise) beta weights that have the closest distance to
reference weights
"""
list_of_weights = [weights for weights, num_examples in results]
aggregated_weights = []

for layer_id, layer_weights in enumerate(reference_weights):
other_weights_layer_list = []
for other_w in list_of_weights:
other_weights_layer = other_w[layer_id]
other_weights_layer_list.append(other_weights_layer)
other_weights_layer_np = np.array(other_weights_layer_list)
diff_np = np.abs(layer_weights - other_weights_layer_np)
# Create indices of the smallest differences
# We do not need the exact order but just the beta closest weights
# therefore np.argpartition is used instead of np.argsort
indices = np.argpartition(diff_np, kth=beta_closest - 1, axis=0)
# Take the weights (coordinate-wise) corresponding to the beta of the
# closest distances
beta_closest_weights = np.take_along_axis(
other_weights_layer_np, indices=indices, axis=0
)[:beta_closest]
aggregated_weights.append(np.mean(beta_closest_weights, axis=0))
return aggregated_weights
77 changes: 76 additions & 1 deletion src/py/flwr/server/strategy/aggregate_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,13 @@

import numpy as np

from .aggregate import aggregate, weighted_loss_avg
from .aggregate import (
_aggregate_n_closest_weights,
_check_weights_equality,
_find_reference_weights,
aggregate,
weighted_loss_avg,
)


def test_aggregate() -> None:
Expand Down Expand Up @@ -64,3 +70,72 @@ def test_weighted_loss_avg_multiple_values() -> None:

# Assert
assert expected == actual


def test_check_weights_equality_true() -> None:
"""Check weights equality - the same weights."""
weights1 = [np.array([1, 2]), np.array([[1, 2], [3, 4]])]
weights2 = [np.array([1, 2]), np.array([[1, 2], [3, 4]])]
results = _check_weights_equality(weights1, weights2)
expected = True
assert expected == results


def test_check_weights_equality_numeric_false() -> None:
"""Check weights equality - different weights, same length."""
weights1 = [np.array([1, 2]), np.array([[1, 2], [3, 4]])]
weights2 = [np.array([2, 2]), np.array([[1, 2], [3, 4]])]
results = _check_weights_equality(weights1, weights2)
expected = False
assert expected == results


def test_check_weights_equality_various_length_false() -> None:
"""Check weights equality - the same first layer weights, different length."""
weights1 = [np.array([1, 2]), np.array([[1, 2], [3, 4]])]
weights2 = [np.array([1, 2])]
results = _check_weights_equality(weights1, weights2)
expected = False
assert expected == results


def test_find_reference_weights() -> None:
"""Check if the finding weights from list of weigths work."""
reference_weights = [np.array([1, 2]), np.array([[1, 2], [3, 4]])]
list_of_weights = [
[np.array([2, 2]), np.array([[1, 2], [3, 4]])],
[np.array([3, 2]), np.array([[1, 2], [3, 4]])],
[np.array([3, 2]), np.array([[1, 2], [10, 4]])],
[np.array([1, 2]), np.array([[1, 2], [3, 4]])],
]

result = _find_reference_weights(reference_weights, list_of_weights)

expected = 3
assert result == expected


def test_aggregate_n_closest_weights_mean() -> None:
"""Check if aggregation of n closest weights to the reference works."""
beta_closest = 2
reference_weights = [np.array([1, 2]), np.array([[1, 2], [3, 4]])]

list_of_weights = [
[np.array([1, 2]), np.array([[1, 2], [3, 4]])],
[np.array([1.1, 2.1]), np.array([[1.1, 2.1], [3.1, 4.1]])],
[np.array([1.2, 2.2]), np.array([[1.2, 2.2], [3.2, 4.2]])],
[np.array([1.3, 2.3]), np.array([[0.9, 2.5], [3.4, 3.8]])],
]
list_of_weights_and_samples = [(weights, 0) for weights in list_of_weights]

beta_closest_weights = _aggregate_n_closest_weights(
reference_weights, list_of_weights_and_samples, beta_closest=beta_closest
)
expected_averaged = [np.array([1.05, 2.05]), np.array([[0.95, 2.05], [3.05, 4.05]])]

assert all(
(
np.array_equal(expected, result)
for expected, result in zip(expected_averaged, beta_closest_weights)
)
)
Loading