Module fl_server_api.openapi¶
View Source
# SPDX-FileCopyrightText: 2024 Benedikt Franke <benedikt.franke@dlr.de>
# SPDX-FileCopyrightText: 2024 Florian Heinrich <florian.heinrich@dlr.de>
#
# SPDX-License-Identifier: Apache-2.0
from docstring_parser import Docstring, parse, RenderingStyle
from docstring_parser.google import compose
from drf_spectacular.authentication import BasicScheme
from drf_spectacular.openapi import AutoSchema
from drf_spectacular.utils import OpenApiExample, OpenApiResponse
from inspect import cleandoc
from typing import Callable, List, Optional, Tuple
from .serializers.generic import ErrorSerializer
from .utils import fullname
class BasicAuthAllowingTokenAuthInUrlScheme(BasicScheme):
    """
    OpenAPI authentication scheme for basic auth that also allows token authentication in the URL.

    No method of ``BasicScheme`` is overridden here; the class only sets the
    two attributes below.
    """

    # NOTE(review): presumably used to rank this extension against others
    # matching the same authentication class - confirm against drf-spectacular.
    priority = 0

    # Dotted import path of the authentication class this scheme describes.
    target_class = "fl_server_api.views.base.BasicAuthAllowingTokenAuthInUrl"
def create_error_response(
    response_description: Optional[str],
    example_name: str,
    example_details: str,
    example_description: Optional[str],
    **example_kwargs
) -> OpenApiResponse:
    """
    Build an OpenAPI error response that carries exactly one example.

    Args:
        response_description (Optional[str]): Description attached to the response.
        example_name (str): Name of the attached example.
        example_details (str): Message stored under the ``"details"`` key of the example value.
        example_description (Optional[str]): Description attached to the example.
        **example_kwargs: Extra keyword arguments forwarded unchanged to ``OpenApiExample``.

    Returns:
        OpenApiResponse: Response object using ``ErrorSerializer`` as its schema.
    """
    # Assemble the example first so the response construction below stays flat.
    example = OpenApiExample(
        example_name,
        value={"details": example_details},
        description=example_description,
        **example_kwargs,
    )
    return OpenApiResponse(
        response=ErrorSerializer,
        description=response_description,
        examples=[example],
    )
# NOTE(review): the texts say "Unauthorized" although HTTP 403 is "Forbidden";
# presumably this mirrors the message the server really sends - confirm.
error_response_403 = create_error_response(
    response_description="Unauthorized",
    example_name="Unauthorized",
    example_details="Authentication credentials were not provided.",
    example_description="Do not forget to authorize first!",
)
"""Generic OpenAPI 403 response."""

error_response_404 = create_error_response(
    response_description="Not found",
    example_name="Not found",
    example_details="The server cannot find the requested resource.",
    example_description="Provide valid request data.",
)
"""Generic OpenAPI 404 response."""
def custom_preprocessing_hook(
    endpoints: List[Tuple[str, str, str, Callable]]
) -> List[Tuple[str, str, str, Callable]]:
    """
    Hide the "/api/dummy/" endpoint from the OpenAPI schema.

    The result is a real list rather than a lazy ``filter`` object, so whoever
    consumes it next can iterate it more than once or take its length, exactly
    as it could with the list that was passed in.

    Args:
        endpoints (List[Tuple[str, str, str, Callable]]): The list of endpoints;
            the first element of every tuple is the endpoint path.

    Returns:
        List[Tuple[str, str, str, Callable]]: A new list with every endpoint
            whose path is "/api/dummy/" removed. The input is not modified.
    """
    return [endpoint for endpoint in endpoints if endpoint[0] != "/api/dummy/"]
