Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added callback sender #39

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions src/tesk_core/callback_sender.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import logging

import requests


class CallbackSender:
"""Implemention for the callback sender class."""

def __init__(self, task_id="", url=""):
self.url = url
self.task_id = task_id

def send(self, state):
"""Send the state to the callback receiver.
Args:
state (str): Descriptor of the state of the task.
Returns:
response (requests.models.Response): Response from the callback receiver.
None: if the callback receiver is not set or some error occurs.
"""

if not self.url:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if the task ID is not set?

return None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be good to log something here.

sent = False
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess instead of a while loop you could use a for loop with range(number_of_retries). That way you could get rid of retries and manual incrementing. You could also use break instead of sent = True and get rid of sent.

retries = 0
response = None
while not sent:
try:
data = {"id": self.task_id, "state": state}
headers = {"Content-Type": "application/json"}
response = requests.post(self.url, json=data, headers=headers)
sent = True
except requests.exceptions.Timeout:
retries += 1
if retries > 3:
logging.error("Callback Timeout")
break
continue
except requests.exceptions.TooManyRedirects as err:
logging.error("Bad URL: %s", err)
break
except requests.exceptions.RequestException as err:
logging.error(err)
break

return response
13 changes: 12 additions & 1 deletion src/tesk_core/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@
from kubernetes import client, config
from kubernetes.client.rest import ApiException
from tesk_core.Util import pprint
from tesk_core.callback_sender import CallbackSender


logging.basicConfig(format='%(message)s', level=logging.INFO)
class Job:
def __init__(self, body, name='task-job', namespace='default'):
def __init__(self, body, name='task-job', namespace='default', callback_url=None):
self.name = name
self.namespace = namespace
self.status = 'Initialized'
Expand All @@ -17,6 +18,10 @@ def __init__(self, body, name='task-job', namespace='default'):
self.timeout = 240
self.body = body
self.body['metadata']['name'] = self.name
self.callback = None
if callback_url:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to check explicitly with if callback_url is not None:

task_name = '-'.join(name.split('-')[:2])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not quite sure why you process the task name. Is the processed version the one that the user knows (i.e., the one that is returned when POSTing a new task)?

self.callback = CallbackSender(task_name, callback_url)

def run_to_completion(self, poll_interval, check_cancelled, pod_timeout):

Expand All @@ -34,9 +39,15 @@ def run_to_completion(self, poll_interval, check_cancelled, pod_timeout):
raise ApiException(ex.status, ex.reason)
is_all_pods_running = False
status, is_all_pods_running = self.get_status(is_all_pods_running)
# notify the callback receiver that the job is running
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the code is obvious enough that it doesn't need a comment. I think the same goes for all other places where you call callback.send(), perhaps with the exception of this comment in the taskmaster module:

            # send "SYSTEM_ERROR" to callback receiver if taskmaster completes
            # but the output filer fails

if self.callback and status == 'Running':
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, better to compare explicitly against None, i.e., if self.callback is not None [...]. Same below.

self.callback.send('RUNNING')
while status == 'Running':
if check_cancelled():
self.delete()
# notify the callback receiver that the task is cancelled
if self.callback:
self.callback.send('CANCELED')
return 'Cancelled'
time.sleep(poll_interval)
status, is_all_pods_running = self.get_status(is_all_pods_running)
Expand Down
37 changes: 31 additions & 6 deletions src/tesk_core/taskmaster.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,25 @@
import json
import os
import re
from subprocess import call
import sys
import logging
from kubernetes import client, config
from tesk_core.job import Job
from tesk_core.pvc import PVC
from tesk_core.filer_class import Filer
from tesk_core.callback_sender import CallbackSender

created_jobs = []
poll_interval = 5
task_volume_basename = 'task-volume'
args = None
logger = None
callback = CallbackSender()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't quite understand this code. At this stage, CallbackSender is missing required information: the URL and the task name. It appears that this is filled in when newParser() is called. But when does that happen? And who/what sets the environment variable CALLBACK_URL and when? And can you be sure that data['executors'][0]['metadata']['labels']['taskmaster-name'] is always set and correct when newParser() is called? And can you be sure that callback.url and callback.task_id are always set (and correctly) whenever callback.send is called?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, setting callback as a global and changing it in newParser() is quite a side effect. Wouldn't it be better to add a setter method to CallbackSender and call that somewhere (ideally not in newParser() itself, but in the context where newParser() is called)?


