From d18b33f8d99c1a106ac4c0d019395060bd7dc5bb Mon Sep 17 00:00:00 2001 From: Kevin Netherton Date: Tue, 22 Oct 2024 14:23:40 -0700 Subject: [PATCH] fix: explicitly using a connection with to_sql call, and reporting on rows after data has been loaded --- data_prep/oradb_lib.py | 39 ++++++++++++++++++++++++--------------- 1 file changed, 24 insertions(+), 15 deletions(-) diff --git a/data_prep/oradb_lib.py b/data_prep/oradb_lib.py index cab78ca..893b406 100644 --- a/data_prep/oradb_lib.py +++ b/data_prep/oradb_lib.py @@ -285,22 +285,32 @@ def load_data( """ # debugging to view the data before it gets loaded pandas_df = pd.read_parquet(import_file) - tmp_file = self.get_tmp_file() - LOGGER.debug("tmp_file: %s", str(tmp_file)) - pandas_df.to_csv(str(tmp_file), sep="|", index_label=None) + self.get_connection() # make sure there is an oracle connection - LOGGER.debug("table: %s", table) + LOGGER.debug("loading data for table: %s", table) self.get_sqlalchemy_engine() if purge: self.truncate_table(table.lower()) - pandas_df.to_sql( - table.lower(), - self.sql_alchemy_engine, - schema="THE", - if_exists="append", - index=False, - ) + with self.sql_alchemy_engine.connect() as connection: + with connection.begin(): + pandas_df.to_sql( + table.lower(), + con=connection, + schema="THE", + if_exists="append", + index=False, + ) + # now verify data + sql = f"Select count(*) from {self.schema_2_sync}.{table}" + cur = self.connection.cursor() + cur.execute(sql) + result = cur.fetchall() + rows_loaded = result[0][0] + if not rows_loaded: + LOGGER.error("no rows loaded to table %s", table) + LOGGER.debug("rows loaded to table %s are: %s", table, rows_loaded) + cur.close() def load_data_retry( self, @@ -413,10 +423,9 @@ def purge_data( except ( # noqa: PERF203 sqlalchemy.exc.IntegrityError, DatabaseError, - ) as e: - LOGGER.exception( - "%s purging table %s", - e.__class__.__qualname__, + ): + LOGGER.warning( + "error encountered when attempting to purge table: %s, retrying", table, ) failed_tables.append(table)