diff --git a/dbt/adapters/spark_livy/livysession.py b/dbt/adapters/spark_livy/livysession.py index 7d10532b7..295a8cc45 100644 --- a/dbt/adapters/spark_livy/livysession.py +++ b/dbt/adapters/spark_livy/livysession.py @@ -86,6 +86,9 @@ def create_session(self, data): except requests.exceptions.JSONDecodeError as json_err: raise Exception("Json decode error to get session_id") from json_err + logger.debug(f"Obtained new livy session: {self.session_id}") + logger.debug(f"{self.session_id}: Polling to check if session is idle") + # Wait for started state while True: res = requests.get( @@ -95,6 +98,8 @@ def create_session(self, data): verify=self.verify_ssl_certificate ).json() + logger.debug(f"{self.session_id}: Session state is {res['state']}") + if res['state'] == 'idle': break if res['state'] == 'dead': @@ -104,9 +109,10 @@ def create_session(self, data): ) return + logger.debug(f"{self.session_id}: Sleeping {DEFAULT_POLL_WAIT} seconds before next poll") time.sleep(DEFAULT_POLL_WAIT) - logger.debug(f"Creating new livy session: {self.session_id}") + logger.debug(f"Created new livy session: {self.session_id}") return self.session_id @@ -245,21 +251,24 @@ def _getLivySQL(self, sql): return code - def _getLivyResult(self, res_obj): - json_res = res_obj.json() + def _getLivyResult(self, statement_id): + logger.debug(f"{self.session_id}: {statement_id}: Awaiting query results") while True: res = requests.get( - self.connect_url + '/sessions/' + self.session_id + '/statements/' + repr(json_res['id']), + self.connect_url + '/sessions/' + self.session_id + '/statements/' + statement_id, headers=self.headers, auth=self.auth, verify=self.verify_ssl_certificate ).json() # print(res) + logger.debug(f"{self.session_id}: {statement_id}: Query status: {res['state']}") if res['state'] == 'available': return res + + logger.debug(f"{self.session_id}: {statement_id}: Sleeping {DEFAULT_POLL_WAIT} seconds to poll for status") time.sleep(DEFAULT_POLL_WAIT) def execute(self, sql: str, *parameters: Any) -> None: @@ -287,8 +296,13 @@ def execute(self, sql: str, *parameters: Any) -> None: # TODO: handle parameterised sql - res = self._getLivyResult(self._submitLivyCode(self._getLivySQL(sql))) - + logger.debug(f"{self.session_id}: Submitting query: {sql}") + + res_obj = self._submitLivyCode(self._getLivySQL(sql)) + json_res = res_obj.json() + statement_id = repr(json_res['id']) + res = self._getLivyResult(statement_id) + if (res['output']['status'] == 'ok'): # values = res['output']['data']['application/json'] values = res['output']['data']['application/json'] @@ -300,6 +314,8 @@ def execute(self, sql: str, *parameters: Any) -> None: else: self._rows = [] self._schema = [] + + logger.debug(f"{self.session_id}: {statement_id}: Query execution over") else: self._rows = None self._schema = None @@ -412,6 +428,7 @@ def __init__(self): self.livy_global_session = None def connect(self, connect_url, user, password, auth_type, session_params, verify_ssl_certificate): + logger.debug("Starting livy session") if auth_type and auth_type.lower() == "kerberos": logger.debug("Using Kerberos auth") auth = HTTPKerberosAuth() @@ -431,12 +448,17 @@ def connect(self, connect_url, user, password, auth_type, session_params, verify if (self.livy_global_session == None): self.livy_global_session = LivySession(connect_url, auth, headers, verify_ssl_certificate) + logger.debug("Creating new livy session") self.livy_global_session.create_session(data) + logger.debug(f"New Livy session created: {self.livy_global_session.session_id}") elif not self.livy_global_session.is_valid_session(): + logger.debug(f"Deleting previous invalid session: {self.livy_global_session.session_id}") self.livy_global_session.delete_session() + logger.debug("Creating new livy session") self.livy_global_session.create_session(data) + logger.debug(f"New Livy session created: {self.livy_global_session.session_id}") else: - logger.debug(f"Reusing session: {self.livy_global_session.session_id}") + logger.debug(f"Reusing Livy session: {self.livy_global_session.session_id}") livyConnection = LivyConnection( connect_url, @@ -446,6 +468,7 @@ def connect(self, connect_url, user, password, auth_type, session_params, verify session_params, verify_ssl_certificate ) + logger.debug(f"{self.livy_global_session.session_id}: Livy session connected") return livyConnection