Skip to content

Module fl_server_ai.notification

View Source
# SPDX-FileCopyrightText: 2024 Benedikt Franke <benedikt.franke@dlr.de>
# SPDX-FileCopyrightText: 2024 Florian Heinrich <florian.heinrich@dlr.de>
#
# SPDX-License-Identifier: Apache-2.0

from .notification_type import NotificationType
from .notification import Notification
from .training import TrainingRoundStartNotification, TrainingFinishedNotification


# Names exported by `from fl_server_ai.notification import *`.
__all__ = [
    "NotificationType",
    "Notification",
    "TrainingRoundStartNotification",
    "TrainingFinishedNotification",
]

Sub-modules

Classes

Notification

class Notification(
    receivers: List[fl_server_core.models.user.NotificationReceiver],
    body: ~TBody
)

Abstract base class for notifications.

View Source
@dataclass
class Notification(Generic[TBody], Serializable, metaclass=ABCMeta):
    """
    Common abstract base of all notifications.

    A notification bundles its receivers, a serializable body and a notification type.
    It can be dispatched asynchronously with `send` and turned into a dictionary with `serialize`.
    """

    receivers: List[NotificationReceiver]
    """Who the notification is delivered to."""
    body: TBody
    """Payload of the notification."""
    type: NotificationType = field(init=False)
    """Kind of notification; not a parameter of the generated `__init__`."""

    @dataclass
    class Body(Serializable):
        """
        Nested payload type of the notification; declares no fields itself.
        """

    @property
    def callback_success(self) -> Optional[Signature]:
        """
        Default success callback used by `send` when none is passed explicitly.

        Returns:
            Optional[Signature]: Always None in this base class.
        """
        return None

    @property
    def callback_error(self) -> Optional[Signature]:
        """
        Default error callback used by `send` when none is passed explicitly.

        Returns:
            Optional[Signature]: Always None in this base class.
        """
        return None

    def send(
        self,
        callback_success: Optional[Signature] = None,
        callback_error: Optional[Signature] = None
    ) -> AsyncResult:
        """
        Queue this notification for asynchronous delivery to its receivers.

        A callback passed as argument takes precedence; a falsy argument falls back to the
        property of the same name on this instance.

        Args:
            callback_success (Optional[Signature], optional): Callback for the success case. Defaults to None.
            callback_error (Optional[Signature], optional): Callback for the error case. Defaults to None.

        Returns:
            AsyncResult: The result object returned by `apply_async`.
        """
        on_success = callback_success if callback_success else self.callback_success
        on_error = callback_error if callback_error else self.callback_error
        signature = send_notifications.s(
            notification=self,
            callback_success=on_success,
            callback_error=on_error,
        )
        return signature.apply_async(retry=False)

    def serialize(self) -> dict[str, Any]:
        """
        Build the dictionary representation of the notification.

        Returns:
            dict[str, Any]: Dictionary with the keys `notification_type` (value of the type enum)
                and `body` (the serialized body).
        """
        type_value = self.type.value
        body_payload = self.body.serialize()
        return {"notification_type": type_value, "body": body_payload}

Ancestors (in MRO)

  • typing.Generic
  • fl_server_ai.notification.serializable.Serializable

Descendants

  • fl_server_ai.notification.training.TrainingNotification

Class variables

Body

Instance variables

callback_error

The callback to be called on error. By default, this is None.

callback_success

The callback to be called on success. By default, this is None.

Methods

send

def send(
    self,
    callback_success: Optional[celery.canvas.Signature] = None,
    callback_error: Optional[celery.canvas.Signature] = None
) -> celery.result.AsyncResult

Send notification to the receivers asynchronously.

Parameters:

Name Type Description Default
callback_success Optional[Signature] The callback to be called on success. Defaults to None. None
callback_error Optional[Signature] The callback to be called on error. Defaults to None. None

Returns:

Type Description
AsyncResult The result of the asynchronous operation.
View Source
    def send(
        self,
        callback_success: Optional[Signature] = None,
        callback_error: Optional[Signature] = None
    ) -> AsyncResult:
        """
        Queue this notification for asynchronous delivery to its receivers.

        A callback passed as argument takes precedence; a falsy argument falls back to the
        property of the same name on this instance.

        Args:
            callback_success (Optional[Signature], optional): Callback for the success case. Defaults to None.
            callback_error (Optional[Signature], optional): Callback for the error case. Defaults to None.

        Returns:
            AsyncResult: The result object returned by `apply_async`.
        """
        on_success = callback_success if callback_success else self.callback_success
        on_error = callback_error if callback_error else self.callback_error
        signature = send_notifications.s(
            notification=self,
            callback_success=on_success,
            callback_error=on_error,
        )
        return signature.apply_async(retry=False)

serialize

def serialize(
    self
) -> dict[str, typing.Any]

Serialize the notification into a dictionary.

Returns:

Type Description
dict[str, Any] The serialized notification.
View Source
    def serialize(self) -> dict[str, Any]:
        """
        Build the dictionary representation of the notification.

        Returns:
            dict[str, Any]: Dictionary with the keys `notification_type` (value of the type enum)
                and `body` (the serialized body).
        """
        type_value = self.type.value
        body_payload = self.body.serialize()
        return {"notification_type": type_value, "body": body_payload}

NotificationType

class NotificationType(
    _: str,
    description: Optional[str] = None
)

Notification types including a short description for each type.

View Source
class NotificationType(Enum):
    """
    Enumeration of the notification types, each paired with a short description.

    Every member is declared as a `(value, description)` tuple; only the first element
    becomes the member's value.
    """

    UPDATE_ROUND_START = "UPDATE_ROUND_START", _("Start update round")
    SWAG_ROUND_START = "SWAG_ROUND_START", _("Start swag round")
    TRAINING_START = "TRAINING_START", _("Training start")
    TRAINING_FINISHED = "TRAINING_FINISHED", _("Training finished")
    CLIENT_REMOVED = "CLIENT_REMOVED", _("Client removed")
    MODEL_TEST_ROUND = "MODEL_TEST_ROUND", _("Start model round")

    def __new__(cls, *args, **kwargs):
        """
        Create an enum member whose value is the first positional argument.

        Args:
            *args: Positional arguments of the member definition; `args[0]` becomes the value.
            **kwargs: Accepted but not used.

        Returns:
            object: The newly created member.
        """
        member = object.__new__(cls)
        member._value_ = args[0]
        return member

    def __init__(self, _: str, description: Optional[str] = None):
        """
        Store the description of the enum member.

        Args:
            _ (str): The member value; unused here because `__new__` has already stored it.
            description (Optional[str], optional): Short description of the member. Defaults to None.
        """
        # Only the description is kept; the value was assigned in __new__.
        self._description_ = description

    @property
    def description(self):
        """
        Short description given in the member definition.

        Returns:
            The stored description, or None when the member was defined without one.
        """
        return self._description_

Ancestors (in MRO)

  • enum.Enum

Class variables

CLIENT_REMOVED
MODEL_TEST_ROUND
SWAG_ROUND_START
TRAINING_FINISHED
TRAINING_START
UPDATE_ROUND_START
name
value

TrainingFinishedNotification

class TrainingFinishedNotification(
    receivers: List[fl_server_core.models.user.NotificationReceiver],
    body: ~TBody,
    training_uuid: uuid.UUID
)

Notification that a training has finished.

View Source
class TrainingFinishedNotification(TrainingNotification["TrainingFinishedNotification.Body"]):
    """
    Notification that a training has finished.
    """

    type: NotificationType = NotificationType.TRAINING_FINISHED
    """The type of the notification."""

    @dataclass
    class Body(Serializable):
        """
        Inner class representing the body of the notification.
        """
        global_model_uuid: UUID
        """The UUID of the global model."""

    @classmethod
    def from_training(cls, training: TrainingDB):
        """
        Create a `TrainingFinishedNotification` instance from a training object.

        The receivers are all participants of the training plus the training's actor,
        who is appended only when not already among the participants.

        Args:
            training (TrainingDB): The training object to create the notification from.

        Returns:
            TrainingFinishedNotification: The created notification.
        """
        receivers = list(training.participants.all())
        # Use the membership operator instead of calling the dunder method directly.
        if training.actor not in receivers:
            receivers.append(training.actor)
        return cls(
            receivers=receivers,
            body=cls.Body(
                global_model_uuid=training.model.id
            ),
            training_uuid=training.id
        )