class CustomAutoSchema(AutoSchema):
    """
    A custom AutoSchema that includes the documented examples from the Docstrings in the description.
    """

    show_examples = True
    """Flag to include examples in the description."""

    rendering_style = RenderingStyle.CLEAN
    """Docstring rendering style."""

    def _get_docstring(self):
        """
        Get the docstring of the view.

        This method parses the description returned by the parent ``AutoSchema``.

        Returns:
            Docstring: The parsed docstring.
        """
        return parse(super().get_description())

    def _get_param_docstring(self, docstring: Docstring, argument: str) -> Optional[str]:
        """
        Get the docstring of a parameter.

        This method finds the first parameter with the given name in the docstring
        and returns its description.

        Args:
            docstring (Docstring): The docstring.
            argument (str): The name of the argument.

        Returns:
            Optional[str]: The description of the argument, or `None` if the argument is not found.
        """
        for param in docstring.params:
            if param.arg_name == argument:
                return param.description
        return None

    def get_description(self):
        """
        Get the description of the view including its examples (if desired) formatted as markdown.

        Only the short description, the long description and - when
        ``show_examples`` is set - the examples of the parsed docstring are kept;
        all other docstring sections are left out of the rendered text.

        Returns:
            str: The description of the view as markdown.
        """
        docstring = self._get_docstring()
        tmp_docstring = Docstring(style=docstring.style)
        tmp_docstring.short_description = docstring.short_description
        tmp_docstring.long_description = docstring.long_description
        if self.show_examples:
            tmp_docstring.meta.extend(docstring.examples)
        desc = compose(tmp_docstring, self.rendering_style, indent="")
        if self.show_examples and "Examples:" in desc:
            # customize examples section:
            # - examples should be in a new paragraph (not concatenated with the description)
            # - the examples header should be a h3 title
            desc = desc.replace("\nExamples:\n", "\n\n### Examples:\n\n")
        desc = cleandoc(desc)
        return desc

    def _resolve_path_parameters(self, variables: List[str]):
        """
        Resolve the path parameters of the view and set their descriptions if they are missing.

        Args:
            variables (List[str]): The list of variables in the path.

        Returns:
            list: The list of path parameters.
        """
        parameters = super()._resolve_path_parameters(variables)
        docstring = self._get_docstring()
        for parameter in parameters:
            if "description" not in parameter:
                description = self._get_param_docstring(docstring, parameter["name"])
                if description:
                    parameter["description"] = description
        return parameters

    def get_operation_id(self):
        """
        Get the operation ID which is the fully qualified name of the corresponding view/action/method.

        Returns:
            str: The operation ID.
        """
        handler_name = getattr(self.view, "action", None)
        if handler_name is None:
            # Covers both "no `action` attribute" and "`action` is None"; the
            # latter would make the getattr below raise TypeError, because
            # attribute names must be strings.
            handler_name = self.method.lower()
        action_or_method = getattr(self.view, handler_name, None)
        # Fall back to the view class when the view has no such handler.
        return fullname(action_or_method or self.view.__class__)
Variables¶
error_response_403: Generic OpenAPI 403 response.
error_response_404: Generic OpenAPI 404 response.
Functions¶
create_error_response¶
def create_error_response(
response_description: Optional[str],
example_name: str,
example_details: str,
example_description: Optional[str],
**example_kwargs
) -> drf_spectacular.utils.OpenApiResponse
Create an OpenAPI error response.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| response_description | Optional[str] | The description of the response. | None |
| example_name | str | The name of the example. | None |
| example_details | str | The details of the example. | None |
| example_description | Optional[str] | The description of the example. | None |
| **example_kwargs | None | Additional keyword arguments for the example. | None |
Returns:
| Type | Description |
|---|---|
| OpenApiResponse | The created OpenAPI response. |
View Source
def create_error_response(
    response_description: Optional[str],
    example_name: str,
    example_details: str,
    example_description: Optional[str],
    **example_kwargs
) -> OpenApiResponse:
    """
    Build an OpenAPI error response that carries exactly one example.

    Args:
        response_description (Optional[str]): Description attached to the response.
        example_name (str): Name of the attached example.
        example_details (str): Message stored under the ``"details"`` key of the example value.
        example_description (Optional[str]): Description attached to the example.
        **example_kwargs: Extra keyword arguments forwarded unchanged to ``OpenApiExample``.

    Returns:
        OpenApiResponse: Response object using ``ErrorSerializer`` as its schema.
    """
    # Assemble the example first so the response construction below stays flat.
    example = OpenApiExample(
        example_name,
        value={"details": example_details},
        description=example_description,
        **example_kwargs,
    )
    return OpenApiResponse(
        response=ErrorSerializer,
        description=response_description,
        examples=[example],
    )
