Skip to content

Module fl_server_ai.notification.training

View Source
# SPDX-FileCopyrightText: 2024 Benedikt Franke <benedikt.franke@dlr.de>
# SPDX-FileCopyrightText: 2024 Florian Heinrich <florian.heinrich@dlr.de>
#
# SPDX-License-Identifier: Apache-2.0

from .finished import TrainingFinishedNotification
from .model_test import TrainingModelTestNotification
from .round_start import TrainingRoundStartNotification
from .start import TrainingStartNotification
from .swag import TrainingSWAGRoundStartNotification
from .training import TrainingNotification


__all__ = [
    "TrainingModelTestNotification",
    "TrainingNotification",
    "TrainingFinishedNotification",
    "TrainingStartNotification",
    "TrainingSWAGRoundStartNotification",
    "TrainingRoundStartNotification",
]

Sub-modules

Classes

TrainingFinishedNotification

class TrainingFinishedNotification(
    receivers: List[fl_server_core.models.user.NotificationReceiver],
    body: ~TBody,
    training_uuid: uuid.UUID
)

Notification that a training has finished.

View Source
class TrainingFinishedNotification(TrainingNotification["TrainingFinishedNotification.Body"]):
    """
    Notification that a training has finished.
    """

    type: NotificationType = NotificationType.TRAINING_FINISHED
    """The type of the notification."""

    @dataclass
    class Body(Serializable):
        """
        Inner class representing the body of the notification.
        """
        global_model_uuid: UUID
        """The UUID of the global model."""

    @classmethod
    def from_training(cls, training: TrainingDB):
        """
        Create a `TrainingFinishedNotification` instance from a training object.

        Args:
            training (TrainingDB): The training object to create the notification from.

        Returns:
            TrainingFinishedNotification: The created notification.
        """
        receivers = list(training.participants.all())
        if not receivers.__contains__(training.actor):
            receivers.append(training.actor)
        return cls(
            receivers=receivers,
            body=cls.Body(
                global_model_uuid=training.model.id
            ),
            training_uuid=training.id
        )

Ancestors (in MRO)

  • fl_server_ai.notification.training.TrainingNotification
  • fl_server_ai.notification.notification.Notification
  • typing.Generic
  • fl_server_ai.notification.serializable.Serializable

Class variables

Body
type

Static methods

from_training

def from_training(
    training: fl_server_core.models.training.Training
)

Create a TrainingFinishedNotification instance from a training object.

Parameters:

Name Type Description Default
training TrainingDB The training object to create the notification from. None

Returns:

Type Description
TrainingFinishedNotification The created notification.
View Source
    @classmethod
    def from_training(cls, training: TrainingDB):
        """
        Create a `TrainingFinishedNotification` instance from a training object.

        Args:
            training (TrainingDB): The training object to create the notification from.

        Returns:
            TrainingFinishedNotification: The created notification.
        """
        receivers = list(training.participants.all())
        if not receivers.__contains__(training.actor):
            receivers.append(training.actor)
        return cls(
            receivers=receivers,
            body=cls.Body(
                global_model_uuid=training.model.id
            ),
            training_uuid=training.id
        )

Instance variables

callback_error

The callback to be called on error. By default, this is None.

callback_success

The callback to be called on success. By default, this is None.

Methods

send

def send(
    self,
    callback_success: Optional[celery.canvas.Signature] = None,
    callback_error: Optional[celery.canvas.Signature] = None
) -> celery.result.AsyncResult

Send notification to the receivers asynchronously.

Parameters:

Name Type Description Default
callback_success Optional[Signature] The callback to be called on success. Defaults to None. None
callback_error Optional[Signature] The callback to be called on error. Defaults to None. None

Returns:

Type Description
AsyncResult The result of the asynchronous operation.
View Source
    def send(
        self,
        callback_success: Optional[Signature] = None,
        callback_error: Optional[Signature] = None
    ) -> AsyncResult:
        """
        Send notification to the receivers asynchronously.

        Args:
            callback_success (Optional[Signature], optional): The callback to be called on success. Defaults to None.
            callback_error (Optional[Signature], optional): The callback to be called on error. Defaults to None.

        Returns:
            AsyncResult: The result of the asynchronous operation.
        """
        callback_success = callback_success or self.callback_success
        callback_error = callback_error or self.callback_error
        return send_notifications.s(
            notification=self, callback_success=callback_success, callback_error=callback_error
        ).apply_async(retry=False)

