Skip to content

Commit

Permalink
Merge pull request #45 from salehsedghpour/fix/unexpected_output
Browse files Browse the repository at this point in the history
Fix/unexpected output
  • Loading branch information
alekodu authored Apr 18, 2022
2 parents c677f19 + 4f109b2 commit 2c016f0
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 19 deletions.
12 changes: 12 additions & 0 deletions model/restful/resources/endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,16 @@ def get(self, endpoint=None):
return res
not_found(endpoint)

def post(self, endpoint=None):
if endpoint is None:
message = {"status": "ok"}
return message
else:
for ep in SERVICE_CONFIG['endpoints']:
if ep['name'] == endpoint:
res = task.run_task(service_endpoint=ep)
return res
not_found(endpoint)



65 changes: 46 additions & 19 deletions model/restful/utils/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

# TODO: So far, we only support a hard-coded namespace. For more flexible support of namespaces we will need to pass that info as part of the config map
# TODO: So far, we only support http client
FORMATTED_REMOTE_URL = "http://{0}:{1}{2}"
FORMATTED_REMOTE_URL = "http://{0}:{1}/{2}"


def getForwardHeaders(request):
Expand All @@ -43,39 +43,67 @@ def getForwardHeaders(request):
return headers


async def run_task(service_endpoint):
def run_task(service_endpoint):
headers = getForwardHeaders(request)

if service_endpoint["forward_requests"] == "asynchronous":
async with ClientSession() as session:
# TODO: CPU-bounded tasks not supported yet
io_tasks = []
asyncio.set_event_loop(asyncio.new_event_loop())
loop = asyncio.get_event_loop()
response = loop.run_until_complete(async_tasks(service_endpoint, headers))
return response
else: # "synchronous"
asyncio.set_event_loop(asyncio.new_event_loop())
loop = asyncio.get_event_loop()
response = loop.run_until_complete(sync_tasks(service_endpoint, headers))

return response


async def async_tasks(service_endpoint, headers):
async with ClientSession() as session:
# TODO: CPU-bounded tasks not supported yet
io_tasks = []

if request.get_data() == bytes("", "utf-8"):
json_data = {}
else:
json_data = request.json
if len(service_endpoint["called_services"]) > 0:
for svc in service_endpoint["called_services"]:
io_task = asyncio.create_task(execute_io_bounded_task(session=session, target_service=svc, json_data=request.json, forward_headers=headers))
io_task = asyncio.create_task(execute_io_bounded_task(session=session, target_service=svc,
json_data=json_data, forward_headers=headers))
io_tasks.append(io_task)
services = await asyncio.gather(*io_tasks)

# Concatenate json responses
response = {}
response["services"] = []
response["statuses"] = []
response = {}
response["services"] = []
response["statuses"] = []

if len(service_endpoint["called_services"]) > 0:
for svc in services:
response["services"] += svc["services"]
response["statuses"] += svc["statuses"]
return response
else: # "synchronous"
async with ClientSession() as session:
# TODO: CPU-bounded tasks not supported yet
response = {}
response["services"] = []
response["statuses"] = []
return response


async def sync_tasks(service_endpoint, headers):
async with ClientSession() as session:
# TODO: CPU-bounded tasks not supported yet
response = {}
response["services"] = []
response["statuses"] = []
if request.get_data() == bytes("", "utf-8"):
json_data = {}
else:
json_data = request.json
if len(service_endpoint["called_services"]) > 0:
for svc in service_endpoint["called_services"]:
res = execute_io_bounded_task(session=session, target_service=svc, json_data=request.json, forward_headers=headers)
res = await execute_io_bounded_task(session=session, target_service=svc, json_data=json_data, forward_headers=headers)
response["services"] += res["services"]
response["statuses"] += res["statuses"]

return response
return response


def execute_cpu_bounded_task(origin_service_name, target_service, headers):
Expand All @@ -101,7 +129,6 @@ def execute_cpu_bounded_task(origin_service_name, target_service, headers):
async def execute_io_bounded_task(session, target_service, json_data, forward_headers={}):
# TODO: Request forwarding to a service on a particular cluster is not supported yet
# TODO: Requests for other protocols than html are not supported yet
logger.info(target_service)

dst = FORMATTED_REMOTE_URL.format(target_service["service"], target_service["port"], target_service["endpoint"])
forward_headers.update({'Content-type' : 'application/json'})
Expand Down

0 comments on commit 2c016f0

Please sign in to comment.