Module fl_server_ai.notification¶
View Source
# SPDX-FileCopyrightText: 2024 Benedikt Franke <benedikt.franke@dlr.de>
# SPDX-FileCopyrightText: 2024 Florian Heinrich <florian.heinrich@dlr.de>
#
# SPDX-License-Identifier: Apache-2.0
from .notification_type import NotificationType
from .notification import Notification
from .training import TrainingRoundStartNotification, TrainingFinishedNotification
__all__ = [
"NotificationType",
"Notification",
"TrainingRoundStartNotification",
"TrainingFinishedNotification",
]
Sub-modules¶
- fl_server_ai.notification.notification
- fl_server_ai.notification.notification_type
- fl_server_ai.notification.serializable
- fl_server_ai.notification.training
Classes¶
Notification¶
class Notification(
receivers: List[fl_server_core.models.user.NotificationReceiver],
body: ~TBody
)
Abstract base class for notifications.
View Source
@dataclass
class Notification(Generic[TBody], Serializable, metaclass=ABCMeta):
"""
Abstract base class for notifications.
"""
receivers: List[NotificationReceiver]
"""The receivers of the notification."""
body: TBody
"""The body of the notification."""
type: NotificationType = field(init=False)
"""The type of the notification."""
@dataclass
class Body(Serializable):
"""
Inner class for the body of the notification.
"""
pass
@property
def callback_success(self) -> Optional[Signature]:
"""
The callback to be called on success. By default, this is None.
Returns:
Optional[Signature]: The callback to be called on success, or None if no such callback is set.
"""
return None
@property
def callback_error(self) -> Optional[Signature]:
"""
The callback to be called on error. By default, this is None.
Returns:
Optional[Signature]: The callback to be called on error, or None if no such callback is set.
"""
return None
def send(
self,
callback_success: Optional[Signature] = None,
callback_error: Optional[Signature] = None
) -> AsyncResult:
"""
Send notification to the receivers asynchronously.
Args:
callback_success (Optional[Signature], optional): The callback to be called on success. Defaults to None.
callback_error (Optional[Signature], optional): The callback to be called on error. Defaults to None.
Returns:
AsyncResult: The result of the asynchronous operation.
"""
callback_success = callback_success or self.callback_success
callback_error = callback_error or self.callback_error
return send_notifications.s(
notification=self, callback_success=callback_success, callback_error=callback_error
).apply_async(retry=False)
def serialize(self) -> dict[str, Any]:
"""
Serialize the notification into a dictionary.
Returns:
dict[str, Any]: The serialized notification.
"""
return {
"notification_type": self.type.value,
"body": self.body.serialize()
}
Ancestors (in MRO)¶
- typing.Generic
- fl_server_ai.notification.serializable.Serializable
Descendants¶
- fl_server_ai.notification.training.TrainingNotification
Class variables¶
Instance variables¶
The callback to be called on error. By default, this is None.
The callback to be called on success. By default, this is None.
Methods¶
send¶
def send(
self,
callback_success: Optional[celery.canvas.Signature] = None,
callback_error: Optional[celery.canvas.Signature] = None
) -> celery.result.AsyncResult
Send notification to the receivers asynchronously.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
callback_success | Optional[Signature] | The callback to be called on success. Defaults to None. | None |
callback_error | Optional[Signature] | The callback to be called on error. Defaults to None. | None |
Returns:
Type | Description |
---|---|
AsyncResult | The result of the asynchronous operation. |
View Source
def send(
self,
callback_success: Optional[Signature] = None,
callback_error: Optional[Signature] = None
) -> AsyncResult:
"""
Send notification to the receivers asynchronously.
Args:
callback_success (Optional[Signature], optional): The callback to be called on success. Defaults to None.
callback_error (Optional[Signature], optional): The callback to be called on error. Defaults to None.
Returns:
AsyncResult: The result of the asynchronous operation.
"""
callback_success = callback_success or self.callback_success
callback_error = callback_error or self.callback_error
return send_notifications.s(
notification=self, callback_success=callback_success, callback_error=callback_error
).apply_async(retry=False)
serialize¶
Serialize the notification into a dictionary.
Returns:
Type | Description |
---|---|
dict[str, Any] | The serialized notification. |
View Source
NotificationType¶
Notification types including a short description for each type.
View Source
class NotificationType(Enum):
"""
Notification types including a short description for each type.
"""
UPDATE_ROUND_START = "UPDATE_ROUND_START", _("Start update round")
SWAG_ROUND_START = "SWAG_ROUND_START", _("Start swag round")
TRAINING_START = "TRAINING_START", _("Training start")
TRAINING_FINISHED = "TRAINING_FINISHED", _("Training finished")
CLIENT_REMOVED = "CLIENT_REMOVED", _("Client removed")
MODEL_TEST_ROUND = "MODEL_TEST_ROUND", _("Start model round")
def __new__(cls, *args, **kwargs):
"""
Override the `__new__` method to set the value of the enum member.
Args:
*args: Variable length argument list where the first value is the value of the enum member.
**kwargs: Arbitrary keyword arguments.
Returns:
object: New instance of the class.
"""
obj = object.__new__(cls)
obj._value_ = args[0]
return obj
def __init__(self, _: str, description: Optional[str] = None):
"""
Initialize the enum member.
Args:
_ (str): The value of the enum member. Ignored in this method as it's already set by __new__.
description (Optional[str], optional): The description of the enum member. Defaults to None.
"""
# ignore the first param since it's already set by __new__
self._description_ = description
@property
def description(self):
"""
Property to get the description of the enum member.
Returns:
str: The description of the enum member.
"""
return self._description_
Ancestors (in MRO)¶
- enum.Enum
Class variables¶
TrainingFinishedNotification¶
class TrainingFinishedNotification(
receivers: List[fl_server_core.models.user.NotificationReceiver],
body: ~TBody,
training_uuid: uuid.UUID
)
Notification that a training has finished.
View Source
class TrainingFinishedNotification(TrainingNotification["TrainingFinishedNotification.Body"]):
"""
Notification that a training has finished.
"""
type: NotificationType = NotificationType.TRAINING_FINISHED
"""The type of the notification."""
@dataclass
class Body(Serializable):
"""
Inner class representing the body of the notification.
"""
global_model_uuid: UUID
"""The UUID of the global model."""
@classmethod
def from_training(cls, training: TrainingDB):
"""
Create a `TrainingFinishedNotification` instance from a training object.
Args:
training (TrainingDB): The training object to create the notification from.
Returns:
TrainingFinishedNotification: The created notification.
"""
receivers = list(training.participants.all())
if not receivers.__contains__(training.actor):
receivers.append(training.actor)
return cls(
receivers=receivers,
body=cls.Body(
global_model_uuid=training.model.id
),
training_uuid=training.id
)
Ancestors (in MRO)¶
- fl_server_ai.notification.training.TrainingNotification
- fl_server_ai.notification.Notification
- typing.Generic
- fl_server_ai.notification.serializable.Serializable
Class variables¶
Static methods¶
from_training¶
Create a TrainingFinishedNotification
instance from a training object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
training | TrainingDB | The training object to create the notification from. | None |
Returns:
Type | Description |
---|---|
TrainingFinishedNotification | The created notification. |
View Source
@classmethod
def from_training(cls, training: TrainingDB):
"""
Create a `TrainingFinishedNotification` instance from a training object.
Args:
training (TrainingDB): The training object to create the notification from.
Returns:
TrainingFinishedNotification: The created notification.
"""
receivers = list(training.participants.all())
if not receivers.__contains__(training.actor):
receivers.append(training.actor)
return cls(
receivers=receivers,
body=cls.Body(
global_model_uuid=training.model.id
),
training_uuid=training.id
)
Instance variables¶
The callback to be called on error. By default, this is None.
The callback to be called on success. By default, this is None.
Methods¶
send¶
def send(
self,
callback_success: Optional[celery.canvas.Signature] = None,
callback_error: Optional[celery.canvas.Signature] = None
) -> celery.result.AsyncResult
Send notification to the receivers asynchronously.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
callback_success | Optional[Signature] | The callback to be called on success. Defaults to None. | None |
callback_error | Optional[Signature] | The callback to be called on error. Defaults to None. | None |
Returns:
Type | Description |
---|---|
AsyncResult | The result of the asynchronous operation. |
View Source
def send(
self,
callback_success: Optional[Signature] = None,
callback_error: Optional[Signature] = None
) -> AsyncResult:
"""
Send notification to the receivers asynchronously.
Args:
callback_success (Optional[Signature], optional): The callback to be called on success. Defaults to None.
callback_error (Optional[Signature], optional): The callback to be called on error. Defaults to None.
Returns:
AsyncResult: The result of the asynchronous operation.
"""
callback_success = callback_success or self.callback_success
callback_error = callback_error or self.callback_error
return send_notifications.s(
notification=self, callback_success=callback_success, callback_error=callback_error
).apply_async(retry=False)
serialize¶
Serialize the notification into a dictionary.
Returns:
Type | Description |
---|---|
dict[str, Any] | The serialized notification. |
View Source
TrainingRoundStartNotification¶
class TrainingRoundStartNotification(
receivers: List[fl_server_core.models.user.NotificationReceiver],
body: ~TBody,
training_uuid: uuid.UUID
)
Notification for the start of a training round.
View Source
class TrainingRoundStartNotification(TrainingNotification["TrainingRoundStartNotification.Body"]):
"""
Notification for the start of a training round.
"""
type: NotificationType = NotificationType.UPDATE_ROUND_START
"""The type of the notification."""
@property
def callback_success(self) -> Optional[Signature]:
return training_notification_callback_success.s(training_uuid=self.training_uuid)
@property
def callback_error(self) -> Optional[Signature]:
return training_notification_callback_failure.s(training_uuid=self.training_uuid)
@dataclass
class Body(Serializable):
"""
Inner class representing the body of the notification.
"""
round: int
"""The round number."""
global_model_uuid: UUID
"""The UUID of the global model."""
@classmethod
def from_training(cls, training: TrainingDB):
"""
Create a `TrainingRoundStartNotification` instance from a training object.
Args:
training (TrainingDB): The training object to create the notification from.
Returns:
TrainingRoundStartNotification: The created notification.
"""
return cls(
receivers=training.participants.all(),
body=cls.Body(
round=training.model.round,
global_model_uuid=training.model.id
),
training_uuid=training.id
)
Ancestors (in MRO)¶
- fl_server_ai.notification.training.TrainingNotification
- fl_server_ai.notification.Notification
- typing.Generic
- fl_server_ai.notification.serializable.Serializable
Descendants¶
- fl_server_ai.notification.training.TrainingModelTestNotification
- fl_server_ai.notification.training.TrainingSWAGRoundStartNotification
Class variables¶
Static methods¶
from_training¶
Create a TrainingRoundStartNotification
instance from a training object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
training | TrainingDB | The training object to create the notification from. | None |
Returns:
Type | Description |
---|---|
TrainingRoundStartNotification | The created notification. |
View Source
@classmethod
def from_training(cls, training: TrainingDB):
"""
Create a `TrainingRoundStartNotification` instance from a training object.
Args:
training (TrainingDB): The training object to create the notification from.
Returns:
TrainingRoundStartNotification: The created notification.
"""
return cls(
receivers=training.participants.all(),
body=cls.Body(
round=training.model.round,
global_model_uuid=training.model.id
),
training_uuid=training.id
)
Instance variables¶
Methods¶
send¶
def send(
self,
callback_success: Optional[celery.canvas.Signature] = None,
callback_error: Optional[celery.canvas.Signature] = None
) -> celery.result.AsyncResult
Send notification to the receivers asynchronously.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
callback_success | Optional[Signature] | The callback to be called on success. Defaults to None. | None |
callback_error | Optional[Signature] | The callback to be called on error. Defaults to None. | None |
Returns:
Type | Description |
---|---|
AsyncResult | The result of the asynchronous operation. |
View Source
def send(
self,
callback_success: Optional[Signature] = None,
callback_error: Optional[Signature] = None
) -> AsyncResult:
"""
Send notification to the receivers asynchronously.
Args:
callback_success (Optional[Signature], optional): The callback to be called on success. Defaults to None.
callback_error (Optional[Signature], optional): The callback to be called on error. Defaults to None.
Returns:
AsyncResult: The result of the asynchronous operation.
"""
callback_success = callback_success or self.callback_success
callback_error = callback_error or self.callback_error
return send_notifications.s(
notification=self, callback_success=callback_success, callback_error=callback_error
).apply_async(retry=False)
serialize¶
Serialize the notification into a dictionary.
Returns:
Type | Description |
---|---|
dict[str, Any] | The serialized notification. |