Module fl_server_api.views.dummy¶
View Source
# SPDX-FileCopyrightText: 2024 Benedikt Franke <benedikt.franke@dlr.de>
# SPDX-FileCopyrightText: 2024 Florian Heinrich <florian.heinrich@dlr.de>
#
# SPDX-License-Identifier: Apache-2.0
from django.db import IntegrityError, transaction
from django.http import HttpRequest, JsonResponse
from logging import getLogger
import random
from rest_framework.authtoken.models import Token
from rest_framework.viewsets import ViewSet as DjangoViewSet
import torch
import torch.nn
import torchvision.transforms.v2
from uuid import uuid4
from fl_server_core.models import Training, User
from fl_server_core.models.training import TrainingState
from fl_server_core.tests.dummy import Dummy
from fl_server_core.utils.torch_serialization import from_torch_module
class DummyView(DjangoViewSet):
    """
    A ViewSet that creates dummy data for testing.

    Both `authentication_classes` and `permission_classes` are empty lists,
    i.e. no authenticators and no permissions are configured for this view.
    """

    _logger = getLogger("fl.server")

    authentication_classes: list = []
    permission_classes: list = []

    def create_dummy_metrics_and_models(self, _request: HttpRequest):
        """
        Create dummy metrics, models, etc.

        This method creates

        - a dummy user,
        - a torch model,
        - a preprocessing model,
        - a model,
        - participants,
        - a training, and
        - metrics of the training.

        If creating the dummy user raises an `IntegrityError`, the already
        existing user with the same username is looked up and used instead.

        Args:
            _request (HttpRequest): The request (not used).

        Returns:
            JsonResponse: A JsonResponse with the details of the created objects.
        """
        epochs = 80
        # Bind the credentials once; they are used for creation, lookup and the response.
        username = "dummy-user"
        password = "secret"
        try:
            # `transaction.atomic` is important
            # otherwise Django's internal atomic transaction won't be closed if an IntegrityError is raised
            # and `User.objects.get` will raise a TransactionManagementError
            with transaction.atomic():
                user = Dummy.create_user(username=username, password=password)
        except IntegrityError:
            self._logger.warning("Dummy User already exists")
            user = User.objects.get(username=username)

        # Two 3x3 convolutions and a 2x2 max-pool turn a 1x28x28 input into
        # 64x12x12 = 9216 features, which matches the first linear layer.
        torch_model = torch.jit.script(torch.nn.Sequential(
            torch.nn.Conv2d(1, 32, 3),
            torch.nn.ReLU(),
            torch.nn.Conv2d(32, 64, 3),
            torch.nn.ReLU(),
            torch.nn.MaxPool2d(2),
            torch.nn.Flatten(),
            torch.nn.Dropout(0.25),
            torch.nn.Linear(9216, 128),
            torch.nn.ReLU(),
            torch.nn.Dropout(),
            torch.nn.Linear(128, 10),
        ))
        preprocessing_model = torch.nn.Sequential(
            torchvision.transforms.v2.ToImage(),
            torchvision.transforms.v2.ToDtype(torch.float32, scale=True),
            torchvision.transforms.v2.Resize((28, 28)),
            torchvision.transforms.v2.Grayscale(),
            torchvision.transforms.v2.Normalize((0.1307,), (0.3015,))
        )
        model = Dummy.create_model(
            owner=user,
            weights=from_torch_module(torch_model),  # torchscript model
            preprocessing=from_torch_module(preprocessing_model),  # normal torch model
            input_shape=[None, 3, 28, 28],
        )
        participants = [Dummy.create_client() for _ in range(5)]
        training = Dummy.create_training(
            state=TrainingState.COMPLETED,
            target_num_updates=epochs,
            actor=user,
            model=model,
            participants=participants,
        )
        self._fill_metrics(training, epochs)
        self.created = True

        return JsonResponse({
            "message": "Created Dummy Data in Metrics Database!",
            "training_uuid": training.id,
            "user_uuid": user.id,
            "user_credentials": {
                "username": username,
                "password": password,
                "token": Token.objects.get(user=user).key,
            },
            "participants": [str(p.id) for p in training.participants.all()],
        })

    def _fill_metrics(self, training: Training, epochs: int):
        """
        Create and fill metrics of a training.

        This method creates metrics for each epoch and each participant of the training.
        The metrics include the NLLoss and the Accuracy. Both values follow a
        deterministic trend over the epochs plus a small random offset; the
        accuracy is capped at 1.0.

        Args:
            training (Training): The training.
            epochs (int): The number of epochs.
        """
        # Evaluate the participants once instead of building and evaluating
        # a new queryset for every epoch.
        participants = list(training.participants.all())
        for epoch in range(epochs):
            for client in participants:
                Dummy.create_metric(
                    model=training.model,
                    identifier=str(uuid4()).split("-")[0],
                    key="NLLoss",
                    value_float=27.354/(epoch+1) + random.randint(0, 100) / 100,
                    step=epoch,
                    reporter=client
                )
                Dummy.create_metric(
                    model=training.model,
                    identifier=str(uuid4()).split("-")[0],
                    key="Accuracy",
                    # trend + noise can exceed 1.0 in the last epochs; clamp to a valid accuracy
                    value_float=min(1.0, epoch/epochs + random.randint(0, 100) / 1000),
                    step=epoch,
                    reporter=client
                )
