diff --git a/api/genie/services/__init__.py b/api/genie/services/__init__.py index 4fad6bc8..fa17c5df 100644 --- a/api/genie/services/__init__.py +++ b/api/genie/services/__init__.py @@ -2,4 +2,5 @@ from .schemas import Schemas from .connections import Connections from .notebookTemplates import NotebookTemplateService -from .kubernetes import KubernetesServices \ No newline at end of file +from .kubernetes import KubernetesServices +from .schedules import ScheduleService \ No newline at end of file diff --git a/api/genie/services/notebookJobs.py b/api/genie/services/notebookJobs.py index a968c869..c2d15b02 100644 --- a/api/genie/services/notebookJobs.py +++ b/api/genie/services/notebookJobs.py @@ -8,7 +8,7 @@ from django.template import Template, Context # from django_celery_beat.models import CrontabSchedule from genie.models import NOTEBOOK_STATUS_ABORT, NOTEBOOK_STATUS_QUEUED, NOTEBOOK_STATUS_RUNNING, NOTEBOOK_STATUS_ABORTING, NotebookObject, NotebookJob, RunStatus, Connection, NotebookTemplate, CustomSchedule as Schedule -from genie.serializers import NotebookObjectSerializer, ScheduleSerializer, RunStatusSerializer +from genie.serializers import NotebookObjectSerializer, RunStatusSerializer from workflows.models import Workflow, NotebookJob as WorkflowNotebookJob from utils.apiResponse import ApiResponse from utils.zeppelinAPI import Zeppelin @@ -307,19 +307,6 @@ def addNotebookJob(notebookId: str, scheduleId: int): res.update(True, "NotebookJob added successfully", None) return res - @staticmethod - def updateNotebookJob(notebookJobId: int, scheduleId: int): - """ - Service to update crontab of an existing NotebookJob - :param notebookId: ID of the NotebookJob for which to update crontab - :param scheduleId: ID of schedule - """ - res = ApiResponse() - scheduleObj = Schedule.objects.get(crontabschedule_ptr_id=scheduleId) - NotebookJob.objects.filter(id=notebookJobId).update(crontab=scheduleObj) - res.update(True, "NotebookJob updated successfully", None) - return res - @staticmethod def deleteNotebookJob(notebookId: int): """ @@ -331,112 +318,6 @@ def deleteNotebookJob(notebookId: int): res.update(True, "NotebookJob deleted successfully", None) return res - @staticmethod - def toggleNotebookJob(notebookId: int, enabled: bool): - """ - Service to update crontab of an existing NotebookJob - :param notebookId: ID of the NotebookJob for which to update crontab - :param scheduleId: ID of schedule - """ - res = ApiResponse() - NotebookJob.objects.filter(notebookId=notebookId).update(enabled=enabled) - res.update(True, "NotebookJob updated successfully", None) - return res - - @staticmethod - def getSchedules(): - """ - Service to get all schedule objects - """ - res = ApiResponse() - schedules = Schedule.objects.exclude(id=1) - data = ScheduleSerializer(schedules, many=True).data - res.update(True, "Schedules fetched successfully", data) - return res - - @staticmethod - def addSchedule(cron: str, timezone: str = None, name: str = ""): - """ - Service to add Schedule - :param cron: Crontab in string format - :param timezone: Timezone string for which to configure Schedule - :param name: Name of schedule provided by user - """ - res = ApiResponse() - cronElements = cron.split() - if len(cronElements) != 5: - res.update(False, "Crontab must contain five elements") - return res - timezone = timezone if timezone else "UTC" - schedule = Schedule.objects.create( - minute=cronElements[0], - hour=cronElements[1], - day_of_month=cronElements[2], - month_of_year=cronElements[3], - day_of_week=cronElements[4], - timezone=timezone, - name=name, - ) - res.update(True, "Schedule added successfully", schedule.id) - return res - - @staticmethod - def getSingleSchedule(scheduleId: int): - """ - Service to get singleSchedule - :param scheduleId: int - """ - - res = ApiResponse() - schedules = Schedule.objects.filter(crontabschedule_ptr_id=scheduleId) - data = ScheduleSerializer(schedules, many=True).data - res.update(True, "Schedules fetched successfully", data) - return res - - @staticmethod - def updateSchedule(id, crontab, timezone, name): - """ - Service to update Schedule - param id: int - param cron: Crontab in string format - param timezone: Timezone in string format - param name: String - """ - res = ApiResponse() - cronElements = crontab.split(" ") - if len(cronElements) != 5: - res.update(False, "Crontab must contain five elements") - return res - schedule = Schedule.objects.get(crontabschedule_ptr_id=id) - schedule.minute=cronElements[0] - schedule.hour=cronElements[1] - schedule.day_of_month=cronElements[2] - schedule.month_of_year=cronElements[3] - schedule.day_of_week=cronElements[4] - schedule.timezone = timezone - schedule.name = name - schedule.save() - res.update(True, "Schedules updated successfully", []) - return res - - @staticmethod - def deleteSchedule(scheduleId: int): - """ Service to delete schedule of given scheduleId """ - res = ApiResponse() - Schedule.objects.filter(id=scheduleId).delete() - res.update(True, "Schedules deleted successfully", []) - return res - - @staticmethod - def getTimezones(): - """ - Service to fetch all pytz timezones - """ - res = ApiResponse() - timezones = pytz.all_timezones - res.update(True, "Timezones fetched successfully", timezones) - return res - @staticmethod def runNotebookJob(notebookId: str): """ diff --git a/api/genie/services/schedules.py b/api/genie/services/schedules.py new file mode 100644 index 00000000..e0b4baa5 --- /dev/null +++ b/api/genie/services/schedules.py @@ -0,0 +1,104 @@ +import pytz +import logging +# from django_celery_beat.models import CrontabSchedule +from genie.models import CustomSchedule as Schedule +from genie.serializers import ScheduleSerializer +from utils.apiResponse import ApiResponse + +# Get an instance of a logger +logger = logging.getLogger(__name__) + +class ScheduleService: + @staticmethod + def getSchedules(): + """ + Service to get all schedule objects + """ + res = ApiResponse() + schedules = Schedule.objects.exclude(id=1) + data = ScheduleSerializer(schedules, many=True).data + res.update(True, "Schedules fetched successfully", data) + return res + + @staticmethod + def addSchedule(cron: str, timezone: str = None, name: str = ""): + """ + Service to add Schedule + :param cron: Crontab in string format + :param timezone: Timezone string for which to configure Schedule + :param name: Name of schedule provided by user + """ + res = ApiResponse() + cronElements = cron.split() + if len(cronElements) != 5: + res.update(False, "Crontab must contain five elements") + return res + timezone = timezone if timezone else "UTC" + schedule = Schedule.objects.create( + minute=cronElements[0], + hour=cronElements[1], + day_of_month=cronElements[2], + month_of_year=cronElements[3], + day_of_week=cronElements[4], + timezone=timezone, + name=name, + ) + res.update(True, "Schedule added successfully", schedule.id) + return res + + @staticmethod + def getSingleSchedule(scheduleId: int): + """ + Service to get singleSchedule + :param scheduleId: int + """ + + res = ApiResponse() + schedules = Schedule.objects.filter(crontabschedule_ptr_id=scheduleId) + data = ScheduleSerializer(schedules, many=True).data + res.update(True, "Schedules fetched successfully", data) + return res + + @staticmethod + def updateSchedule(id, crontab, timezone, name): + """ + Service to update Schedule + param id: int + param cron: Crontab in string format + param timezone: Timezone in string format + param name: String + """ + res = ApiResponse() + cronElements = crontab.split(" ") + if len(cronElements) != 5: + res.update(False, "Crontab must contain five elements") + return res + schedule = Schedule.objects.get(crontabschedule_ptr_id=id) + schedule.minute=cronElements[0] + schedule.hour=cronElements[1] + schedule.day_of_month=cronElements[2] + schedule.month_of_year=cronElements[3] + schedule.day_of_week=cronElements[4] + schedule.timezone = timezone + schedule.name = name + schedule.save() + res.update(True, "Schedules updated successfully", []) + return res + + @staticmethod + def deleteSchedule(scheduleId: int): + """ Service to delete schedule of given scheduleId """ + res = ApiResponse() + Schedule.objects.filter(id=scheduleId).delete() + res.update(True, "Schedules deleted successfully", []) + return res + + @staticmethod + def getTimezones(): + """ + Service to fetch all pytz timezones + """ + res = ApiResponse() + timezones = pytz.all_timezones + res.update(True, "Timezones fetched successfully", timezones) + return res \ No newline at end of file diff --git a/api/genie/tests/test_views_notebookJobs.py b/api/genie/tests/test_views_notebookJobs.py index 8f33d647..ef200274 100644 --- a/api/genie/tests/test_views_notebookJobs.py +++ b/api/genie/tests/test_views_notebookJobs.py @@ -99,26 +99,6 @@ def test_notebookJob(client, populate_seed_data, mocker): assert response.status_code == 200 assert response.data['success'] == True - # TODO Test and fix update notebook job - # Test Update Notebook Schedule - # mixer.blend("genie.customSchedule", id=2) - # data = {"notebookId": "BX976MDDE", "scheduleId": 2} - # response = client.put(path, data=data, content_type="application/json") - # assert response.status_code == 200 - # assert response.data['success'] == True - - # Test disable notebook job - data = {"notebookId": "BX976MDDE", "enabled": False} - response = client.put(path, data=data, content_type="application/json") - assert response.status_code == 200 - assert response.data['success'] == True - - # Test enabled notebook job - data = {"notebookId": "BX976MDDE", "enabled": True} - response = client.put(path, data=data, content_type="application/json") - assert response.status_code == 200 - assert response.data['success'] == True - # Test delete notebook job response = client.delete(path, data=data, content_type="application/json") assert response.status_code == 200 diff --git a/api/genie/views.py b/api/genie/views.py index 97eb02a0..da8ede3d 100644 --- a/api/genie/views.py +++ b/api/genie/views.py @@ -2,7 +2,7 @@ import json from rest_framework.views import APIView from rest_framework.response import Response -from genie.services import NotebookJobServices, Connections, NotebookTemplateService, KubernetesServices, Schemas +from genie.services import NotebookJobServices, Connections, NotebookTemplateService, KubernetesServices, Schemas, ScheduleService from rest_framework.decorators import api_view class NotebookOperationsView(APIView): @@ -99,16 +99,6 @@ def post(self, request): scheduleId = request.data["scheduleId"] res = NotebookJobServices.addNotebookJob(notebookId=notebookId, scheduleId=scheduleId) return Response(res.json()) - - def put(self, request): - notebookId = request.data["notebookId"] - if "scheduleId" in request.data: - scheduleId = request.data["scheduleId"] - res = NotebookJobServices.updateNotebookJob(notebookId=notebookId, scheduleId=scheduleId) - elif "enabled" in request.data: - enabled = request.data["enabled"] - res = NotebookJobServices.toggleNotebookJob(notebookId=notebookId, enabled=enabled) - return Response(res.json()) def delete(self, request, notebookId=None): res = NotebookJobServices.deleteNotebookJob(notebookId=notebookId) @@ -119,14 +109,14 @@ class ScheduleView(APIView): Class to get and add available crontab schedules """ def get(self, request): - res = NotebookJobServices.getSchedules() + res = ScheduleService.getSchedules() return Response(res.json()) def post(self, request): name = request.data["name"] cron = request.data["crontab"] timezone = request.data["timezone"] - res = NotebookJobServices.addSchedule(cron=cron, timezone=timezone, name=name) + res = ScheduleService.addSchedule(cron=cron, timezone=timezone, name=name) return Response(res.json()) def put(self,request): @@ -134,7 +124,7 @@ def put(self,request): name = request.data["name"] crontab = request.data["crontab"] timezone = request.data["timezone"] - res = NotebookJobServices.updateSchedule(id=id, crontab=crontab, timezone=timezone, name=name) + res = ScheduleService.updateSchedule(id=id, crontab=crontab, timezone=timezone, name=name) return Response(res.json()) @api_view(["GET", "PUT", "DELETE"]) @@ -145,10 +135,10 @@ def schedule(request: HttpRequest, scheduleId: int) -> Response: :param connection_id: Connection Id """ if request.method == "GET": - res = NotebookJobServices.getSingleSchedule(scheduleId) + res = ScheduleService.getSingleSchedule(scheduleId) return Response(res.json()) if request.method == "DELETE": - res = NotebookJobServices.deleteSchedule(scheduleId) + res = ScheduleService.deleteSchedule(scheduleId) return Response(res.json()) @@ -157,7 +147,7 @@ class TimzoneView(APIView): Class to get standard pytz timezones """ def get(self, request): - res = NotebookJobServices.getTimezones() + res = ScheduleService.getTimezones() return Response(res.json()) diff --git a/api/workflows/tests/test_views.py b/api/workflows/tests/test_views.py index e58bae34..b527961a 100644 --- a/api/workflows/tests/test_views.py +++ b/api/workflows/tests/test_views.py @@ -6,7 +6,7 @@ from mixer.backend.django import mixer from conftest import populate_seed_data -from genie.services import NotebookJobServices +from genie.services import ScheduleService from workflows.tasks import runWorkflowJob from genie.tasks import runNotebookJob @@ -17,22 +17,19 @@ @pytest.mark.django_db(transaction=True) def test_workflows(client, populate_seed_data, mocker): - # ======================= create workflow test ======================= + # Create workflow test path = reverse('workflowsPost') data = {'name': 'test', 'notebookIds': [], 'scheduleId': None, 'triggerWorkflowId': None, 'triggerWorkflowStatus': 'ALWAYS'} - response = client.post(path, data=data, content_type="application/json") - assert response.status_code == 200 assert response.data['data'] - workflowId = response.data['data'] - # ======================= update workflow test ====================== + # Update workflow test path = reverse('workflowsPost') data = {'id': workflowId, 'name': 'testWorkflow', @@ -40,17 +37,13 @@ def test_workflows(client, populate_seed_data, mocker): 'scheduleId': None, 'triggerWorkflowId': None, 'triggerWorkflowStatus': 'ALWAYS'} - response = client.post(path, data=data, content_type="application/json") assert response.status_code == 200 assert response.data['data'] - - # ======================= get workflows test ========================= - + # Get workflows test path = reverse('workflows', kwargs={"offset": 0}) response = client.get(path) - exceptedWorkflow = {'id': workflowId, 'lastRun': None, 'name': 'testWorkflow', @@ -58,59 +51,47 @@ def test_workflows(client, populate_seed_data, mocker): 'schedule': None, 'triggerWorkflow': None, 'triggerWorkflowStatus': 'ALWAYS'} - assert response.status_code == 200 assert response.data['data']['total'] == 1 assert response.data['data']['workflows'][0] == exceptedWorkflow - # ===================== assign schedule test ========================= - - res = NotebookJobServices.addSchedule(cron="* 34 * * *", timezone="Africa/Asmara", name="testSchedule") + # Assign schedule test + res = ScheduleService.addSchedule(cron="* 34 * * *", timezone="Africa/Asmara", name="testSchedule") assert res.data scheduleId = res.data - path = reverse('updateSchedule', kwargs={"workflowId": workflowId}) data = {"scheduleId": scheduleId} - response = client.post(path, data=data, content_type="application/json") assert response.status_code == 200 assert response.data['data'] - - - # ===================== assign trigger workflow test ========================= - - # creating workflow + # Assign trigger workflow test path = reverse('workflowsPost') data = {'name': 'triggerWorkflow', 'notebookIds': [], 'scheduleId': None, 'triggerWorkflowId': None, 'triggerWorkflowStatus': 'ALWAYS'} + # Creating workflow response = client.post(path, data=data, content_type="application/json") assert response.status_code == 200 assert response.data['data'] - _workflowId = response.data['data'] - # assigning it as trigger workflow + # Assigning it as trigger workflow path = reverse('updateTriggerWorkflow', kwargs={"workflowId": workflowId}) data = { "triggerWorkflowId": _workflowId, "triggerWorkflowStatus": STATUS_SUCCESS } - response = client.post(path, data=data, content_type="application/json") assert response.status_code == 200 assert response.data['data'] == 1 - - # getting workflows test - + # Getting workflows test path = reverse('workflows', kwargs={"offset": 0}) response = client.get(path) - expectedWorkflows = [{'id': _workflowId, 'lastRun': None, 'name': 'triggerWorkflow', @@ -125,57 +106,45 @@ def test_workflows(client, populate_seed_data, mocker): 'schedule': {'id': scheduleId, 'name': 'testSchedule'}, 'triggerWorkflow': {'id': _workflowId, 'name': 'triggerWorkflow'}, 'triggerWorkflowStatus': STATUS_SUCCESS}] - - assert response.status_code == 200 assert response.data['data']['total'] == 2 case = unittest.TestCase() case.assertListEqual(response.data['data']['workflows'], expectedWorkflows) - # ======================== runWorkflow test =============================== + # Run workflow test runWorkflowJobPatch = mocker.patch("workflows.tasks.runWorkflowJob.delay", new=mock.MagicMock(autospec=True, side_effect=runWorkflowJob)) runNotebookJobPatch = mocker.patch("genie.tasks.runNotebookJob.delay", new=mock.MagicMock(autospec=True, side_effect=runNotebookJob)) runNotebookJobPatch.start() runWorkflowJobPatch.start() - path = reverse("runWorkflow", kwargs={"workflowId": _workflowId}) response = client.get(path) assert response.status_code == 200 assert WorkflowRun.objects.count() == 2 assert RunStatus.objects.count() == 1 - runWorkflowJobPatch.stop() runNotebookJobPatch.stop() - # testing get workflow + # Get workflow test path = reverse('workflows', kwargs={"offset": 0}) response = client.get(path) - expectedLastRunKeys = set(["endTimestamp", "startTimestamp", "status", "workflowRunId"]) - assert response.status_code == 200 assert response.data['data']['total'] == 2 assert set(response.data['data']['workflows'][0]['lastRun'].keys()) == expectedLastRunKeys assert set(response.data['data']['workflows'][1]['lastRun'].keys()) == expectedLastRunKeys - - # ====================== sorting on workflows ================================ - - # create a workflow + # Sorting on workflows test path = reverse('workflowsPost') data = {'name': 'sortTest', 'notebookIds': [], 'scheduleId': None, 'triggerWorkflowId': workflowId, 'triggerWorkflowStatus': 'ALWAYS'} - response = client.post(path, data=data, content_type="application/json") - assert response.status_code == 200 assert response.data['data'] - - # sort on name + # Sort on name test path = reverse("workflows", kwargs={"offset": 0}) + "?sortColumn=name&sortOrder=descend" response = client.get(path) names = [ x['name'] for x in response.data['data']['workflows'] ] @@ -186,7 +155,7 @@ def test_workflows(client, populate_seed_data, mocker): names = [ x['name'] for x in response.data['data']['workflows'] ] assert names == sorted(names) - # trigger workflow name + # Sort on trigger workflow name test path = reverse("workflows", kwargs={"offset": 0}) + "?sortColumn=triggerWorkflow&sortOrder=descend" response = client.get(path) triggerWorkflows = [ x['triggerWorkflow']['name'] for x in response.data['data']['workflows'] if x['triggerWorkflow'] ] @@ -197,7 +166,7 @@ def test_workflows(client, populate_seed_data, mocker): triggerWorkflows = [ x['triggerWorkflow']['name'] for x in response.data['data']['workflows'] if x['triggerWorkflow'] ] assert triggerWorkflows == sorted(triggerWorkflows) - # # last run time + # Sort on last run time test # path = reverse("workflows", kwargs={"offset": 0}) + "?sortColumn=lastRunTime&sortOrder=descend" # response = client.get(path) # lastRunTimestamps = [ x['lastRun']['startTimestamp'] for x in response.data['data']['workflows'] if x['lastRun']] @@ -209,23 +178,15 @@ def test_workflows(client, populate_seed_data, mocker): # assert lastRunTimestamps == sorted(lastRunTimestamps) - # ======================= delete workflow test ========================== + # Delete workflow test path = reverse("workflow", kwargs={"workflowId": _workflowId}) response = client.delete(path) assert response.status_code == 200 assert response.data['success'] - # get workflowRuns test - path = reverse("workflowRuns", kwargs={"workflowId": workflowId, "offset": 0}) - response = client.get(path) - assert response.status_code == 200 - assert response.data['data']['total'] == 1 - - # ========================== get workflow runs ================================ - + # Get workflowRuns test path = reverse("workflowRuns", kwargs={"workflowId": workflowId, "offset": 0}) response = client.get(path) assert response.status_code == 200 assert response.data['data']['total'] == 1 assert set(response.data['data']['workflowRuns'][0].keys()) == set(["endTimestamp", "id", "startTimestamp", "status", "workflow"]) -