Skip to content

fl_server_ai.aggregation

Modules:

Name Description
base
fed_dc
fed_prox
mean
method

Classes:

Name Description
Aggregation

Abstract base class for aggregation strategies.

MeanAggregation

Implements the aggregate method for aggregating models by calculating their mean.

Functions:

Name Description
get_aggregation_class

Get the aggregation class based on the provided model, training or aggregation method.

Attributes

__all__ module-attribute

__all__ = ['Aggregation', 'get_aggregation_class', 'MeanAggregation']

Classes

Aggregation

Bases: ABC


              flowchart TD
              fl_server_ai.aggregation.Aggregation[Aggregation]

              

              click fl_server_ai.aggregation.Aggregation href "" "fl_server_ai.aggregation.Aggregation"
            

Abstract base class for aggregation strategies.

Methods:

Name Description
aggregate

Abstract method for aggregating models.

Source code in fl_server_ai/aggregation/base.py
class Aggregation(ABC):
    """
    Abstract base class for aggregation strategies.
    """
    _logger = getLogger("fl.server")

    @abstractmethod
    def aggregate(
        self,
        models: Sequence[torch.nn.Module],
        model_sample_sizes: Sequence[int],
        *,
        deepcopy: bool = True
    ) -> torch.nn.Module:
        """
        Abstract method for aggregating models.

        Args:
            models (Sequence[torch.nn.Module]): The models to be aggregated.
            model_sample_sizes (Sequence[int]): The sample sizes for each model.
            deepcopy (bool, optional): Whether to create a deep copy of the models. Defaults to True.

        Returns:
            torch.nn.Module: The aggregated model.
        """
        pass

Functions

aggregate abstractmethod
aggregate(models: Sequence[Module], model_sample_sizes: Sequence[int], *, deepcopy: bool = True) -> Module

Abstract method for aggregating models.

Parameters:

Name Type Description Default
models
Sequence[Module]

The models to be aggregated.

required
model_sample_sizes
Sequence[int]

The sample sizes for each model.

required
deepcopy
bool

Whether to create a deep copy of the models. Defaults to True.

True

Returns:

Type Description
Module

torch.nn.Module: The aggregated model.

Source code in fl_server_ai/aggregation/base.py
@abstractmethod
def aggregate(
    self,
    models: Sequence[torch.nn.Module],
    model_sample_sizes: Sequence[int],
    *,
    deepcopy: bool = True
) -> torch.nn.Module:
    """
    Abstract method for aggregating models.

    Args:
        models (Sequence[torch.nn.Module]): The models to be aggregated.
        model_sample_sizes (Sequence[int]): The sample sizes for each model.
        deepcopy (bool, optional): Whether to create a deep copy of the models. Defaults to True.

    Returns:
        torch.nn.Module: The aggregated model.
    """
    pass

MeanAggregation

Bases: Aggregation


              flowchart TD
              fl_server_ai.aggregation.MeanAggregation[MeanAggregation]
              fl_server_ai.aggregation.base.Aggregation[Aggregation]

                              fl_server_ai.aggregation.base.Aggregation --> fl_server_ai.aggregation.MeanAggregation
                


              click fl_server_ai.aggregation.MeanAggregation href "" "fl_server_ai.aggregation.MeanAggregation"
              click fl_server_ai.aggregation.base.Aggregation href "" "fl_server_ai.aggregation.base.Aggregation"
            

Implements the aggregate method for aggregating models by calculating their mean.

Methods:

Name Description
aggregate

Aggregate models by calculating the mean.

Source code in fl_server_ai/aggregation/mean.py
class MeanAggregation(Aggregation):
    """
    Implements the aggregate method for aggregating models by calculating their mean.
    """

    @torch.no_grad()
    def aggregate(
        self,
        models: Sequence[torch.nn.Module],
        model_sample_sizes: Sequence[int],
        *,
        deepcopy: bool = True
    ) -> torch.nn.Module:
        """
        Aggregate models by calculating the mean.

        Args:
            models (Sequence[torch.nn.Module]): The models to be aggregated.
            model_sample_sizes (Sequence[int]): The sample sizes for each model.
            deepcopy (bool, optional): Whether to create a deep copy of the models. Defaults to True.

        Returns:
            torch.nn.Module: The aggregated model.

        Raises:
            AggregationException: If the models do not have the same architecture.
        """
        assert len(models) == len(model_sample_sizes)

        self._logger.debug(f"Doing mean aggregation for {len(models)} models!")
        model_state_dicts = [model.state_dict() for model in models]

        total_dataset_size = model_sample_sizes[0]
        result_dict = model_state_dicts[0]
        for layer_name in result_dict:
            result_dict[layer_name] *= model_sample_sizes[0]

        # sum accumulation
        for model_dict, dataset_size in zip(model_state_dicts[1:], model_sample_sizes[1:]):
            if set(model_dict.keys()) != set(result_dict.keys()):
                raise AggregationException("Models do not have the same architecture!")

            total_dataset_size += dataset_size
            for layer_name in result_dict:
                result_dict[layer_name] += model_dict[layer_name] * dataset_size

        # factor 1/n
        for layer_name in result_dict:
            result_dict[layer_name] = result_dict[layer_name] / total_dataset_size

        # return aggregated model
        result_model = copy.deepcopy(models[0]) if deepcopy else models[0]
        result_model.load_state_dict(result_dict)
        return result_model