Classes¶
DummyView¶
A ViewSet that creates dummy data for testing.
View Source
class DummyView(DjangoViewSet):
    """
    A ViewSet that creates dummy data for testing.

    Both `authentication_classes` and `permission_classes` are empty lists,
    i.e. no authenticators and no permissions are configured for this view.
    """

    _logger = getLogger("fl.server")

    authentication_classes: list = []
    permission_classes: list = []

    def create_dummy_metrics_and_models(self, _request: HttpRequest):
        """
        Create dummy metrics, models, etc.

        This method creates

        - a dummy user,
        - a torch model,
        - a preprocessing model,
        - a model,
        - participants,
        - a training, and
        - metrics of the training.

        If creating the dummy user raises an `IntegrityError`, the already
        existing user with the same username is looked up and used instead.

        Args:
            _request (HttpRequest): The request (not used).

        Returns:
            JsonResponse: A JsonResponse with the details of the created objects.
        """
        epochs = 80
        # Bind the credentials once; they are used for creation, lookup and the response.
        username = "dummy-user"
        password = "secret"
        try:
            # `transaction.atomic` is important
            # otherwise Django's internal atomic transaction won't be closed if an IntegrityError is raised
            # and `User.objects.get` will raise a TransactionManagementError
            with transaction.atomic():
                user = Dummy.create_user(username=username, password=password)
        except IntegrityError:
            self._logger.warning("Dummy User already exists")
            user = User.objects.get(username=username)

        # Two 3x3 convolutions and a 2x2 max-pool turn a 1x28x28 input into
        # 64x12x12 = 9216 features, which matches the first linear layer.
        torch_model = torch.jit.script(torch.nn.Sequential(
            torch.nn.Conv2d(1, 32, 3),
            torch.nn.ReLU(),
            torch.nn.Conv2d(32, 64, 3),
            torch.nn.ReLU(),
            torch.nn.MaxPool2d(2),
            torch.nn.Flatten(),
            torch.nn.Dropout(0.25),
            torch.nn.Linear(9216, 128),
            torch.nn.ReLU(),
            torch.nn.Dropout(),
            torch.nn.Linear(128, 10),
        ))
        preprocessing_model = torch.nn.Sequential(
            torchvision.transforms.v2.ToImage(),
            torchvision.transforms.v2.ToDtype(torch.float32, scale=True),
            torchvision.transforms.v2.Resize((28, 28)),
            torchvision.transforms.v2.Grayscale(),
            torchvision.transforms.v2.Normalize((0.1307,), (0.3015,))
        )
        model = Dummy.create_model(
            owner=user,
            weights=from_torch_module(torch_model),  # torchscript model
            preprocessing=from_torch_module(preprocessing_model),  # normal torch model
            input_shape=[None, 3, 28, 28],
        )
        participants = [Dummy.create_client() for _ in range(5)]
        training = Dummy.create_training(
            state=TrainingState.COMPLETED,
            target_num_updates=epochs,
            actor=user,
            model=model,
            participants=participants,
        )
        self._fill_metrics(training, epochs)
        self.created = True

        return JsonResponse({
            "message": "Created Dummy Data in Metrics Database!",
            "training_uuid": training.id,
            "user_uuid": user.id,
            "user_credentials": {
                "username": username,
                "password": password,
                "token": Token.objects.get(user=user).key,
            },
            "participants": [str(p.id) for p in training.participants.all()],
        })

    def _fill_metrics(self, training: Training, epochs: int):
        """
        Create and fill metrics of a training.

        This method creates metrics for each epoch and each participant of the training.
        The metrics include the NLLoss and the Accuracy. Both values follow a
        deterministic trend over the epochs plus a small random offset; the
        accuracy is capped at 1.0.

        Args:
            training (Training): The training.
            epochs (int): The number of epochs.
        """
        # Evaluate the participants once instead of building and evaluating
        # a new queryset for every epoch.
        participants = list(training.participants.all())
        for epoch in range(epochs):
            for client in participants:
                Dummy.create_metric(
                    model=training.model,
                    identifier=str(uuid4()).split("-")[0],
                    key="NLLoss",
                    value_float=27.354/(epoch+1) + random.randint(0, 100) / 100,
                    step=epoch,
                    reporter=client
                )
                Dummy.create_metric(
                    model=training.model,
                    identifier=str(uuid4()).split("-")[0],
                    key="Accuracy",
                    # trend + noise can exceed 1.0 in the last epochs; clamp to a valid accuracy
                    value_float=min(1.0, epoch/epochs + random.randint(0, 100) / 1000),
                    step=epoch,
                    reporter=client
                )
