Skip to content

Commit

Permalink
Merge pull request #20 from cuebook/zeppelin-cluster
Browse files Browse the repository at this point in the history
Scalable zeppelin server for running scheduled notebooks
  • Loading branch information
vikrantcue authored Jul 2, 2021
2 parents 07faf6f + 7c82d11 commit 36cade1
Show file tree
Hide file tree
Showing 16 changed files with 414 additions and 113 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pr_checks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ jobs:
export GIT_BRANCH=$GITHUB_HEAD_REF
cd api
source .env.dev
source .env.test
export CC_TEST_REPORTER_ID=${{ secrets.CODE_CLIMATE_TEST_REPORTER_ID }}
Expand Down
1 change: 1 addition & 0 deletions api/.env.dev
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export ENVIRONMENT=dev
export REDIS_BROKER_URL=redis://localhost:6379/0
export ZEPPELIN_HOST=http://localhost
export ZEPPELIN_PORT=8081
Expand Down
23 changes: 23 additions & 0 deletions api/.env.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
export ENVIRONMENT=test
export REDIS_BROKER_URL=redis://localhost:6379/0
export ZEPPELIN_HOST=http://localhost
export ZEPPELIN_PORT=8081

## DB SETTINGS
export POSTGRES_DB_HOST="localhost"
export POSTGRES_DB_USERNAME="postgres"
export POSTGRES_DB_PASSWORD="postgres"
export POSTGRES_DB_SCHEMA="cuelake_db"
export POSTGRES_DB_PORT=5432

## S3 Settings
export S3_BUCKET_NAME="YourBucketName"
export S3_FILES_PREFIX="files/"
export HADOOP_S3_PREFIX="cuelake/"

## Metastore Settings
## NOTE(review): the four variable names below spell "METASORE" (missing T),
## unlike METASTORE_POSTGRES_HOST. Kept as-is because consuming code reads
## these exact names - fix the variable names and their readers together.
export METASTORE_POSTGRES_HOST="localhost"
export METASORE_POSTGRES_PORT=5432
export METASORE_POSTGRES_USERNAME="postgres"
export METASORE_POSTGRES_PASSWORD="postgres"
export METASORE_POSTGRES_DATABASE="cuelake_metastore"
18 changes: 18 additions & 0 deletions api/genie/migrations/0016_runstatus_zeppelinserverid.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 3.2.1 on 2021-06-28 20:32

from django.db import migrations, models


class Migration(migrations.Migration):
    """Add RunStatus.zeppelinServerId so each notebook run records which
    zeppelin server pod it was scheduled on (auto-generated by Django 3.2.1).
    """

    dependencies = [
        ('genie', '0015_alter_runstatus_workflowrun'),
    ]

    operations = [
        # Empty-string default keeps existing RunStatus rows valid without
        # a separate data migration.
        migrations.AddField(
            model_name='runstatus',
            name='zeppelinServerId',
            field=models.CharField(default='', max_length=200),
        ),
    ]
1 change: 1 addition & 0 deletions api/genie/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class RunStatus(models.Model):
message = models.CharField(max_length=5000, null=True, default=None)
workflowRun = models.ForeignKey(WorkflowRun, null=True, blank=True, on_delete=models.SET_NULL)
taskId = models.CharField(max_length=200, default="")
zeppelinServerId = models.CharField(max_length=200, default="")


# Connection Models
Expand Down
34 changes: 2 additions & 32 deletions api/genie/services/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import List
from utils.apiResponse import ApiResponse
from kubernetes import config, client
from utils.kubernetesAPI import Kubernetes

class KubernetesServices:

