diff --git a/parsons/databases/postgres/postgres.py b/parsons/databases/postgres/postgres.py index f92fd14ada..87345b4143 100644 --- a/parsons/databases/postgres/postgres.py +++ b/parsons/databases/postgres/postgres.py @@ -3,6 +3,7 @@ from parsons.databases.alchemy import Alchemy from parsons.databases.database_connector import DatabaseConnector from parsons.etl.table import Table +from typing import Optional import logging import os @@ -102,4 +103,35 @@ def table(self, table_name): class PostgresTable(BaseTable): # Postgres table object. - pass + def max_value(self, column: str): + """Get the max value of this column from the table.""" + return self.db.query( + f""" + SELECT "{column}" + FROM {self.table} + ORDER BY "{column}" DESC + LIMIT 1 + """ + ).first + + def get_updated_rows( + self, + updated_at_column: str, + cutoff_value, + offset: int = 0, + chunk_size: Optional[int] = None, + ) -> Table: + """Get rows that have a greater updated_at_column value than the one provided.""" + sql = f""" + SELECT * + FROM {self.table} + WHERE "{updated_at_column}" > %s + """ + if chunk_size: + sql += f" LIMIT {chunk_size}" + + sql += f" OFFSET {offset}" + + result = self.db.query(sql, parameters=[cutoff_value]) + + return result