Ancestors (in MRO)¶
- rest_framework.viewsets.ViewSet
- rest_framework.viewsets.ViewSetMixin
- rest_framework.views.APIView
- django.views.generic.base.View
Class variables¶
Static methods¶
as_view¶
Because of the way class-based views create a closure around the instantiated view, we need to totally reimplement `.as_view`, and slightly modify the view function that is created and returned.
View Source
@classonlymethod
def as_view(cls, actions=None, **initkwargs):
    """
    Because of the way class based views create a closure around the
    instantiated view, we need to totally reimplement `.as_view`,
    and slightly modify the view function that is created and returned.

    Args:
        actions: Mapping of HTTP method names to the names of the viewset
            methods that handle them, e.g. `{'get': 'list'}`. Must not be empty.
        **initkwargs: Keyword arguments passed to the viewset constructor
            on every request. Each key must be an existing attribute of
            the class and must not be an HTTP method name.

    Returns:
        The `csrf_exempt`-wrapped view function, carrying `cls`,
        `initkwargs` and `actions` as attributes.

    Raises:
        TypeError: If `actions` is empty, an initkwarg is an HTTP method
            name or an unknown attribute, or both `name` and `suffix`
            are given.
    """
    # The name and description initkwargs may be explicitly overridden for
    # certain route configurations. eg, names of extra actions.
    cls.name = None
    cls.description = None

    # The suffix initkwarg is reserved for displaying the viewset type.
    # This initkwarg should have no effect if the name is provided.
    # eg. 'List' or 'Instance'.
    cls.suffix = None

    # The detail initkwarg is reserved for introspecting the viewset type.
    cls.detail = None

    # Setting a basename allows a view to reverse its action urls. This
    # value is provided by the router through the initkwargs.
    cls.basename = None

    # actions must not be empty
    if not actions:
        raise TypeError("The `actions` argument must be provided when "
                        "calling `.as_view()` on a ViewSet. For example "
                        "`.as_view({'get': 'list'})`")

    # sanitize keyword arguments
    for key in initkwargs:
        if key in cls.http_method_names:
            raise TypeError("You tried to pass in the %s method name as a "
                            "keyword argument to %s(). Don't do that."
                            % (key, cls.__name__))
        if not hasattr(cls, key):
            raise TypeError("%s() received an invalid keyword %r" % (
                cls.__name__, key))

    # name and suffix are mutually exclusive
    if 'name' in initkwargs and 'suffix' in initkwargs:
        raise TypeError("%s() received both `name` and `suffix`, which are "
                        "mutually exclusive arguments." % (cls.__name__))

    def view(request, *args, **kwargs):
        # A fresh viewset instance is created for every request.
        self = cls(**initkwargs)

        # HEAD falls back to the GET action when no HEAD action is mapped.
        # Note that this writes into the `actions` dict captured by the closure.
        if 'get' in actions and 'head' not in actions:
            actions['head'] = actions['get']

        # We also store the mapping of request methods to actions,
        # so that we can later set the action attribute.
        # eg. `self.action = 'list'` on an incoming GET request.
        self.action_map = actions

        # Bind methods to actions
        # This is the bit that's different to a standard view
        for method, action in actions.items():
            handler = getattr(self, action)
            setattr(self, method, handler)

        self.request = request
        self.args = args
        self.kwargs = kwargs

        # And continue as usual
        return self.dispatch(request, *args, **kwargs)

    # take name and docstring from class
    update_wrapper(view, cls, updated=())

    # and possible attributes set by decorators
    # like csrf_exempt from dispatch
    update_wrapper(view, cls.dispatch, assigned=())

    # We need to set these on the view function, so that breadcrumb
    # generation can pick out these bits of information from a
    # resolved URL.
    view.cls = cls
    view.initkwargs = initkwargs
    view.actions = actions
    return csrf_exempt(view)