Ancestors (in MRO)

  • fl_server_ai.notification.training.TrainingNotification
  • fl_server_ai.notification.Notification
  • typing.Generic
  • fl_server_ai.notification.serializable.Serializable

Class variables

Body
type

Static methods

from_training

def from_training(
    training: fl_server_core.models.training.Training
)

Create a TrainingFinishedNotification instance from a training object.

Parameters:

Name Type Description Default
training TrainingDB The training object to create the notification from. required

Returns:

Type Description
TrainingFinishedNotification The created notification.
View Source
    @classmethod
    def from_training(cls, training: TrainingDB):
        """
        Create a `TrainingFinishedNotification` instance from a training object.

        The receivers are all participants of the training plus the training's actor,
        who is appended only when not already among the participants.

        Args:
            training (TrainingDB): The training object to create the notification from.

        Returns:
            TrainingFinishedNotification: The created notification.
        """
        receivers = list(training.participants.all())
        # Use the membership operator instead of calling the dunder method directly.
        if training.actor not in receivers:
            receivers.append(training.actor)
        return cls(
            receivers=receivers,
            body=cls.Body(
                global_model_uuid=training.model.id
            ),
            training_uuid=training.id
        )

Instance variables

callback_error

The callback to be called on error. By default, this is None.

callback_success

The callback to be called on success. By default, this is None.

Methods

send

def send(
    self,
    callback_success: Optional[celery.canvas.Signature] = None,
    callback_error: Optional[celery.canvas.Signature] = None
) -> celery.result.AsyncResult

Send notification to the receivers asynchronously.

Parameters:

Name Type Description Default
callback_success Optional[Signature] The callback to be called on success. Defaults to None. None
callback_error Optional[Signature] The callback to be called on error. Defaults to None. None

Returns:

Type Description
AsyncResult The result of the asynchronous operation.
View Source
    def send(
        self,
        callback_success: Optional[Signature] = None,
        callback_error: Optional[Signature] = None
    ) -> AsyncResult:
        """
        Queue this notification for asynchronous delivery to its receivers.

        A callback passed as argument takes precedence; a falsy argument falls back to the
        property of the same name on this instance.

        Args:
            callback_success (Optional[Signature], optional): Callback for the success case. Defaults to None.
            callback_error (Optional[Signature], optional): Callback for the error case. Defaults to None.

        Returns:
            AsyncResult: The result object returned by `apply_async`.
        """
        on_success = callback_success if callback_success else self.callback_success
        on_error = callback_error if callback_error else self.callback_error
        signature = send_notifications.s(
            notification=self,
            callback_success=on_success,
            callback_error=on_error,
        )
        return signature.apply_async(retry=False)

serialize

def serialize(
    self
) -> dict[str, typing.Any]

Serialize the notification into a dictionary.

Returns:

Type Description
dict[str, Any] The serialized notification.
View Source
    def serialize(self) -> dict[str, Any]:
        """
        Serialize the notification and add the training UUID in string form.

        Returns:
            dict[str, Any]: The dictionary produced by the parent class, extended by a
                `training_uuid` entry.
        """
        serialized = super().serialize()
        serialized["training_uuid"] = str(self.training_uuid)
        return serialized

TrainingRoundStartNotification

class TrainingRoundStartNotification(
    receivers: List[fl_server_core.models.user.NotificationReceiver],
    body: ~TBody,
    training_uuid: uuid.UUID
)

Notification for the start of a training round.

