Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add profiling log for livy sessions #51

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
add profiling log for livy sessions
  • Loading branch information
tovganesh committed Dec 5, 2022

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
commit 28f1dff1bafb4469e1eb111faa4602b6ffc4aed9
37 changes: 30 additions & 7 deletions dbt/adapters/spark_livy/livysession.py
Original file line number Diff line number Diff line change
@@ -86,6 +86,9 @@ def create_session(self, data):
except requests.exceptions.JSONDecodeError as json_err:
raise Exception("Json decode error to get session_id") from json_err

logger.debug(f"Obtained new livy session: {self.session_id}")
logger.debug(f"{self.session_id}: Polling to check if session is idle")

# Wait for started state
while True:
res = requests.get(
@@ -95,6 +98,8 @@ def create_session(self, data):
verify=self.verify_ssl_certificate
).json()

logger.debug(f"{self.session_id}: Session state is {res['state']}")

if res['state'] == 'idle':
break
if res['state'] == 'dead':
@@ -104,9 +109,10 @@ def create_session(self, data):
)
return

logger.debug(f"{self.session_id}: Sleeping {DEFAULT_POLL_WAIT} seconds before next poll")
time.sleep(DEFAULT_POLL_WAIT)

logger.debug(f"Creating new livy session: {self.session_id}")
logger.debug(f"Created new livy session: {self.session_id}")

return self.session_id

@@ -245,21 +251,24 @@ def _getLivySQL(self, sql):

return code

def _getLivyResult(self, res_obj):
json_res = res_obj.json()
def _getLivyResult(self, statement_id):
logger.debug(f"{self.session_id}: {statement_id}: Awaiting query results")

while True:
res = requests.get(
self.connect_url + '/sessions/' + self.session_id + '/statements/' + repr(json_res['id']),
self.connect_url + '/sessions/' + self.session_id + '/statements/' + statement_id,
headers=self.headers,
auth=self.auth,
verify=self.verify_ssl_certificate
).json()

# print(res)
logger.debug(f"{self.session_id}: {statement_id}: Query status: {res['state']}")

if res['state'] == 'available':
return res

logger.debug(f"{self.session_id}: {statement_id}: Sleeping {DEFAULT_POLL_WAIT} seconds to poll for status")
time.sleep(DEFAULT_POLL_WAIT)

def execute(self, sql: str, *parameters: Any) -> None:
@@ -287,8 +296,13 @@ def execute(self, sql: str, *parameters: Any) -> None:

# TODO: handle parameterised sql

res = self._getLivyResult(self._submitLivyCode(self._getLivySQL(sql)))

logger.debug(f"{self.session_id}: Submitting query: {sql}")

res_obj = self._submitLivyCode(self._getLivySQL(sql))
json_res = res_obj.json()
statement_id = repr(json_res['id'])
res = self._getLivyResult(statement_id)

if (res['output']['status'] == 'ok'):
# values = res['output']['data']['application/json']
values = res['output']['data']['application/json']
@@ -300,6 +314,8 @@ def execute(self, sql: str, *parameters: Any) -> None:
else:
self._rows = []
self._schema = []

logger.debug(f"{self.session_id}: {statement_id}: Query execution over")
else:
self._rows = None
self._schema = None
@@ -412,6 +428,7 @@ def __init__(self):
self.livy_global_session = None

def connect(self, connect_url, user, password, auth_type, session_params, verify_ssl_certificate):
logger.debug("Starting livy session")
if auth_type and auth_type.lower() == "kerberos":
logger.debug("Using Kerberos auth")
auth = HTTPKerberosAuth()
@@ -431,12 +448,17 @@ def connect(self, connect_url, user, password, auth_type, session_params, verify

if (self.livy_global_session == None):
self.livy_global_session = LivySession(connect_url, auth, headers, verify_ssl_certificate)
logger.debug("Creating new livy session")
self.livy_global_session.create_session(data)
logger.debug(f"New Livy session created: {self.livy_global_session.session_id}")
elif not self.livy_global_session.is_valid_session():
logger.debug(f"Deleting previous invalid session: {self.livy_global_session.session_id}")
self.livy_global_session.delete_session()
logger.debug("Creating new livy session")
self.livy_global_session.create_session(data)
logger.debug(f"New Livy session created: {self.livy_global_session.session_id}")
else:
logger.debug(f"Reusing session: {self.livy_global_session.session_id}")
logger.debug(f"Reusing Livy session: {self.livy_global_session.session_id}")

livyConnection = LivyConnection(
connect_url,
@@ -446,6 +468,7 @@ def connect(self, connect_url, user, password, auth_type, session_params, verify
session_params,
verify_ssl_certificate
)
logger.debug(f"{self.livy_global_session.session_id}: Livy session connected")

return livyConnection