serialize

def serialize(
    self
) -> dict[str, typing.Any]

Serialize the notification into a dictionary.

Returns:

Type Description
dict[str, Any] The serialized notification.
View Source
    def serialize(self) -> dict[str, Any]:
        data = super().serialize()
        data["training_uuid"] = str(self.training_uuid)
        return data

TrainingModelTestNotification

class TrainingModelTestNotification(
    receivers: List[fl_server_core.models.user.NotificationReceiver],
    body: ~TBody,
    training_uuid: uuid.UUID
)

Notification for the start of a model test round.

View Source
class TrainingModelTestNotification(TrainingRoundStartNotification):
    """
    Notification for the start of a model test round.
    """

    type: NotificationType = NotificationType.MODEL_TEST_ROUND
    """The type of the notification."""

Ancestors (in MRO)

  • fl_server_ai.notification.training.TrainingRoundStartNotification
  • fl_server_ai.notification.training.TrainingNotification
  • fl_server_ai.notification.notification.Notification
  • typing.Generic
  • fl_server_ai.notification.serializable.Serializable

Class variables

Body
type

Static methods

from_training

def from_training(
    training: fl_server_core.models.training.Training
)

Create a TrainingRoundStartNotification instance from a training object.

Parameters:

Name Type Description Default
training TrainingDB The training object to create the notification from. None

Returns:

Type Description
TrainingRoundStartNotification The created notification.
View Source
    @classmethod
    def from_training(cls, training: TrainingDB):
        """
        Create a `TrainingRoundStartNotification` instance from a training object.

        Args:
            training (TrainingDB): The training object to create the notification from.

        Returns:
            TrainingRoundStartNotification: The created notification.
        """
        return cls(
            receivers=training.participants.all(),
            body=cls.Body(
                round=training.model.round,
                global_model_uuid=training.model.id
            ),
            training_uuid=training.id
        )

Instance variables

callback_error
callback_success

Methods

send

def send(
    self,
    callback_success: Optional[celery.canvas.Signature] = None,
    callback_error: Optional[celery.canvas.Signature] = None
) -> celery.result.AsyncResult

Send notification to the receivers asynchronously.

Parameters:

Name Type Description Default
callback_success Optional[Signature] The callback to be called on success. Defaults to None. None
callback_error Optional[Signature] The callback to be called on error. Defaults to None. None

Returns:

Type Description
AsyncResult The result of the asynchronous operation.
View Source
    def send(
        self,
        callback_success: Optional[Signature] = None,
        callback_error: Optional[Signature] = None
    ) -> AsyncResult:
        """
        Send notification to the receivers asynchronously.

        Args:
            callback_success (Optional[Signature], optional): The callback to be called on success. Defaults to None.
            callback_error (Optional[Signature], optional): The callback to be called on error. Defaults to None.

        Returns:
            AsyncResult: The result of the asynchronous operation.
        """
        callback_success = callback_success or self.callback_success
        callback_error = callback_error or self.callback_error
        return send_notifications.s(
            notification=self, callback_success=callback_success, callback_error=callback_error
        ).apply_async(retry=False)

serialize

def serialize(
    self
) -> dict[str, typing.Any]

Serialize the notification into a dictionary.

Returns:

Type Description
dict[str, Any] The serialized notification.
View Source
    def serialize(self) -> dict[str, Any]:
        data = super().serialize()
        data["training_uuid"] = str(self.training_uuid)
        return data

TrainingNotification

class TrainingNotification(
    receivers: List[fl_server_core.models.user.NotificationReceiver],
    body: ~TBody,
    training_uuid: uuid.UUID
)

Abstract base class for training notifications.

View Source
@dataclass
class TrainingNotification(Generic[TBody], Notification[TBody]):
    """
    Abstract base class for training notifications.
    """

    training_uuid: UUID
    """The UUID of the training."""

    def serialize(self) -> dict[str, Any]:
        data = super().serialize()
        data["training_uuid"] = str(self.training_uuid)
        return data

Ancestors (in MRO)

  • fl_server_ai.notification.notification.Notification
  • typing.Generic
  • fl_server_ai.notification.serializable.Serializable