get_extra_actions¶
Get the methods that are marked as an extra ViewSet `@action`.
View Source
Instance variables¶
Wrap Django's private `_allowed_methods` interface in a public property.
Methods¶
check_object_permissions¶
Check if the request should be permitted for a given object.
Raises an appropriate exception if the request is not permitted.
View Source
def check_object_permissions(self, request, obj):
    """
    Verify that the request is permitted for the given object.

    Every permission returned by `self.get_permissions()` is asked via
    `has_object_permission`; for each one that refuses,
    `self.permission_denied` is called with the permission's optional
    `message` and `code` attributes.
    """
    for perm in self.get_permissions():
        if perm.has_object_permission(request, self, obj):
            continue
        denial_message = getattr(perm, 'message', None)
        denial_code = getattr(perm, 'code', None)
        self.permission_denied(request, message=denial_message, code=denial_code)
check_permissions¶
Check if the request should be permitted.
Raises an appropriate exception if the request is not permitted.
View Source
def check_permissions(self, request):
    """
    Verify that the request is permitted at all.

    Every permission returned by `self.get_permissions()` is asked via
    `has_permission`; for each one that refuses, `self.permission_denied`
    is called with the permission's optional `message` and `code` attributes.
    """
    for perm in self.get_permissions():
        if perm.has_permission(request, self):
            continue
        denial_message = getattr(perm, 'message', None)
        denial_code = getattr(perm, 'code', None)
        self.permission_denied(request, message=denial_message, code=denial_code)
check_throttles¶
Check if request should be throttled.
Raises an appropriate exception if the request is throttled.
View Source
def check_throttles(self, request):
    """
    Verify that the request is not throttled.

    Collects the wait time of every throttle that rejects the request. If
    at least one throttle rejected it, `self.throttled` is called with the
    longest known wait time (or `None` when no wait time is known).
    """
    reported_waits = [
        throttle.wait()
        for throttle in self.get_throttles()
        if not throttle.allow_request(request, self)
    ]
    if not reported_waits:
        return

    # Filter out `None` values which may happen in case of config / rate
    # changes, see #1438
    known_waits = [wait for wait in reported_waits if wait is not None]
    self.throttled(request, max(known_waits, default=None))