Expand All @@ -12,37 +13,6 @@ def getDriversCount():
Gets Driver and executors count
"""
res = ApiResponse()
if os.environ.get("POSTGRES_DB_HOST","") == "localhost":
config.load_kube_config()
else:
config.load_incluster_config()
runningDrivers = 0
runningExecutors = 0
pendingDrivers = 0
pendingExecutors = 0
v1 = client.CoreV1Api()
POD_NAMESPACE = os.environ.get("POD_NAMESPACE","cuelake")
ret = v1.list_namespaced_pod(POD_NAMESPACE, watch=False)
pods = ret.items
pods_name = [pod.metadata.name for pod in pods]
podLabels = [[pod.metadata.labels, pod.status.phase] for pod in pods] # list
podStatus = [pod.status for pod in pods]

for label in podLabels:
if "interpreterSettingName" in label[0] and label[0]["interpreterSettingName"] == "spark" and label[1]=="Running":
runningDrivers += 1

if "interpreterSettingName" in label[0] and label[0]["interpreterSettingName"] == "spark" and label[1]=="Pending":
pendingDrivers += 1
if "spark-role" in label[0] and label[0]["spark-role"] == "executor" and label[1]=="Running":
runningExecutors += 1

if "spark-role" in label[0] and label[0]["spark-role"] == "executor" and label[1]=="Pending":
pendingExecutors += 1
data = {"runningDrivers":runningDrivers,
"pendingDrivers":pendingDrivers,
"runningExecutors":runningExecutors,
"pendingExecutors":pendingExecutors
}
data = Kubernetes.getDriversCount()
res.update(True, "Pods status retrieved successfully", data)
return res
5 changes: 3 additions & 2 deletions api/genie/services/notebookJobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from genie.serializers import NotebookObjectSerializer, RunStatusSerializer
from workflows.models import Workflow, NotebookJob as WorkflowNotebookJob
from utils.apiResponse import ApiResponse
from utils.zeppelinAPI import Zeppelin
from utils.zeppelinAPI import Zeppelin, ZeppelinAPI
from genie.tasks import runNotebookJob as runNotebookJobTask
from django.conf import settings

Expand Down Expand Up @@ -340,7 +340,8 @@ def stopNotebookJob(notebookId: str):
if(notebookRunStatus.status == NOTEBOOK_STATUS_RUNNING):
notebookRunStatus.status = NOTEBOOK_STATUS_ABORT
notebookRunStatus.save()
thread = threading.Thread(target=Zeppelin.stopNotebookJob, args=[notebookId])
zeppelin = ZeppelinAPI(notebookRunStatus.zeppelinServerId)
thread = threading.Thread(target=zeppelin.stopNotebookJob, args=[notebookId])
thread.start()
res.update(True, "Aborting notebook job", None)
return res
Expand Down
128 changes: 92 additions & 36 deletions api/genie/tasks.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,25 @@
import os
import json
import uuid
import datetime as dt
import dateutil.parser as dp
import requests
import polling
from celery import shared_task
from django.conf import settings

from genie.models import NotebookJob, RunStatus, NOTEBOOK_STATUS_SUCCESS, NOTEBOOK_STATUS_ERROR, NOTEBOOK_STATUS_RUNNING, NOTEBOOK_STATUS_FINISHED, NOTEBOOK_STATUS_ABORT
from genie.models import NotebookJob, RunStatus, NOTEBOOK_STATUS_SUCCESS, NOTEBOOK_STATUS_ERROR, NOTEBOOK_STATUS_RUNNING, NOTEBOOK_STATUS_FINISHED, NOTEBOOK_STATUS_ABORT, NOTEBOOK_STATUS_QUEUED
from system.services import NotificationServices
from utils.zeppelinAPI import Zeppelin
from utils.zeppelinAPI import ZeppelinAPI
from utils.kubernetesAPI import Kubernetes
import logging

# Get an instance of a logger
logger = logging.getLogger(__name__)

ZEPPELIN_API_RETRY_COUNT = 3
ZEPPELIN_SERVER_CONCURRENCY = os.environ.get("ZEPPELIN_SERVER_CONCURRENCY", 5)
ZEPPELIN_JOB_SERVER_PREFIX = "zeppelin-job-server-"

@shared_task
def runNotebookJob(notebookId: str, runStatusId: int = None, runType: str = "Scheduled"):
Expand All @@ -23,39 +28,40 @@ def runNotebookJob(notebookId: str, runStatusId: int = None, runType: str = "Sch
:param notebookId: ID of the zeppelin notebook which to run
:param runStatusId: ID of genie.runStatus model
"""
notebookName = notebookId # Initialize notebook name with notebook id
logger.info(f"Starting notebook job for: {notebookId}")
taskId = runNotebookJob.request.id # Celery task id
taskId = taskId if taskId else ""
runStatus = __getOrCreateRunStatus(runStatusId, notebookId, runType, taskId)
try:
# Check if notebook is already running
isRunning, notebookName = __checkIfNotebookRunning(notebookId)
if isRunning:
runStatus.status=NOTEBOOK_STATUS_ERROR
runStatus.message="Notebook already running"
runStatus.save()
else:
# Clear notebook results
Zeppelin.clearNotebookResults(notebookId)
response = Zeppelin.runNotebookJob(notebookId)
if response:
try:
polling.poll(
lambda: __checkIfNotebookRunningAndStoreLogs(notebookId, runStatus) != True, step=3, timeout=3600*6
)
except Exception as ex:
logger.error(f"Error occured in notebook {notebookId}. Error: {str(ex)}")
runStatus.status = NOTEBOOK_STATUS_ERROR
runStatus.message = str(ex)
runStatus.endTimestamp = dt.datetime.now()
runStatus.save()
NotificationServices.notify(notebookName=notebookName, isSuccess=False, message=str(ex))
else:
logger.error(f"Error occured in notebook {notebookId}. Error: Failed to trigger notebook job")
runStatus.status=NOTEBOOK_STATUS_ERROR
runStatus.message = "Failed running notebook"
zeppelinServerId = __allocateZeppelinServer(runStatus)
logger.info(f"Notebook {notebookId} scheduled to run on {zeppelinServerId}")
zeppelin = ZeppelinAPI(zeppelinServerId)
__waitUntilServerReady(zeppelinServerId, zeppelin)
isRunning, notebookName = __checkIfNotebookRunning(notebookId, zeppelin) # change to get only notebook name
# Clear notebook results
zeppelin.clearNotebookResults(notebookId)
response = zeppelin.runNotebookJob(notebookId)
if response:
try:
polling.poll(
lambda: __checkIfNotebookRunningAndStoreLogs(notebookId, runStatus, zeppelin) != True, step=3, timeout=3600*6
)
__evaluateScaleDownZeppelin()
except Exception as ex:
logger.error(f"Error occured in notebook {notebookId}. Error: {str(ex)}")
runStatus.status = NOTEBOOK_STATUS_ERROR
runStatus.message = str(ex)
runStatus.endTimestamp = dt.datetime.now()
runStatus.save()
NotificationServices.notify(notebookName=notebookName, isSuccess=False, message=str(ex))
else:
logger.error(f"Error occured in notebook {notebookId}. Error: Failed to trigger notebook job")
runStatus.status=NOTEBOOK_STATUS_ERROR
runStatus.message = "Failed running notebook"
runStatus.endTimestamp = dt.datetime.now()
runStatus.save()