View Source
class TrainingRoundStartNotification(TrainingNotification["TrainingRoundStartNotification.Body"]):
    """
    Notification announcing the start of a training round.
    """

    type: NotificationType = NotificationType.UPDATE_ROUND_START
    """Notification type used by this class."""

    @property
    def callback_success(self) -> Optional[Signature]:
        """
        Success callback: a signature of `training_notification_callback_success`
        carrying this notification's training UUID.
        """
        make_signature = training_notification_callback_success.s
        return make_signature(training_uuid=self.training_uuid)

    @property
    def callback_error(self) -> Optional[Signature]:
        """
        Error callback: a signature of `training_notification_callback_failure`
        carrying this notification's training UUID.
        """
        make_signature = training_notification_callback_failure.s
        return make_signature(training_uuid=self.training_uuid)

    @dataclass
    class Body(Serializable):
        """
        Payload of the round start notification.
        """
        round: int
        """Number of the round."""
        global_model_uuid: UUID
        """UUID of the global model."""

    @classmethod
    def from_training(cls, training: TrainingDB):
        """
        Build a notification of this class from a training object.

        Args:
            training (TrainingDB): Training that supplies participants, model round, model id and training id.

        Returns:
            TrainingRoundStartNotification: The newly created notification.
        """
        # NOTE(review): the result of participants.all() is handed over as-is (no list conversion);
        # confirm that the consumer of `receivers` accepts that.
        participants = training.participants.all()
        payload = cls.Body(
            round=training.model.round,
            global_model_uuid=training.model.id,
        )
        return cls(receivers=participants, body=payload, training_uuid=training.id)

Ancestors (in MRO)

  • fl_server_ai.notification.training.TrainingNotification
  • fl_server_ai.notification.Notification
  • typing.Generic
  • fl_server_ai.notification.serializable.Serializable

Descendants

  • fl_server_ai.notification.training.TrainingModelTestNotification
  • fl_server_ai.notification.training.TrainingSWAGRoundStartNotification

Class variables

Body
type

Static methods

from_training

def from_training(
    training: fl_server_core.models.training.Training
)

Create a TrainingRoundStartNotification instance from a training object.

Parameters:

Name Type Description Default
training TrainingDB The training object to create the notification from. required

Returns:

Type Description
TrainingRoundStartNotification The created notification.
View Source
    @classmethod
    def from_training(cls, training: TrainingDB):
        """
        Build a notification of this class from a training object.

        Args:
            training (TrainingDB): Training that supplies participants, model round, model id and training id.

        Returns:
            TrainingRoundStartNotification: The newly created notification.
        """
        # NOTE(review): the result of participants.all() is handed over as-is (no list conversion);
        # confirm that the consumer of `receivers` accepts that.
        participants = training.participants.all()
        payload = cls.Body(
            round=training.model.round,
            global_model_uuid=training.model.id,
        )
        return cls(receivers=participants, body=payload, training_uuid=training.id)

Instance variables

callback_error
callback_success

Methods

send

def send(
    self,
    callback_success: Optional[celery.canvas.Signature] = None,
    callback_error: Optional[celery.canvas.Signature] = None
) -> celery.result.AsyncResult

Send notification to the receivers asynchronously.

Parameters:

Name Type Description Default
callback_success Optional[Signature] The callback to be called on success. Defaults to None. None
callback_error Optional[Signature] The callback to be called on error. Defaults to None. None

Returns:

Type Description
AsyncResult The result of the asynchronous operation.
View Source
    def send(
        self,
        callback_success: Optional[Signature] = None,
        callback_error: Optional[Signature] = None
    ) -> AsyncResult:
        """
        Queue this notification for asynchronous delivery to its receivers.

        A callback passed as argument takes precedence; a falsy argument falls back to the
        property of the same name on this instance.

        Args:
            callback_success (Optional[Signature], optional): Callback for the success case. Defaults to None.
            callback_error (Optional[Signature], optional): Callback for the error case. Defaults to None.

        Returns:
            AsyncResult: The result object returned by `apply_async`.
        """
        on_success = callback_success if callback_success else self.callback_success
        on_error = callback_error if callback_error else self.callback_error
        signature = send_notifications.s(
            notification=self,
            callback_success=on_success,
            callback_error=on_error,
        )
        return signature.apply_async(retry=False)

serialize

def serialize(
    self
) -> dict[str, typing.Any]

Serialize the notification into a dictionary.

Returns:

Type Description
dict[str, Any] The serialized notification.
View Source
    def serialize(self) -> dict[str, Any]:
        """
        Serialize the notification and add the training UUID in string form.

        Returns:
            dict[str, Any]: The dictionary produced by the parent class, extended by a
                `training_uuid` entry.
        """
        serialized = super().serialize()
        serialized["training_uuid"] = str(self.training_uuid)
        return serialized