create_dummy_metrics_and_models¶
Create dummy metrics, models, etc.
This method creates
- a dummy user,
- a torch model,
- a preprocessing model,
- a model,
- participants,
- a training, and
- metrics of the training.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
_request | HttpRequest | The request. | None |
Returns:
Type | Description |
---|---|
JsonResponse | A JsonResponse with the details of the created objects. |
View Source
def create_dummy_metrics_and_models(self, _request: HttpRequest):
    """
    Populate the database with a complete set of dummy objects.

    The following objects are created, in this order:

    - a dummy user (an existing one is reused on `IntegrityError`),
    - a torch model,
    - a preprocessing model,
    - a model,
    - participants,
    - a training, and
    - metrics of the training.

    Args:
        _request (HttpRequest): The request (not used).

    Returns:
        JsonResponse: A JsonResponse with the details of the created objects.
    """
    num_epochs = 80
    dummy_username = "dummy-user"
    dummy_password = "secret"

    try:
        # The explicit `transaction.atomic` block matters: without it Django's
        # internal atomic transaction stays open after an IntegrityError and
        # the `User.objects.get` below raises a TransactionManagementError.
        with transaction.atomic():
            dummy_user = Dummy.create_user(username=dummy_username, password=dummy_password)
    except IntegrityError:
        self._logger.warning("Dummy User already exists")
        dummy_user = User.objects.get(username=dummy_username)

    # Two 3x3 convolutions and a 2x2 max-pool turn a 1x28x28 input into
    # 64x12x12 = 9216 features, which matches the first linear layer.
    classifier_layers = [
        torch.nn.Conv2d(1, 32, 3),
        torch.nn.ReLU(),
        torch.nn.Conv2d(32, 64, 3),
        torch.nn.ReLU(),
        torch.nn.MaxPool2d(2),
        torch.nn.Flatten(),
        torch.nn.Dropout(0.25),
        torch.nn.Linear(9216, 128),
        torch.nn.ReLU(),
        torch.nn.Dropout(),
        torch.nn.Linear(128, 10),
    ]
    scripted_classifier = torch.jit.script(torch.nn.Sequential(*classifier_layers))

    transforms = torchvision.transforms.v2
    preprocessing_pipeline = torch.nn.Sequential(
        transforms.ToImage(),
        transforms.ToDtype(torch.float32, scale=True),
        transforms.Resize((28, 28)),
        transforms.Grayscale(),
        transforms.Normalize((0.1307,), (0.3015,)),
    )

    dummy_model = Dummy.create_model(
        owner=dummy_user,
        weights=from_torch_module(scripted_classifier),  # torchscript model
        preprocessing=from_torch_module(preprocessing_pipeline),  # normal torch model
        input_shape=[None, 3, 28, 28],
    )

    clients = []
    for _ in range(5):
        clients.append(Dummy.create_client())

    dummy_training = Dummy.create_training(
        state=TrainingState.COMPLETED,
        target_num_updates=num_epochs,
        actor=dummy_user,
        model=dummy_model,
        participants=clients,
    )
    self._fill_metrics(dummy_training, num_epochs)
    self.created = True

    payload = {
        "message": "Created Dummy Data in Metrics Database!",
        "training_uuid": dummy_training.id,
        "user_uuid": dummy_user.id,
    }
    payload["user_credentials"] = {
        "username": dummy_username,
        "password": dummy_password,
        "token": Token.objects.get(user=dummy_user).key,
    }
    payload["participants"] = [
        str(participant.id) for participant in dummy_training.participants.all()
    ]
    return JsonResponse(payload)
determine_version¶
If versioning is being used, then determine any API version for the
incoming request. Returns a two-tuple of (version, versioning_scheme)
View Source
def determine_version(self, request, *args, **kwargs):
    """
    Work out the API version of the incoming request.

    Returns a two-tuple `(version, versioning_scheme)`; both entries are
    `None` when the view has no `versioning_class`.
    """
    versioning_cls = self.versioning_class
    if versioning_cls is None:
        return (None, None)

    scheme = versioning_cls()
    version = scheme.determine_version(request, *args, **kwargs)
    return (version, scheme)
