diff --git a/parsons/databases/postgres/postgres.py b/parsons/databases/postgres/postgres.py index fc1c8c1642..18c2a61135 100644 --- a/parsons/databases/postgres/postgres.py +++ b/parsons/databases/postgres/postgres.py @@ -3,6 +3,7 @@ from parsons.databases.alchemy import Alchemy from parsons.databases.database_connector import DatabaseConnector from parsons.etl.table import Table +from typing import Optional import logging import os @@ -87,7 +88,7 @@ def copy( self.query_with_connection(sql, connection, commit=False) logger.info(f"{table_name} created.") - sql = f"COPY {table_name} FROM STDIN CSV HEADER;" + sql = f"""COPY "{table_name}" ("{'","'.join(tbl.columns)}") FROM STDIN CSV HEADER;""" with self.cursor(connection) as cursor: cursor.copy_expert(sql, open(tbl.to_csv(), "r")) @@ -102,4 +103,40 @@ def table(self, table_name): class PostgresTable(BaseTable): # Postgres table object. - pass + def max_value(self, column: str): + """Get the max value of this column from the table.""" + return self.db.query( + f""" + SELECT "{column}" + FROM {self.table} + ORDER BY "{column}" DESC + LIMIT 1 + """ + ).first + + def get_updated_rows( + self, + updated_at_column: str, + cutoff_value, + offset: int = 0, + chunk_size: Optional[int] = None, + ) -> Table: + """Get rows that have a greater updated_at_column value than the one provided.""" + sql = f""" + SELECT * + FROM {self.table} + """ + parameters = [] + + if cutoff_value is not None: + sql += f'WHERE "{updated_at_column}" > %s' + parameters.append(cutoff_value) + + if chunk_size: + sql += f" LIMIT {chunk_size}" + + sql += f" OFFSET {offset}" + + result = self.db.query(sql, parameters=parameters) + + return result diff --git a/parsons/databases/postgres/postgres_core.py b/parsons/databases/postgres/postgres_core.py index 080b35a4a9..b19c69890f 100644 --- a/parsons/databases/postgres/postgres_core.py +++ b/parsons/databases/postgres/postgres_core.py @@ -226,9 +226,9 @@ def table_exists_with_connection(self, table_name, connection, view=True): # Extract the table and schema from this. If no schema is detected then # will default to the public schema. try: - schema, table = table_name.lower().split(".", 1) + schema, table = table_name.split(".", 1) except ValueError: - schema, table = "public", table_name.lower() + schema, table = "public", table_name with self.cursor(connection) as cursor: # Check in pg tables for the table