custom_preprocessing_hook¶
Hide the "/api/dummy/" endpoint from the OpenAPI schema.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| endpoints | List[Tuple[str, str, str, Callable]] | The list of endpoints. | None |
Returns:
| Type | Description |
|---|---|
| Iterator | The filtered list of endpoints. |
View Source
def custom_preprocessing_hook(
    endpoints: List[Tuple[str, str, str, Callable]]
) -> List[Tuple[str, str, str, Callable]]:
    """
    Hide the "/api/dummy/" endpoint from the OpenAPI schema.

    The result is a real list rather than a lazy ``filter`` object, so whoever
    consumes it next can iterate it more than once or take its length, exactly
    as it could with the list that was passed in.

    Args:
        endpoints (List[Tuple[str, str, str, Callable]]): The list of endpoints;
            the first element of every tuple is the endpoint path.

    Returns:
        List[Tuple[str, str, str, Callable]]: A new list with every endpoint
            whose path is "/api/dummy/" removed. The input is not modified.
    """
    return [endpoint for endpoint in endpoints if endpoint[0] != "/api/dummy/"]
Classes¶
BasicAuthAllowingTokenAuthInUrlScheme¶
A class that extends the BasicScheme to allow token authentication in the URL.
View Source
Ancestors (in MRO)¶
- drf_spectacular.authentication.BasicScheme
- drf_spectacular.extensions.OpenApiAuthenticationExtension
- drf_spectacular.plumbing.OpenApiGeneratorExtension
- typing.Generic
Class variables¶
Static methods¶
get_match¶
View Source
Methods¶
get_security_definition¶
View Source
get_security_requirement¶
def get_security_requirement(
self,
auto_schema: 'AutoSchema'
) -> Union[Dict[str, List[Any]], List[Dict[str, List[Any]]]]
View Source
CustomAutoSchema¶
A custom AutoSchema that includes the documented examples from the Docstrings in the description.
View Source
class CustomAutoSchema(AutoSchema):
    """
    A custom AutoSchema that includes the documented examples from the Docstrings in the description.
    """

    show_examples = True
    """Flag to include examples in the description."""

    rendering_style = RenderingStyle.CLEAN
    """Docstring rendering style."""

    def _get_docstring(self):
        """
        Get the docstring of the view.

        This method parses the description returned by the parent ``AutoSchema``.

        Returns:
            Docstring: The parsed docstring.
        """
        return parse(super().get_description())

    def _get_param_docstring(self, docstring: Docstring, argument: str) -> Optional[str]:
        """
        Get the docstring of a parameter.

        This method finds the first parameter with the given name in the docstring
        and returns its description.

        Args:
            docstring (Docstring): The docstring.
            argument (str): The name of the argument.

        Returns:
            Optional[str]: The description of the argument, or `None` if the argument is not found.
        """
        for param in docstring.params:
            if param.arg_name == argument:
                return param.description
        return None

    def get_description(self):
        """
        Get the description of the view including its examples (if desired) formatted as markdown.

        Only the short description, the long description and - when
        ``show_examples`` is set - the examples of the parsed docstring are kept;
        all other docstring sections are left out of the rendered text.

        Returns:
            str: The description of the view as markdown.
        """
        docstring = self._get_docstring()
        tmp_docstring = Docstring(style=docstring.style)
        tmp_docstring.short_description = docstring.short_description
        tmp_docstring.long_description = docstring.long_description
        if self.show_examples:
            tmp_docstring.meta.extend(docstring.examples)
        desc = compose(tmp_docstring, self.rendering_style, indent="")
        if self.show_examples and "Examples:" in desc:
            # customize examples section:
            # - examples should be in a new paragraph (not concatenated with the description)
            # - the examples header should be a h3 title
            desc = desc.replace("\nExamples:\n", "\n\n### Examples:\n\n")
        desc = cleandoc(desc)
        return desc

    def _resolve_path_parameters(self, variables: List[str]):
        """
        Resolve the path parameters of the view and set their descriptions if they are missing.

        Args:
            variables (List[str]): The list of variables in the path.

        Returns:
            list: The list of path parameters.
        """
        parameters = super()._resolve_path_parameters(variables)
        docstring = self._get_docstring()
        for parameter in parameters:
            if "description" not in parameter:
                description = self._get_param_docstring(docstring, parameter["name"])
                if description:
                    parameter["description"] = description
        return parameters

    def get_operation_id(self):
        """
        Get the operation ID which is the fully qualified name of the corresponding view/action/method.

        Returns:
            str: The operation ID.
        """
        handler_name = getattr(self.view, "action", None)
        if handler_name is None:
            # Covers both "no `action` attribute" and "`action` is None"; the
            # latter would make the getattr below raise TypeError, because
            # attribute names must be strings.
            handler_name = self.method.lower()
        action_or_method = getattr(self.view, handler_name, None)
        # Fall back to the view class when the view has no such handler.
        return fullname(action_or_method or self.view.__class__)