dispatch¶
`.dispatch()` is pretty much the same as Django's regular dispatch, but with extra hooks for startup, finalize, and exception handling.
View Source
def dispatch(self, request, *args, **kwargs):
    """
    Route the request to its handler method and return the final response.

    The request is first wrapped by `initialize_request`, then `initial`
    runs, then the handler matching the lower-cased HTTP method is called
    (`http_method_not_allowed` if there is none). Any exception raised in
    between is turned into a response by `handle_exception`. The result is
    passed through `finalize_response` and stored on `self.response`.
    """
    self.args = args
    self.kwargs = kwargs
    request = self.initialize_request(request, *args, **kwargs)
    self.request = request
    self.headers = self.default_response_headers  # deprecate?

    try:
        self.initial(request, *args, **kwargs)

        # Pick the handler named after the HTTP verb, if that verb is allowed
        verb = request.method.lower()
        if verb not in self.http_method_names:
            handler = self.http_method_not_allowed
        else:
            handler = getattr(self, verb, self.http_method_not_allowed)

        response = handler(request, *args, **kwargs)
    except Exception as exc:
        response = self.handle_exception(exc)

    self.response = self.finalize_response(request, response, *args, **kwargs)
    return self.response
finalize_response¶
Returns the final response object.
View Source
def finalize_response(self, request, response, *args, **kwargs):
    """
    Prepare the response for return and hand it back.

    For `Response` instances the accepted renderer, media type and renderer
    context are attached (forcing content negotiation if the request has no
    accepted renderer yet). The view's headers are then copied onto the
    response; a `Vary` header is merged into the response instead of
    replacing an existing one.
    """
    # Make the error obvious if a proper response is not returned
    assert isinstance(response, HttpResponseBase), (
        'Expected a `Response`, `HttpResponse` or `HttpStreamingResponse` '
        'to be returned from the view, but received a `%s`'
        % type(response)
    )

    if isinstance(response, Response):
        if not getattr(request, 'accepted_renderer', None):
            request.accepted_renderer, request.accepted_media_type = (
                self.perform_content_negotiation(request, force=True)
            )

        response.accepted_renderer = request.accepted_renderer
        response.accepted_media_type = request.accepted_media_type
        response.renderer_context = self.get_renderer_context()

    # Add new vary headers to the response instead of overwriting.
    extra_vary = self.headers.pop('Vary', None)
    if extra_vary is not None:
        patch_vary_headers(response, cc_delim_re.split(extra_vary))

    for header_name, header_value in self.headers.items():
        response[header_name] = header_value

    return response
get_authenticate_header¶
If a request is unauthenticated, determine the WWW-Authenticate
header to use for 401 responses, if any.
View Source
get_authenticators¶
Instantiates and returns the list of authenticators that this view can use.
View Source
get_content_negotiator¶
Instantiate and return the content negotiation class to use.
View Source
get_exception_handler¶
Returns the exception handler that this view uses.
View Source
get_exception_handler_context¶
Returns a dict that is passed through to EXCEPTION_HANDLER, as the `context` argument.
View Source
get_extra_action_url_map¶
Build a map of {names: urls} for the extra actions.
This method will noop if `detail` was not provided as a view initkwarg.
View Source
def get_extra_action_url_map(self):
    """
    Build an ordered `{view name: url}` map for the extra actions.

    Only extra actions whose `detail` flag equals the view's `detail` are
    considered. An empty map is returned when `detail` was not provided as
    a view initkwarg. Actions whose URL cannot be reversed are skipped.
    """
    # exit early if `detail` has not been provided
    if self.detail is None:
        return OrderedDict()

    url_map = OrderedDict()

    # filter for the relevant extra actions
    matching_actions = [
        extra for extra in self.get_extra_actions()
        if extra.detail == self.detail
    ]

    for extra in matching_actions:
        try:
            url_name = '%s-%s' % (self.basename, extra.url_name)
            namespace = self.request.resolver_match.namespace
            if namespace:
                url_name = '%s:%s' % (namespace, url_name)

            url = reverse(url_name, self.args, self.kwargs, request=self.request)
            action_view = self.__class__(**extra.kwargs)
            url_map[action_view.get_view_name()] = url
        except NoReverseMatch:
            pass  # URL requires additional arguments, ignore

    return url_map