def run_executor(executor, namespace, pvc=None):
# notify the callback receiver that an executor is queued
callback.send('QUEUED')
jobname = executor['metadata']['name']
spec = executor['spec']['template']['spec']

Expand All @@ -31,14 +36,17 @@ def run_executor(executor, namespace, pvc=None):
volumes.extend([{'name': task_volume_basename, 'persistentVolumeClaim': {
'readonly': False, 'claimName': pvc.name}}])
logger.debug('Created job: ' + jobname)
job = Job(executor, jobname, namespace)
job = Job(executor, jobname, namespace, callback.url)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At this point you already have a callback instance. Why not use that?

logger.debug('Job spec: ' + str(job.body))

global created_jobs
created_jobs.append(job)

status = job.run_to_completion(poll_interval, check_cancelled,args.pod_timeout)
if status != 'Complete':
# notify the callback receiver about the error
if status in ('Failed', 'Error'):
callback.send('EXECUTOR_ERROR')
if status == 'Error':
job.delete()
exit_cancelled('Got status ' + status)
Expand Down Expand Up @@ -98,6 +106,8 @@ def generate_mounts(data, pvc):


def init_pvc(data, filer):
# notify the callback receiver that pvc initialization is queued
callback.send('QUEUED')
task_name = data['executors'][0]['metadata']['labels']['taskmaster-name']
pvc_name = task_name + '-pvc'
pvc_size = data['resources']['disk_gb']
Expand Down Expand Up @@ -127,6 +137,9 @@ def init_pvc(data, filer):
# filerjob.run_to_completion(poll_interval)
status = filerjob.run_to_completion(poll_interval, check_cancelled, args.pod_timeout)
if status != 'Complete':
# notify the callback receiver about the error
if status in ('Failed', 'Error'):
callback.send('SYSTEM_ERROR')
exit_cancelled('Got status ' + status)

return pvc
Expand All @@ -150,10 +163,10 @@ def run_task(data, filer_name, filer_version):

pvc = init_pvc(data, filer)

# run executors
for executor in data['executors']:
run_executor(executor, args.namespace, pvc)

# run executors
logging.debug("Finished running executors")

# upload files and delete pvc
Expand All @@ -167,12 +180,18 @@ def run_task(data, filer_name, filer_version):
created_jobs.append(filerjob)

# filerjob.run_to_completion(poll_interval)
status = filerjob.run_to_completion(poll_interval, check_cancelled, args.pod_timeout)
if status != 'Complete':
exit_cancelled('Got status ' + status)
filer_status = filerjob.run_to_completion(poll_interval, check_cancelled, args.pod_timeout)
if filer_status != 'Complete':
# send "SYSTEM_ERROR" to callback receiver if taskmaster completes
# but the output filer fails
callback.send('SYSTEM_ERROR')
exit_cancelled('Got status ' + filer_status)
else:
pvc.delete()

# notify the callback receiver upon task completion
callback.send('COMPLETE')


def newParser():

Expand Down Expand Up @@ -283,10 +302,17 @@ def main():
global created_pvc
created_pvc = None

# Fill information for callback object
callback.url = os.getenv('CALLBACK_URL', '')
callback.task_id = data['executors'][0]['metadata']['labels']['taskmaster-name']

# Check if we're cancelled during init
if check_cancelled():
callback.send('CANCELED')
exit_cancelled('Cancelled during init')

# notify the callback receiver upon its initialization
callback.send('INITIALIZING')
run_task(data, args.filer_name, args.filer_version)


Expand All @@ -297,7 +323,6 @@ def clean_on_interrupt():
job.delete()



def exit_cancelled(reason='Unknown reason'):
logger.error('Cancelling taskmaster: ' + reason)
sys.exit(0)
Expand Down