Ancestors (in MRO)¶
- drf_spectacular.openapi.AutoSchema
- rest_framework.schemas.inspectors.ViewInspector
Descendants¶
- drf_spectacular.utils.ExtendedSchema
- drf_spectacular.utils.ExtendedSchema
- drf_spectacular.utils.ExtendedSchema
- drf_spectacular.utils.ExtendedSchema
- drf_spectacular.utils.ExtendedSchema
- drf_spectacular.utils.ExtendedSchema
- drf_spectacular.utils.ExtendedSchema
- drf_spectacular.utils.ExtendedSchema
- drf_spectacular.utils.ExtendedSchema
- drf_spectacular.utils.ExtendedSchema
- drf_spectacular.utils.ExtendedSchema
- drf_spectacular.utils.ExtendedSchema
- drf_spectacular.utils.ExtendedSchema
- drf_spectacular.utils.ExtendedSchema
- drf_spectacular.utils.ExtendedSchema
- drf_spectacular.utils.ExtendedSchema
- drf_spectacular.utils.ExtendedSchema
- drf_spectacular.utils.ExtendedSchema
- drf_spectacular.utils.ExtendedSchema
- drf_spectacular.utils.ExtendedSchema
- drf_spectacular.utils.ExtendedSchema
- drf_spectacular.utils.ExtendedSchema
- drf_spectacular.utils.ExtendedSchema
- drf_spectacular.utils.ExtendedSchema
- drf_spectacular.utils.ExtendedSchema
- drf_spectacular.utils.ExtendedSchema
- drf_spectacular.utils.ExtendedSchema
- drf_spectacular.utils.ExtendedSchema
- drf_spectacular.utils.ExtendedSchema
- drf_spectacular.utils.ExtendedSchema
- drf_spectacular.utils.ExtendedSchema
- drf_spectacular.utils.ExtendedSchema
- drf_spectacular.utils.ExtendedSchema
- drf_spectacular.utils.ExtendedSchema
- drf_spectacular.utils.ExtendedSchema
- drf_spectacular.utils.ExtendedSchema
Class variables¶
rendering_style: Docstring rendering style.
show_examples: Flag to include examples in the description.
Instance variables¶
View property.
Methods¶
get_auth¶
Obtains authentication classes and permissions from view. If authentication
is known, resolve security requirement for endpoint and security definition for
the component section.
For custom authentication subclass OpenApiAuthenticationExtension.
View Source
def get_auth(self):
    """
    Collect the security requirements of the endpoint from the view's
    authenticators and permissions.

    For every whitelisted authenticator with a matching
    ``OpenApiAuthenticationExtension`` the security requirement is added to the
    result and the security definition is registered in the component registry.
    Authenticators without a matching extension are skipped with a warning.

    For custom authentication subclass ``OpenApiAuthenticationExtension``.

    Returns:
        list: The security requirements; an empty dict entry is appended when
            the permissions allow access without authentication.
    """
    requirements = []

    for authenticator in self.view.get_authenticators():
        if not whitelisted(authenticator, spectacular_settings.AUTHENTICATION_WHITELIST, True):
            continue

        scheme = OpenApiAuthenticationExtension.get_match(authenticator)
        if not scheme:
            warn(
                f'could not resolve authenticator {authenticator.__class__}. There '
                f'was no OpenApiAuthenticationExtension registered for that class. '
                f'Try creating one by subclassing it. Ignoring for now.'
            )
            continue

        # A scheme may yield one requirement (dict), several (iterable) or none.
        requirement = scheme.get_security_requirement(self)
        if isinstance(requirement, dict):
            requirements.append(requirement)
        elif requirement is not None:
            requirements.extend(requirement)

        # A string name means one definition; otherwise names and definitions
        # are parallel sequences.
        has_single_name = isinstance(scheme.name, str)
        definition = scheme.get_security_definition(self)
        if has_single_name:
            named_definitions = [(scheme.name, definition)]
        else:
            named_definitions = zip(scheme.name, definition)

        for scheme_name, scheme_definition in named_definitions:
            self.registry.register_on_missing(
                ResolvedComponent(
                    name=scheme_name,
                    type=ResolvedComponent.SECURITY_SCHEMA,
                    object=authenticator.__class__,
                    schema=scheme_definition
                )
            )

    if spectacular_settings.SECURITY:
        requirements.extend(spectacular_settings.SECURITY)

    permission_classes = [p.__class__ for p in self.view.get_permissions()]
    open_access = permissions.AllowAny in permission_classes or (
        permissions.IsAuthenticatedOrReadOnly in permission_classes
        and self.method in permissions.SAFE_METHODS
    )
    if open_access:
        # An empty requirement marks authentication as optional.
        requirements.append({})

    return requirements