except Exception as ex:
logger.error(f"Error occured in notebook {notebookId}. Error: {str(ex)}")
runStatus.status=NOTEBOOK_STATUS_ERROR
Expand All @@ -64,6 +70,43 @@ def runNotebookJob(notebookId: str, runStatusId: int = None, runType: str = "Sch
runStatus.save()
NotificationServices.notify(notebookName=notebookName if notebookName else notebookId, isSuccess=False, message=str(ex))

def __allocateZeppelinServer(runStatus: RunStatus):
    """
    Picks (or provisions) a zeppelin server for this run, records its id on
    the RunStatus row, and returns that id.
    """
    serverId = __getOrCreateZeppelinServerId(__getZeppelinServerNotebookMap())
    runStatus.zeppelinServerId = serverId
    runStatus.save()
    return serverId

def __getZeppelinServerNotebookMap():
    """
    Returns {zeppelinServerId: count of RUNNING/QUEUED notebook runs} for
    runs already pinned to a server; runs with an empty zeppelinServerId are
    ignored. Used both to find servers with spare concurrency and to decide
    which servers are safe to scale down.
    """
    activeRuns = RunStatus.objects.filter(
        status__in=[NOTEBOOK_STATUS_RUNNING, NOTEBOOK_STATUS_QUEUED]
    )
    zeppelinServerNotebookMap = {}  # jobs currently active per zeppelinServerId
    for run in activeRuns:
        serverId = run.zeppelinServerId
        if serverId:  # skip runs not yet allocated to a server
            zeppelinServerNotebookMap[serverId] = zeppelinServerNotebookMap.get(serverId, 0) + 1
    return zeppelinServerNotebookMap

def __getOrCreateZeppelinServerId(zeppelinServerMap):
    """
    Returns the id of an existing zeppelin server with spare concurrency;
    if every known server is saturated, provisions a new server pod and
    returns its id.

    :param zeppelinServerMap: {zeppelinServerId: active run count}
    """
    # ZEPPELIN_SERVER_CONCURRENCY is read from the environment, so it is a
    # str when the env var is set (int only via the default). Coerce before
    # comparing — int < str raises TypeError on Python 3.
    maxConcurrency = int(ZEPPELIN_SERVER_CONCURRENCY)
    for zeppelinServerId, runningNotebooks in zeppelinServerMap.items():
        if runningNotebooks < maxConcurrency:
            return zeppelinServerId
    # All servers busy: spin up a fresh pod with a short random suffix.
    # uuid4().hex is already lowercase, so no extra normalization is needed.
    zeppelinServerId = ZEPPELIN_JOB_SERVER_PREFIX + uuid.uuid4().hex[:20]
    Kubernetes.addZeppelinServer(zeppelinServerId)
    return zeppelinServerId

def __waitUntilServerReady(zeppelinServerId: str, zeppelin: ZeppelinAPI):
    """
    Blocks until the zeppelin server pod reports Running and its HTTP health
    check responds. Each stage polls every 3 seconds with a 6 hour timeout.
    """
    pollArgs = {"step": 3, "timeout": 3600 * 6}
    polling.poll(
        lambda: Kubernetes.getPodStatus(zeppelinServerId) == "Running", **pollArgs
    )
    polling.poll(lambda: zeppelin.healthCheck() != False, **pollArgs)

def __getOrCreateRunStatus(runStatusId: int, notebookId: str, runType: str, taskId: str):
"""
Gets or creates a notebook run status object
Expand All @@ -78,39 +121,40 @@ def __getOrCreateRunStatus(runStatusId: int, notebookId: str, runType: str, task
runStatus.save()
return runStatus

def __checkIfNotebookRunning(notebookId: str):
def __checkIfNotebookRunning(notebookId: str, zeppelin: ZeppelinAPI):
    """
    Fetches notebook details from the given zeppelin server and returns the
    tuple (isNotebookRunning, notebookName). Falls back to (False, "") when
    the details call fails or the fields are absent.
    """
    details = zeppelin.getNotebookDetails(notebookId)
    if not details:
        return False, ""
    return (
        details.get("info", {}).get("isRunning", False),
        details.get("name", ""),
    )

def __checkIfNotebookRunningAndStoreLogs(notebookId, runStatus):
def __checkIfNotebookRunningAndStoreLogs(notebookId: str, runStatus: RunStatus, zeppelin: ZeppelinAPI):
    """
    Polling callback: snapshots the notebook's current output into
    runStatus.logs and reports whether polling should continue.

    Returns a truthy value while the notebook is still running (or when a
    transiently-failed run was just re-triggered); returns False once a
    final status has been recorded on runStatus.
    """
    details = zeppelin.getNotebookDetailsWithRetry(notebookId)
    logger.info(details)
    if not details:
        # Details unavailable even after retries - record terminal status.
        __setNotebookStatus(details, runStatus)
        return False
    runStatus.logs = json.dumps(details)
    runStatus.save()
    stillRunning = details.get("info", {}).get("isRunning", False)
    if not stillRunning:
        if __checkIfRetryable(details):
            # Transient failure - rerun the notebook and keep polling.
            __rerunNotebook(notebookId, zeppelin)
            return True
        __setNotebookStatus(details, runStatus)
    # Caller compares the result against True, so return the raw flag.
    return stillRunning

def __rerunNotebook(notebookId):
Zeppelin.runNotebookJob(notebookId)
def __rerunNotebook(notebookId: str, zeppelin: ZeppelinAPI):
    # Fire-and-forget re-trigger of the notebook on its assigned zeppelin
    # server; the caller's polling loop picks up the new run.
    zeppelin.runNotebookJob(notebookId)

def __checkIfRetryable(response):
responseString = json.dumps(response)
Expand Down Expand Up @@ -142,4 +186,16 @@ def __setNotebookStatus(response, runStatus: RunStatus):
runStatus.status=NOTEBOOK_STATUS_SUCCESS if response else NOTEBOOK_STATUS_ERROR
runStatus.endTimestamp = dt.datetime.now()
runStatus.save()
NotificationServices.notify(notebookName=notebookName, isSuccess=True, message="Run successful")
NotificationServices.notify(notebookName=notebookName, isSuccess=True, message="Run successful")

def __evaluateScaleDownZeppelin():
    """
    Tears down zeppelin job-server pods that no longer have any RUNNING or
    QUEUED notebook runs assigned to them. Invoked after a notebook job
    completes so idle servers do not linger.
    """
    # Only pods created by __getOrCreateZeppelinServerId carry this prefix.
    zeppelinServerPods = [
        pod for pod in Kubernetes.getPods()
        if ZEPPELIN_JOB_SERVER_PREFIX in pod.metadata.name
    ]
    activeServers = __getZeppelinServerNotebookMap()
    for pod in zeppelinServerPods:
        if pod.metadata.name not in activeServers:
            logger.info(f"Removing zeppelin server: {pod.metadata.name}")
            Kubernetes.removeZeppelinServer(pod.metadata.name)
10 changes: 5 additions & 5 deletions api/seeddata/crontabschedule.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
"model": "django_celery_beat.crontabschedule",
"pk": 1,
"fields": {
"minute": "*",
"hour": "*",
"day_of_week": "*",
"day_of_month": "*/30",
"month_of_year": "*/2",
"minute": "0",
"hour": "0",
"day_of_week": "0",
"day_of_month": "30",
"month_of_year": "2",
"timezone": "Asia/Kolkata"
}
},
Expand Down
2 changes: 1 addition & 1 deletion api/start_server.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@ python manage.py loaddata seeddata/*.json
chmod -R 777 db
chown -R www-data:www-data db
(gunicorn app.wsgi --user www-data --bind 0.0.0.0:8000 --workers 3) &
(celery -A app worker --concurrency=6 -l INFO --purge) &
(celery -A app worker --concurrency=20 -l INFO --purge) &
(celery -A app beat -l INFO --scheduler django_celery_beat.schedulers:DatabaseScheduler) &
nginx -g "daemon off;"
Loading

0 comments on commit 36cade1

Please sign in to comment.