Descendants

  • fl_server_ai.notification.training.TrainingFinishedNotification
  • fl_server_ai.notification.training.TrainingRoundStartNotification
  • fl_server_ai.notification.training.TrainingStartNotification

Class variables

Body

Instance variables

callback_error

The callback to be called on error. By default, this is None.

callback_success

The callback to be called on success. By default, this is None.

Methods

send

def send(
    self,
    callback_success: Optional[celery.canvas.Signature] = None,
    callback_error: Optional[celery.canvas.Signature] = None
) -> celery.result.AsyncResult

Send notification to the receivers asynchronously.

Parameters:

Name Type Description Default
callback_success Optional[Signature] The callback to be called on success. Defaults to None. None
callback_error Optional[Signature] The callback to be called on error. Defaults to None. None

Returns:

Type Description
AsyncResult The result of the asynchronous operation.
View Source
    def send(
        self,
        callback_success: Optional[Signature] = None,
        callback_error: Optional[Signature] = None
    ) -> AsyncResult:
        """
        Send notification to the receivers asynchronously.

        Args:
            callback_success (Optional[Signature], optional): The callback to be called on success. Defaults to None.
            callback_error (Optional[Signature], optional): The callback to be called on error. Defaults to None.

        Returns:
            AsyncResult: The result of the asynchronous operation.
        """
        callback_success = callback_success or self.callback_success
        callback_error = callback_error or self.callback_error
        return send_notifications.s(
            notification=self, callback_success=callback_success, callback_error=callback_error
        ).apply_async(retry=False)

serialize

def serialize(
    self
) -> dict[str, typing.Any]

Serialize the notification into a dictionary.

Returns:

Type Description
dict[str, Any] The serialized notification.
View Source
    def serialize(self) -> dict[str, Any]:
        data = super().serialize()
        data["training_uuid"] = str(self.training_uuid)
        return data

TrainingRoundStartNotification

class TrainingRoundStartNotification(
    receivers: List[fl_server_core.models.user.NotificationReceiver],
    body: ~TBody,
    training_uuid: uuid.UUID
)

Notification for the start of a training round.

View Source
class TrainingRoundStartNotification(TrainingNotification["TrainingRoundStartNotification.Body"]):
    """
    Notification for the start of a training round.
    """

    type: NotificationType = NotificationType.UPDATE_ROUND_START
    """The type of the notification."""

    @property
    def callback_success(self) -> Optional[Signature]:
        return training_notification_callback_success.s(training_uuid=self.training_uuid)

    @property
    def callback_error(self) -> Optional[Signature]:
        return training_notification_callback_failure.s(training_uuid=self.training_uuid)

    @dataclass
    class Body(Serializable):
        """
        Inner class representing the body of the notification.
        """
        round: int
        """The round number."""
        global_model_uuid: UUID
        """The UUID of the global model."""

    @classmethod
    def from_training(cls, training: TrainingDB):
        """
        Create a `TrainingRoundStartNotification` instance from a training object.

        Args:
            training (TrainingDB): The training object to create the notification from.

        Returns:
            TrainingRoundStartNotification: The created notification.
        """
        return cls(
            receivers=training.participants.all(),
            body=cls.Body(
                round=training.model.round,
                global_model_uuid=training.model.id
            ),
            training_uuid=training.id
        )

Ancestors (in MRO)

  • fl_server_ai.notification.training.TrainingNotification
  • fl_server_ai.notification.notification.Notification
  • typing.Generic
  • fl_server_ai.notification.serializable.Serializable

Descendants

  • fl_server_ai.notification.training.TrainingModelTestNotification
  • fl_server_ai.notification.training.TrainingSWAGRoundStartNotification

Class variables

Body
type

Static methods

from_training

def from_training(
    training: fl_server_core.models.training.Training
)

Create a TrainingRoundStartNotification instance from a training object.

Parameters:

Name Type Description Default
training TrainingDB The training object to create the notification from. None

Returns:

Type Description
TrainingRoundStartNotification The created notification.
View Source
    @classmethod
    def from_training(cls, training: TrainingDB):
        """
        Create a `TrainingRoundStartNotification` instance from a training object.

        Args:
            training (TrainingDB): The training object to create the notification from.

        Returns:
            TrainingRoundStartNotification: The created notification.
        """
        return cls(
            receivers=training.participants.all(),
            body=cls.Body(
                round=training.model.round,
                global_model_uuid=training.model.id
            ),
            training_uuid=training.id
        )