get_callbacks¶
override this for custom behaviour
View Source
get_description¶
Get the description of the view including its examples (if desired) formatted as markdown.
Returns:
| Type | Description |
|---|---|
| str | The description of the view as markdown. |
View Source
def get_description(self):
    """
    Get the description of the view including its examples (if desired) formatted as markdown.

    Only the short description, the long description and - when
    ``show_examples`` is set - the examples of the parsed docstring are kept;
    all other docstring sections are left out of the rendered text.

    Returns:
        str: The description of the view as markdown.
    """
    docstring = self._get_docstring()
    tmp_docstring = Docstring(style=docstring.style)
    tmp_docstring.short_description = docstring.short_description
    tmp_docstring.long_description = docstring.long_description
    if self.show_examples:
        tmp_docstring.meta.extend(docstring.examples)
    desc = compose(tmp_docstring, self.rendering_style, indent="")
    if self.show_examples and "Examples:" in desc:
        # customize examples section:
        # - examples should be in a new paragraph (not concatenated with the description)
        # - the examples header should be a h3 title
        desc = desc.replace("\nExamples:\n", "\n\n### Examples:\n\n")
    desc = cleandoc(desc)
    return desc
get_examples¶
override this for custom behaviour
get_extensions¶
get_external_docs¶
override this for custom behaviour
get_filter_backends¶
override this for custom behaviour
View Source
get_operation¶
def get_operation(
self,
path,
path_regex,
path_prefix,
method,
registry: drf_spectacular.plumbing.ComponentRegistry
)
View Source
def get_operation(self, path, path_regex, path_prefix, method, registry: ComponentRegistry):
    """
    Assemble the OpenAPI operation object for one path/method combination.

    The arguments are stored on the instance first (the method name is
    upper-cased), because the getters called afterwards read them from there.

    Returns:
        dict | None: The operation, or ``None`` when ``is_excluded()`` is truthy.
            ``operationId`` and ``responses`` are always present; every other
            key is only set when its getter returned a truthy value.
    """
    self.registry = registry
    self.path = path
    self.path_regex = path_regex
    self.path_prefix = path_prefix
    self.method = method.upper()

    if self.is_excluded():
        return None

    operation = {'operationId': self.get_operation_id()}

    def put_if_truthy(key, value):
        # Optional keys are left out entirely instead of being stored empty.
        if value:
            operation[key] = value

    # The getters are called in this exact order, which also fixes the key order.
    put_if_truthy('description', self.get_description())
    put_if_truthy('summary', self.get_summary())
    put_if_truthy('externalDocs', self._get_external_docs())
    put_if_truthy('parameters', self._get_parameters())
    put_if_truthy('tags', self.get_tags())
    put_if_truthy('requestBody', self._get_request_body())
    put_if_truthy('security', self.get_auth())
    put_if_truthy('deprecated', self.is_deprecated())

    operation['responses'] = self._get_response_bodies()

    extensions = self.get_extensions()
    if extensions:
        operation.update(sanitize_specification_extensions(extensions))

    put_if_truthy('callbacks', self._get_callbacks())

    return operation
