-
Notifications
You must be signed in to change notification settings - Fork 15
/
function.py
233 lines (193 loc) · 8.86 KB
/
function.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
import structlog
from django.conf import settings
from django.db import models
from django.http import Http404
from django.urls import reverse
from django_filters.rest_framework import BaseInFilter
from django_filters.rest_framework import DateTimeFromToRangeFilter
from django_filters.rest_framework import DjangoFilterBackend
from django_filters.rest_framework import FilterSet
from rest_framework import mixins
from rest_framework import status
from rest_framework.decorators import action
from rest_framework.filters import OrderingFilter
from rest_framework.viewsets import GenericViewSet
from api.errors import AlreadyExistsError
from api.models import Function
from api.serializers import FunctionSerializer
from api.views.filters_utils import CharInFilter
from api.views.filters_utils import MatchFilter
from api.views.filters_utils import ProcessPermissionFilter
from api.views.utils import ApiResponse
from api.views.utils import CustomFileResponse
from api.views.utils import PermissionMixin
from api.views.utils import ValidationExceptionError
from api.views.utils import get_channel_name
from api.views.utils import to_string_uuid
from api.views.utils import validate_key
from api.views.utils import validate_metadata
from libs.pagination import DefaultPageNumberPagination
from substrapp.models import Function as FunctionFiles
from substrapp.models import FunctionImage
from substrapp.orchestrator import get_orchestrator_client
from substrapp.serializers import FunctionSerializer as FunctionFilesSerializer
from substrapp.utils import get_hash
from substrapp.utils import get_owner
# Module-level structlog logger, named after this module.
logger = structlog.get_logger(__name__)
def _register_in_orchestrator(request, basename, instance):
    """Register a locally stored function in the orchestrator.

    The registration payload is assembled from the request data and from the
    already-saved function files, then sent through the orchestrator client
    opened for the request's channel.

    Args:
        request: Incoming request. ``request.data`` is read for ``name``,
            ``permissions``, ``metadata``, ``inputs`` and ``outputs``;
            ``inputs`` and ``outputs`` are accessed by subscript and are
            therefore mandatory.
        basename: Not used by this function; kept so the signature stays
            compatible with existing callers.
        instance: Saved function files, read for ``key``, ``description``
            and ``checksum``.

    Returns:
        The value returned by the client's ``register_function`` call.
    """
    base_url = settings.DEFAULT_DOMAIN
    requested_permissions = request.data.get("permissions", {})

    function_key = str(instance.key)
    function_name = request.data.get("name")

    # Addresses under which the description and the function file are served.
    description_asset = {
        "checksum": get_hash(instance.description),
        "storage_address": base_url + reverse("api:function-description", args=[instance.key]),
    }
    archive_asset = {
        "checksum": instance.checksum,
        "storage_address": base_url + reverse("api:function-file", args=[instance.key]),
    }
    new_permissions = {
        "public": requested_permissions.get("public"),
        "authorized_ids": requested_permissions.get("authorized_ids"),
    }

    payload = {
        "key": function_key,
        "name": function_name,
        "description": description_asset,
        "function": archive_asset,
        "new_permissions": new_permissions,
        "metadata": validate_metadata(request.data.get("metadata")),
        "inputs": request.data["inputs"],
        "outputs": request.data["outputs"],
    }

    channel_name = get_channel_name(request)
    with get_orchestrator_client(channel_name) as orchestrator:
        return orchestrator.register_function(payload)
def create(request, basename, get_success_headers):
    """Create a new function and return its API representation.

    The workflow has three steps:

    1. Save the uploaded files in the local database to get their addresses.
    2. Register the asset in the orchestrator.
    3. Save the asset metadata in the local database.

    When step 2 fails, or step 3 fails with anything other than
    ``AlreadyExistsError``, the files saved in step 1 are deleted and the
    error is re-raised.

    Args:
        request: Incoming request; ``request.data`` carries ``file``,
            ``description`` and the fields needed for registration.
        basename: Forwarded unchanged to ``_register_in_orchestrator``.
        get_success_headers: Callable taking the response data and returning
            the headers to attach to the response.

    Returns:
        An ``ApiResponse`` with status 201 holding the function metadata
        merged with the serialized file data.

    Raises:
        ValidationExceptionError: If hashing the uploaded file or validating
            the file serializer fails.
    """
    # Step 1: save files in local database
    uploaded_file = request.data.get("file")
    try:
        file_checksum = get_hash(uploaded_file)
    except Exception as exc:
        raise ValidationExceptionError(exc.args, "(not computed)", status.HTTP_400_BAD_REQUEST)

    files_payload = {
        "file": uploaded_file,
        "description": request.data.get("description"),
        "checksum": file_checksum,
    }
    files_serializer = FunctionFilesSerializer(data=files_payload)
    try:
        files_serializer.is_valid(raise_exception=True)
    except Exception as exc:
        raise ValidationExceptionError(exc.args, "(not computed)", status.HTTP_400_BAD_REQUEST)
    stored_files = files_serializer.save()

    # Step 2: register asset in orchestrator
    try:
        registered = _register_in_orchestrator(request, basename, stored_files)
    except Exception:
        stored_files.delete()  # warning: post delete signals are not executed by django rollback
        raise

    # Step 3: save metadata in local database
    registered["channel"] = get_channel_name(request)
    metadata_serializer = FunctionSerializer(data=registered)
    try:
        metadata_serializer.save_if_not_exists()
    except AlreadyExistsError:
        # May happen if the events app already processed the event pushed by the orchestrator
        existing_function = Function.objects.get(key=registered["key"])
        response_data = FunctionSerializer(existing_function).data
    except Exception:
        stored_files.delete()  # warning: post delete signals are not executed by django rollback
        raise
    else:
        response_data = metadata_serializer.data

    # Merge the file data into the metadata read from the local database
    # to ensure consistency between GET and CREATE views.
    response_data.update(files_serializer.data)

    response_headers = get_success_headers(response_data)
    return ApiResponse(response_data, status=status.HTTP_201_CREATED, headers=response_headers)