Instance variables

callback_error
callback_success

Methods

send

def send(
    self,
    callback_success: Optional[celery.canvas.Signature] = None,
    callback_error: Optional[celery.canvas.Signature] = None
) -> celery.result.AsyncResult

Send notification to the receivers asynchronously.

Parameters:

Name Type Description Default
callback_success Optional[Signature] The callback to be called on success. Defaults to None. None
callback_error Optional[Signature] The callback to be called on error. Defaults to None. None

Returns:

Type Description
AsyncResult The result of the asynchronous operation.
View Source
    def send(
        self,
        callback_success: Optional[Signature] = None,
        callback_error: Optional[Signature] = None
    ) -> AsyncResult:
        """
        Send notification to the receivers asynchronously.

        Args:
            callback_success (Optional[Signature], optional): The callback to be called on success. Defaults to None.
            callback_error (Optional[Signature], optional): The callback to be called on error. Defaults to None.

        Returns:
            AsyncResult: The result of the asynchronous operation.
        """
        callback_success = callback_success or self.callback_success
        callback_error = callback_error or self.callback_error
        return send_notifications.s(
            notification=self, callback_success=callback_success, callback_error=callback_error
        ).apply_async(retry=False)

serialize

def serialize(
    self
) -> dict[str, typing.Any]

Serialize the notification into a dictionary.

Returns:

Type Description
dict[str, Any] The serialized notification.
View Source
    def serialize(self) -> dict[str, Any]:
        data = super().serialize()
        data["training_uuid"] = str(self.training_uuid)
        return data

TrainingSWAGRoundStartNotification

class TrainingSWAGRoundStartNotification(
    receivers: List[fl_server_core.models.user.NotificationReceiver],
    body: ~TBody,
    training_uuid: uuid.UUID
)

Notification for the start of a SWAG training round.

View Source
class TrainingSWAGRoundStartNotification(TrainingRoundStartNotification):
    """
    Notification for the start of a SWAG training round.
    """

    type: NotificationType = NotificationType.SWAG_ROUND_START
    """The type of the notification."""

Ancestors (in MRO)

  • fl_server_ai.notification.training.TrainingRoundStartNotification
  • fl_server_ai.notification.training.TrainingNotification
  • fl_server_ai.notification.notification.Notification
  • typing.Generic
  • fl_server_ai.notification.serializable.Serializable

Class variables

Body
type

Static methods

from_training

def from_training(
    training: fl_server_core.models.training.Training
)

Create a TrainingRoundStartNotification instance from a training object.

Parameters:

Name Type Description Default
training TrainingDB The training object to create the notification from. None

Returns:

Type Description
TrainingRoundStartNotification The created notification.
View Source
    @classmethod
    def from_training(cls, training: TrainingDB):
        """
        Create a `TrainingRoundStartNotification` instance from a training object.

        Args:
            training (TrainingDB): The training object to create the notification from.

        Returns:
            TrainingRoundStartNotification: The created notification.
        """
        return cls(
            receivers=training.participants.all(),
            body=cls.Body(
                round=training.model.round,
                global_model_uuid=training.model.id
            ),
            training_uuid=training.id
        )

Instance variables

callback_error
callback_success

Methods

send

def send(
    self,
    callback_success: Optional[celery.canvas.Signature] = None,
    callback_error: Optional[celery.canvas.Signature] = None
) -> celery.result.AsyncResult

Send notification to the receivers asynchronously.

Parameters:

Name Type Description Default
callback_success Optional[Signature] The callback to be called on success. Defaults to None. None
callback_error Optional[Signature] The callback to be called on error. Defaults to None. None

Returns:

Type Description
AsyncResult The result of the asynchronous operation.
View Source
    def send(
        self,
        callback_success: Optional[Signature] = None,
        callback_error: Optional[Signature] = None
    ) -> AsyncResult:
        """
        Send notification to the receivers asynchronously.

        Args:
            callback_success (Optional[Signature], optional): The callback to be called on success. Defaults to None.
            callback_error (Optional[Signature], optional): The callback to be called on error. Defaults to None.

        Returns:
            AsyncResult: The result of the asynchronous operation.
        """
        callback_success = callback_success or self.callback_success
        callback_error = callback_error or self.callback_error
        return send_notifications.s(
            notification=self, callback_success=callback_success, callback_error=callback_error
        ).apply_async(retry=False)