get_operation_id¶
Get the operation ID which is the fully qualified name of the corresponding view/action/method.
Returns:
| Type | Description |
|---|---|
| str | The operation ID. |
View Source
def get_operation_id(self):
    """
    Get the operation ID which is the fully qualified name of the corresponding view/action/method.

    Returns:
        str: The operation ID.
    """
    handler_name = getattr(self.view, "action", None)
    if handler_name is None:
        # Covers both "no `action` attribute" and "`action` is None"; the
        # latter would make the getattr below raise TypeError, because
        # attribute names must be strings.
        handler_name = self.method.lower()
    action_or_method = getattr(self.view, handler_name, None)
    # Fall back to the view class when the view has no such handler.
    return fullname(action_or_method or self.view.__class__)
get_override_parameters¶
override this for custom behaviour
get_paginated_name¶
get_request_serializer¶
override this for custom behaviour
View Source
get_response_serializers¶
override this for custom behaviour
View Source
get_serializer_name¶
View Source
get_summary¶
override this for custom behaviour
get_tags¶
override this for custom behaviour
View Source
is_deprecated¶
override this for custom behaviour
is_excluded¶
override this for custom behaviour
map_parsers¶
View Source
map_renderers¶
View Source
def map_renderers(self, attribute):
    """
    List the distinct values of ``attribute`` over the view's usable renderers.

    Args:
        attribute: Either ``'media_type'`` or ``'format'``.

    Returns:
        list: The attribute values in first-seen order without duplicates; only
            the part before the first ``;`` of each value is kept.
    """
    assert attribute in ['media_type', 'format']

    # A dict serves as an insertion-ordered set.
    collected = {}
    for renderer in self.view.get_renderers():
        # A configured whitelist decides; without one only the browsable API
        # renderer is left out.
        if spectacular_settings.RENDERER_WHITELIST is not None:
            usable = whitelisted(renderer, spectacular_settings.RENDERER_WHITELIST)
        else:
            usable = not isinstance(renderer, renderers.BrowsableAPIRenderer)

        if usable and hasattr(renderer, attribute):
            value = getattr(renderer, attribute).split(';')[0]
            collected.setdefault(value, None)

    return list(collected)
resolve_serializer¶
def resolve_serializer(
self,
serializer,
direction,
bypass_extensions=False
) -> drf_spectacular.plumbing.ResolvedComponent
View Source
def resolve_serializer(self, serializer, direction, bypass_extensions=False) -> ResolvedComponent:
    """
    Resolve a serializer into a registered schema component.

    Args:
        serializer: Serializer class or instance; it is forced to an instance.
        direction: Passed through to the naming and mapping helpers.
        bypass_extensions: Passed through to the naming and mapping helpers.

    Returns:
        ResolvedComponent: The component already in the registry if there is
            one, otherwise the newly mapped component. A
            ``ResolvedComponent(None, None)`` sentinel is returned when the
            mapped schema is empty or is an object without properties and
            without ``additionalProperties``.
    """
    assert_basic_serializer(serializer)
    serializer = force_instance(serializer)

    with add_trace_message(serializer.__class__):
        component = ResolvedComponent(
            name=self._get_serializer_name(serializer, direction, bypass_extensions),
            type=ResolvedComponent.SCHEMA,
            object=serializer,
        )
        if component in self.registry:
            # Hand back the registered component, which carries the schema.
            return self.registry[component]

        # NOTE(review): registration happens before mapping - presumably so
        # nested lookups of the same serializer find it; confirm.
        self.registry.register(component)
        component.schema = self._map_serializer(serializer, direction, bypass_extensions)

        schema = component.schema
        if not schema:
            # components with empty schemas serve no purpose
            worth_keeping = False
        else:
            # concrete components without properties are likely only
            # transactional, so they are discarded as well
            bare_object = (
                schema.get('type') == 'object'
                and not schema.get('properties')
                and 'additionalProperties' not in schema
            )
            worth_keeping = not bare_object

        if worth_keeping:
            return component

        del self.registry[component]
        return ResolvedComponent(None, None)  # sentinel