Functions

aggregate
aggregate(models: Sequence[Module], model_sample_sizes: Sequence[int], *, deepcopy: bool = True) -> Module

Aggregate models by calculating the mean.

Parameters:

Name Type Description Default
models
Sequence[Module]

The models to be aggregated.

required
model_sample_sizes
Sequence[int]

The sample sizes for each model.

required
deepcopy
bool

Whether to create a deep copy of the models. Defaults to True.

True

Returns:

Type Description
Module

torch.nn.Module: The aggregated model.

Raises:

Type Description
AggregationException

If the models do not have the same architecture.

Source code in fl_server_ai/aggregation/mean.py
@torch.no_grad()
def aggregate(
    self,
    models: Sequence[torch.nn.Module],
    model_sample_sizes: Sequence[int],
    *,
    deepcopy: bool = True
) -> torch.nn.Module:
    """
    Aggregate models by calculating the mean.

    Args:
        models (Sequence[torch.nn.Module]): The models to be aggregated.
        model_sample_sizes (Sequence[int]): The sample sizes for each model.
        deepcopy (bool, optional): Whether to create a deep copy of the models. Defaults to True.

    Returns:
        torch.nn.Module: The aggregated model.

    Raises:
        AggregationException: If the models do not have the same architecture.
    """
    assert len(models) == len(model_sample_sizes)

    self._logger.debug(f"Doing mean aggregation for {len(models)} models!")
    model_state_dicts = [model.state_dict() for model in models]

    total_dataset_size = model_sample_sizes[0]
    result_dict = model_state_dicts[0]
    for layer_name in result_dict:
        result_dict[layer_name] *= model_sample_sizes[0]

    # sum accumulation
    for model_dict, dataset_size in zip(model_state_dicts[1:], model_sample_sizes[1:]):
        if set(model_dict.keys()) != set(result_dict.keys()):
            raise AggregationException("Models do not have the same architecture!")

        total_dataset_size += dataset_size
        for layer_name in result_dict:
            result_dict[layer_name] += model_dict[layer_name] * dataset_size

    # factor 1/n
    for layer_name in result_dict:
        result_dict[layer_name] = result_dict[layer_name] / total_dataset_size

    # return aggregated model
    result_model = copy.deepcopy(models[0]) if deepcopy else models[0]
    result_model.load_state_dict(result_dict)
    return result_model

Functions

get_aggregation_class

get_aggregation_class(value: Model) -> Type[Aggregation]
get_aggregation_class(value: Training) -> Type[Aggregation]
get_aggregation_class(value: AggregationMethod) -> Type[Aggregation]
get_aggregation_class(value: AggregationMethod | Model | Training) -> Type[Aggregation]

Get the aggregation class based on the provided model, training or aggregation method.

Parameters:

Name Type Description Default

value

AggregationMethod | Model | Training

The value based on which the aggregation class is determined.

required

Returns:

Type Description
Type[Aggregation]

Type[Aggregation]: The type of the aggregation class.

Raises:

Type Description
ValueError

If the type of value or the aggregation method is unknown.

Source code in fl_server_ai/aggregation/method.py
def get_aggregation_class(value: AggregationMethod | Model | Training) -> Type[Aggregation]:
    """
    Get the aggregation class based on the provided model, training or aggregation method.

    Args:
        value (AggregationMethod | Model | Training): The value based on which the aggregation class is determined.

    Returns:
        Type[Aggregation]: The type of the aggregation class.

    Raises:
        ValueError: If the type of value or the aggregation method is unknown.
    """
    if isinstance(value, AggregationMethod):
        method = value
    elif isinstance(value, Training):
        method = value.aggregation_method
    elif isinstance(value, Model):
        aggregation_method = Training.objects.filter(model=value) \
                .values("aggregation_method") \
                .first()["aggregation_method"]
        method = aggregation_method
    else:
        raise ValueError(f"Unknown type: {type(value)}")

    match method:
        case AggregationMethod.FED_AVG: return MeanAggregation
        case AggregationMethod.FED_DC: return FedDC
        case AggregationMethod.FED_PROX: return FedProx
        case _: raise ValueError(f"Unknown aggregation method: {method}")