serialize

def serialize(
    self
) -> dict[str, typing.Any]

Serialize the notification into a dictionary.

Returns:

Type Description
dict[str, Any] The serialized notification.
View Source
    def serialize(self) -> dict[str, Any]:
        data = super().serialize()
        data["training_uuid"] = str(self.training_uuid)
        return data

TrainingStartNotification

class TrainingStartNotification(
    receivers: List[fl_server_core.models.user.NotificationReceiver],
    body: ~TBody,
    training_uuid: uuid.UUID
)

Class representing a notification for the start of a training.

View Source
class TrainingStartNotification(TrainingNotification["TrainingStartNotification.Body"]):
    """
    Class representing a notification for the start of a training.
    """

    type: NotificationType = NotificationType.TRAINING_START
    """The type of the notification."""

    @dataclass
    class Body(Serializable):
        """
        Inner class representing the body of the notification.
        """
        global_model_uuid: UUID
        """The UUID of the global model."""

    @classmethod
    def from_training(cls, training: TrainingDB):
        """
        Create a `TrainingStartNotification` instance from a training object.

        Args:
            training (TrainingDB): The training object to create the notification from.

        Returns:
            TrainingStartNotification: The created notification.
        """
        return cls(
            receivers=training.participants.all(),
            body=cls.Body(
                global_model_uuid=training.model.id
            ),
            training_uuid=training.id
        )

Ancestors (in MRO)

  • fl_server_ai.notification.training.TrainingNotification
  • fl_server_ai.notification.notification.Notification
  • typing.Generic
  • fl_server_ai.notification.serializable.Serializable

Class variables

Body
type

Static methods

from_training

def from_training(
    training: fl_server_core.models.training.Training
)

Create a TrainingStartNotification instance from a training object.

Parameters:

Name Type Description Default
training TrainingDB The training object to create the notification from. None

Returns:

Type Description
TrainingStartNotification The created notification.
View Source
    @classmethod
    def from_training(cls, training: TrainingDB):
        """
        Create a `TrainingStartNotification` instance from a training object.

        Args:
            training (TrainingDB): The training object to create the notification from.

        Returns:
            TrainingStartNotification: The created notification.
        """
        return cls(
            receivers=training.participants.all(),
            body=cls.Body(
                global_model_uuid=training.model.id
            ),
            training_uuid=training.id
        )

Instance variables

callback_error

The callback to be called on error. By default, this is None.

callback_success

The callback to be called on success. By default, this is None.

Methods

send

def send(
    self,
    callback_success: Optional[celery.canvas.Signature] = None,
    callback_error: Optional[celery.canvas.Signature] = None
) -> celery.result.AsyncResult

Send notification to the receivers asynchronously.

Parameters:

Name Type Description Default
callback_success Optional[Signature] The callback to be called on success. Defaults to None. None
callback_error Optional[Signature] The callback to be called on error. Defaults to None. None

Returns:

Type Description
AsyncResult The result of the asynchronous operation.
View Source
    def send(
        self,
        callback_success: Optional[Signature] = None,
        callback_error: Optional[Signature] = None
    ) -> AsyncResult:
        """
        Send notification to the receivers asynchronously.

        Args:
            callback_success (Optional[Signature], optional): The callback to be called on success. Defaults to None.
            callback_error (Optional[Signature], optional): The callback to be called on error. Defaults to None.

        Returns:
            AsyncResult: The result of the asynchronous operation.
        """
        callback_success = callback_success or self.callback_success
        callback_error = callback_error or self.callback_error
        return send_notifications.s(
            notification=self, callback_success=callback_success, callback_error=callback_error
        ).apply_async(retry=False)

serialize

def serialize(
    self
) -> dict[str, typing.Any]

Serialize the notification into a dictionary.

Returns:

Type Description
dict[str, Any] The serialized notification.
View Source
    def serialize(self) -> dict[str, Any]:
        data = super().serialize()
        data["training_uuid"] = str(self.training_uuid)
        return data