This repository has been archived by the owner on Nov 20, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
routes.py
194 lines (167 loc) · 5.18 KB
/
routes.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
"""Routes module."""
# Copyright 2021 The Compute-to-Data Authors
# SPDX-License-Identifier: LGPL-2.1-only
import logging
import json
import os
import sqlite3
from flask_cors import CORS
from flask import Flask, request, jsonify
from datatoken.service.asset import AssetService
from datatoken.service.job import JobService
from dsb.config import Config
from dsb.runner import Runner
from dsb.utils import get_dt_store
# Flask application object; the route handlers below register on it.
app = Flask(__name__)
# Apply the flask_cors extension to the application.
CORS(app)
# Shared configuration used by the services and handlers in this module.
config = Config()
# Address of the configured wallet; handed to the service checks as the owner.
owner_address = config.wallet.address
# Service objects built from the shared configuration.
asset_service = AssetService(config)
job_service = JobService(config)
@app.route('/insertDtStore', methods=['POST'])
def insert_dtstore():
    """Attach a data token to an existing ``dtstore`` row.

    The request body is decoded as JSON and its ``store_path`` and ``dt``
    fields are used to set ``datatoken`` on the ``dtstore`` row whose
    ``filepath`` equals ``store_path``, in the SQLite database named by
    ``config.db_name``. The fields are read from the JSON body; ``metadata``
    is accepted by the API description below but is not written to the
    database by this handler.

    Returns a JSON body echoing ``store_path`` and ``dt`` with status 200
    when the update is committed, or ``{"error": "Error"}`` with status 400
    when any step raises.

    tags:
      - services
    consumes:
      - application/json
    parameters:
      - name: store_path
        in: query
        description: data storage location
        type: string
        required: True
      - name: dt
        in: query
        description: the data token owned by this provider
        type: string
        required: True
      - name: metadata
        in: query
        description: the metadata for the data
        type: string
        required: True
    responses:
      200:
        description: Success
      400:
        description: Error
    """
    try:
        payload = json.loads(request.get_data())
        store_path = payload.get("store_path")
        dt = payload.get("dt")

        conn = sqlite3.connect(config.db_name)
        try:
            cur = conn.cursor()
            sql_update_query = """
                UPDATE dtstore
                SET datatoken = ?
                WHERE filepath = ?
            """
            cur.execute(sql_update_query, (dt, store_path))
            conn.commit()
            # An UPDATE that matches nothing is not an SQL error; surface it
            # in the log so a silent no-op can be diagnosed.
            if cur.rowcount == 0:
                logging.warning(f'No dtstore entry matched filepath: {store_path}')
        finally:
            # Always release the connection, including when execute/commit
            # raises; previously it was never closed.
            conn.close()

        return jsonify(store_path=store_path, dt=dt, result="Success"), 200
    except Exception as e:
        logging.error(f'Exception when inserting datatoken to dtstore: {e}')
        return jsonify(error="Error"), 400
@app.route('/grantPermission', methods=['POST'])
def grant_permission():
    """Grant an algorithm data token permission on a data token.

    Reads ``algo_dt``, ``dt`` and ``signature`` from the JSON request body.
    The request is rejected with status 400 when the store path returned by
    ``get_dt_store(dt)`` is empty or missing on disk, when
    ``asset_service.check_service_terms`` returns a falsy value, or when any
    step raises. Otherwise ``asset_service.grant_dt_perm`` is called with the
    configured wallet and a 200 response echoing ``algo_dt`` and ``dt`` is
    returned.

    tags:
      - services
    consumes:
      - application/json
    parameters:
      - name: algo_dt
        in: query
        description: the algorithm data token
        type: string
        required: True
      - name: dt
        in: query
        description: the data token owned by this provider
        type: string
        required: True
      - name: signature
        in: query
        description: the signature signed by aggregator
        type: string
        required: True
    responses:
      200:
        description: Success
      400:
        description: Error
    """
    try:
        payload = json.loads(request.get_data())
        algo_dt, dt, signature = (
            payload.get(field) for field in ("algo_dt", "dt", "signature")
        )

        # The asset must resolve to a path that exists on this machine.
        data_store = get_dt_store(dt)
        asset_available = bool(data_store) and os.path.exists(data_store)
        if not asset_available:
            logging.error(f'Asset is not available now: {data_store}')
            return jsonify(error="Error"), 400

        terms_satisfied = asset_service.check_service_terms(
            algo_dt, dt, owner_address, signature
        )
        if not terms_satisfied:
            logging.error('Service agreements are not satisfied')
            return jsonify(error="Error"), 400

        asset_service.grant_dt_perm(dt, algo_dt, config.wallet)
        return jsonify(algo_dt=algo_dt, dt=dt, result="Success"), 200
    except Exception as e:
        logging.error(f'Exception when granting permission: {e}')
        return jsonify(error="Error"), 400
@app.route('/onPremiseCompute', methods=['POST'])
def on_premise_compute():
    """Run a computation job against a data token held by this provider.

    Reads ``job_id``, ``algo_dt``, ``dt`` and ``signature`` from the JSON
    request body. The request is rejected with status 400 when the store
    path returned by ``get_dt_store(dt)`` is empty or missing on disk, when
    ``job_service.check_remote_compute`` returns a falsy value, or when any
    step raises. Otherwise a ``Runner`` is created, prepared with the job and
    token identifiers, and executed before a 200 response echoing the
    identifiers is returned.

    tags:
      - services
    consumes:
      - application/json
    parameters:
      - name: job_id
        in: query
        description: the job id for computation
        type: int
        required: True
      - name: algo_dt
        in: query
        description: the algorithm data token
        type: string
        required: True
      - name: dt
        in: query
        description: the data token owned by this provider
        type: string
        required: True
      - name: signature
        in: query
        description: the signature signed by consumer
        type: string
        required: True
    responses:
      200:
        description: Success
      400:
        description: Error
    """
    try:
        payload = json.loads(request.get_data())
        job_id, algo_dt, dt, signature = (
            payload.get(field)
            for field in ("job_id", "algo_dt", "dt", "signature")
        )

        # The asset must resolve to a path that exists on this machine.
        data_store = get_dt_store(dt)
        asset_available = bool(data_store) and os.path.exists(data_store)
        if not asset_available:
            logging.error(f'Asset is not available now: {data_store}')
            return jsonify(error="Error"), 400

        compute_allowed = job_service.check_remote_compute(
            algo_dt, dt, job_id, owner_address, signature
        )
        if not compute_allowed:
            logging.error('Remote access is not allowed')
            return jsonify(error="Error"), 400

        job_runner = Runner()
        job_runner.prepare_resources(job_id, algo_dt, dt)
        job_runner.execute()

        return jsonify(job_id=job_id, algo_dt=algo_dt, dt=dt, result="Success"), 200
    except Exception as e:
        logging.error(f'Exception when lauching computation: {e}')
        return jsonify(error="Error"), 400