get_format_suffix¶
Determine if the request includes a '.json' style format suffix
View Source
get_parser_context¶
Returns a dict that is passed through to Parser.parse(), as the `parser_context` keyword argument.
View Source
def get_parser_context(self, http_request):
    """
    Build the dict that is handed to Parser.parse() as `parser_context`.

    It contains the view itself plus the view's `args` and `kwargs`
    (an empty tuple / empty dict when they are not set yet).
    """
    # Note: Additionally `request` and `encoding` will also be added
    # to the context by the Request object.
    context = {'view': self}
    context['args'] = getattr(self, 'args', ())
    context['kwargs'] = getattr(self, 'kwargs', {})
    return context
get_parsers¶
Instantiates and returns the list of parsers that this view can use.
View Source
get_permissions¶
Instantiates and returns the list of permissions that this view requires.
View Source
get_renderer_context¶
Returns a dict that is passed through to Renderer.render(), as the `renderer_context` keyword argument.
View Source
def get_renderer_context(self):
    """
    Build the dict that is handed to Renderer.render() as `renderer_context`.

    It contains the view itself plus the view's `args`, `kwargs` and
    `request` (an empty tuple / empty dict / `None` when they are not set).
    """
    # Note: Additionally 'response' will also be added to the context,
    # by the Response object.
    return dict(
        view=self,
        args=getattr(self, 'args', ()),
        kwargs=getattr(self, 'kwargs', {}),
        request=getattr(self, 'request', None),
    )
get_renderers¶
Instantiates and returns the list of renderers that this view can use.
View Source
get_throttles¶
Instantiates and returns the list of throttles that this view uses.
View Source
get_view_description¶
Return some descriptive text for the view, as used in OPTIONS responses
and in the browsable API.
View Source
get_view_name¶
Return the view name, as used in OPTIONS responses and in the
browsable API.
View Source
handle_exception¶
Handle any exception that occurs, by returning an appropriate response,
or re-raising the error.
View Source
def handle_exception(self, exc):
    """
    Turn an exception into a response, or re-raise it.

    Authentication errors get the view's WWW-Authenticate header attached
    when there is one; otherwise their status code is coerced to 403. The
    exception is then given to the view's exception handler. If the handler
    returns `None`, `raise_uncaught_exception` is called; otherwise the
    response is flagged with `exception = True` and returned.
    """
    authentication_errors = (
        exceptions.NotAuthenticated,
        exceptions.AuthenticationFailed,
    )
    if isinstance(exc, authentication_errors):
        # WWW-Authenticate header for 401 responses, else coerce to 403
        auth_header = self.get_authenticate_header(self.request)
        if not auth_header:
            exc.status_code = status.HTTP_403_FORBIDDEN
        else:
            exc.auth_header = auth_header

    handler = self.get_exception_handler()
    handler_context = self.get_exception_handler_context()
    response = handler(exc, handler_context)

    if response is None:
        self.raise_uncaught_exception(exc)

    response.exception = True
    return response
http_method_not_allowed¶
If `request.method` does not correspond to a handler method, determine what kind of exception to raise.
View Source
initial¶
Runs anything that needs to occur prior to calling the method handler.
View Source
def initial(self, request, *args, **kwargs):
    """
    Run the checks and setup that must happen before the handler is called.

    Stores the format suffix on the view, the negotiated renderer / media
    type and the API version / versioning scheme on the request, and then
    performs authentication, permission checks and throttle checks.
    """
    self.format_kwarg = self.get_format_suffix(**kwargs)

    # Perform content negotiation and store the accepted info on the request
    request.accepted_renderer, request.accepted_media_type = (
        self.perform_content_negotiation(request)
    )

    # Determine the API version, if versioning is in use.
    api_version, versioning_scheme = self.determine_version(request, *args, **kwargs)
    request.version = api_version
    request.versioning_scheme = versioning_scheme

    # Ensure that the incoming request is permitted
    self.perform_authentication(request)
    self.check_permissions(request)
    self.check_throttles(request)