class FunctionFilter(FilterSet):
    """Filter set for ``Function`` list endpoints.

    Declares a creation-date range filter and a compute-plan-key filter, and
    exposes ``owner``, ``key`` and ``name`` with the ``exact`` lookup. Model
    fields of type ``CharField`` and ``UUIDField`` are overridden to use
    ``BaseInFilter`` with the ``in`` lookup expression.
    """

    creation_date = DateTimeFromToRangeFilter()
    compute_plan_key = CharInFilter(field_name="compute_tasks__compute_plan__key", label="compute_plan_key")

    class Meta:
        model = Function
        fields = {field_name: ["exact"] for field_name in ("owner", "key", "name")}
        # One independent override entry per model field type.
        filter_overrides = {
            field_type: {
                "filter_class": BaseInFilter,
                "extra": lambda _field: {"lookup_expr": "in"},
            }
            for field_type in (models.CharField, models.UUIDField)
        }
class FunctionViewSetConfig:
    # Configuration shared by the function viewsets: serializer, pagination,
    # filtering, ordering, and a base queryset restricted to the request's channel.

    serializer_class = FunctionSerializer
    pagination_class = DefaultPageNumberPagination

    # Filtering
    filterset_class = FunctionFilter
    filter_backends = (OrderingFilter, MatchFilter, DjangoFilterBackend, ProcessPermissionFilter)

    # Ordering
    ordering_fields = ["creation_date", "key", "name", "owner"]
    ordering = ["creation_date", "key"]

    def get_queryset(self):
        # Only the functions stored for the channel of the current request.
        channel_name = get_channel_name(self.request)
        return Function.objects.filter(channel=channel_name)
class FunctionViewSet(
    FunctionViewSetConfig,
    mixins.RetrieveModelMixin,
    mixins.ListModelMixin,
    mixins.CreateModelMixin,
    GenericViewSet,
):
    # Retrieve / list / create endpoints for functions, plus a custom update
    # that is forwarded to the orchestrator.

    def create(self, request, *args, **kwargs):
        # Delegates to the module-level ``create`` workflow.
        return create(request, self.basename, self.get_success_headers)

    def update(self, request, *args, **kwargs):
        target = self.get_object()
        new_name = request.data.get("name")
        payload = {"key": str(target.key), "name": new_name}

        # send update to orchestrator
        # the modification in local db will be done upon corresponding event reception
        channel_name = get_channel_name(request)
        with get_orchestrator_client(channel_name) as orchestrator:
            orchestrator.update_function(payload)

        return ApiResponse({}, status=status.HTTP_200_OK)
class CPFunctionViewSet(FunctionViewSetConfig, mixins.ListModelMixin, GenericViewSet):
    # List endpoint for the functions referenced by the compute tasks of one compute plan.

    def get_queryset(self):
        # The compute plan key comes from the URL kwargs and is validated
        # before any query is built.
        plan_key = self.kwargs.get("compute_plan_pk")
        validate_key(plan_key)

        channel_functions = super().get_queryset()
        used_by_plan = channel_functions.filter(compute_tasks__compute_plan__key=plan_key)
        # Collapse duplicate rows produced by the filter across compute tasks.
        return used_by_plan.distinct()
class FunctionPermissionViewSet(PermissionMixin, GenericViewSet):
    # Download endpoints for the files attached to a function: the function
    # file, its description and its image.

    queryset = FunctionFiles.objects.all()
    serializer_class = FunctionFilesSerializer

    @action(detail=True)
    def file(self, request, *args, **kwargs):
        # Download of the function file, delegated to PermissionMixin.download_file.
        return self.download_file(request, Function, "file", "function_address")

    # actions cannot be named "description"
    # https://github.com/encode/django-rest-framework/issues/6490
    # for some of the restricted names see:
    # https://www.django-rest-framework.org/api-guide/viewsets/#introspecting-viewset-actions
    @action(detail=True, url_path="description", url_name="description")
    def description_(self, request, *args, **kwargs):
        # Download of the function description, delegated to PermissionMixin.download_file.
        return self.download_file(request, Function, "description", "description_address")

    @action(detail=True)
    def image(self, request, *args, **kwargs):
        # Stream the function image stored on this backend.
        #
        # Raises Http404 when the current backend is not the owner of the
        # function, or when no image is stored for the function.
        #
        # TODO refactor the code duplication with api.views.utils.PermissionMixin.download_file
        channel_name = get_channel_name(request)
        lookup_url_kwarg = self.lookup_url_kwarg or self.lookup_field
        key = to_string_uuid(self.kwargs[lookup_url_kwarg])
        function = Function.objects.filter(channel=channel_name).get(key=key)

        # Http404 is an exception, not a response: it must be raised so the
        # framework turns it into a 404 response. Returning it would hand the
        # view machinery a non-response object.
        if get_owner() != function.get_owner():
            raise Http404("The function image is only available on the backend who owns the function.")

        try:
            function_image = FunctionImage.objects.get(function__key=function.key)
        except FunctionImage.DoesNotExist as exc:
            raise Http404(f"The function image asociated with key {key} is not found.") from exc

        # TODO we love hard-coded size, see also api.views.utils.PermissionMixin._download_remote_file
        response = CustomFileResponse(streaming_content=(chunk for chunk in function_image.file.chunks(512 * 1024)))
        return response