initialize_request¶
Set the `.action` attribute on the view, depending on the request method.
View Source
def initialize_request(self, request, *args, **kwargs):
    """
    Wrap the request via the parent class and set `self.action` for it.

    `self.action` becomes `'metadata'` for OPTIONS requests; for every other
    method it is looked up in `self.action_map` (`None` if unmapped).
    """
    request = super().initialize_request(request, *args, **kwargs)
    verb = request.method.lower()
    # 'options' is a special case as we always provide handling for the
    # options method in the base `View` class.
    # Unlike the other explicitly defined actions, 'metadata' is implicit.
    self.action = 'metadata' if verb == 'options' else self.action_map.get(verb)
    return request
options¶
Handler method for HTTP 'OPTIONS' request.
View Source
def options(self, request, *args, **kwargs):
    """
    Handle an HTTP 'OPTIONS' request.

    Responds with the metadata determined by the view's `metadata_class`,
    or delegates to `http_method_not_allowed` when no metadata class is set.
    """
    metadata_cls = self.metadata_class
    if metadata_cls is not None:
        data = metadata_cls().determine_metadata(request, self)
        return Response(data, status=status.HTTP_200_OK)
    return self.http_method_not_allowed(request, *args, **kwargs)
perform_authentication¶
Perform authentication on the incoming request.
Note that if you override this and simply 'pass', then authentication
will instead be performed lazily, the first time either `request.user` or `request.auth` is accessed.
View Source
perform_content_negotiation¶
Determine which renderer and media type to use to render the response.
View Source
def perform_content_negotiation(self, request, force=False):
    """
    Select the renderer and media type used to render the response.

    Returns whatever the content negotiator's `select_renderer` returns.
    If negotiation raises and `force` is true, the first renderer and its
    media type are returned instead; without `force` the error propagates.
    """
    available_renderers = self.get_renderers()
    negotiator = self.get_content_negotiator()

    try:
        return negotiator.select_renderer(request, available_renderers, self.format_kwarg)
    except Exception:
        if not force:
            raise
        fallback = available_renderers[0]
        return (fallback, fallback.media_type)
permission_denied¶
If request is not permitted, determine what kind of exception to raise.
View Source
def permission_denied(self, request, message=None, code=None):
    """
    Raise the exception that fits a request which is not permitted.

    `NotAuthenticated` is raised when the request has authenticators but
    none of them authenticated it; otherwise `PermissionDenied` is raised
    with the given `message` and `code`.
    """
    authentication_missing = (
        request.authenticators and not request.successful_authenticator
    )
    if authentication_missing:
        raise exceptions.NotAuthenticated()
    raise exceptions.PermissionDenied(detail=message, code=code)
raise_uncaught_exception¶
View Source
reverse_action¶
Reverse the action for the given `url_name`.
View Source
def reverse_action(self, url_name, *args, **kwargs):
    """
    Reverse the URL of the action with the given `url_name`.

    The name is prefixed with the view's `basename` and, if the current
    request was resolved within a namespace, with that namespace. The
    current request is passed to `reverse` unless the caller supplied a
    `request` keyword argument.
    """
    qualified_name = '%s-%s' % (self.basename, url_name)

    current_request = self.request
    if current_request and current_request.resolver_match:
        namespace = current_request.resolver_match.namespace
    else:
        namespace = None

    if namespace:
        qualified_name = namespace + ':' + qualified_name

    kwargs.setdefault('request', self.request)
    return reverse(qualified_name, *args, **kwargs)
setup¶
Initialize attributes shared by all view methods.
View Source
throttled¶
If request is throttled, determine what kind of exception to raise.