ruff
nlebovits committed Dec 5, 2024
1 parent 0b6f407 commit 6496cf4
Showing 3 changed files with 55 additions and 42 deletions.
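Taken together, the changes below look like standard Ruff autofixes (unused-import removal, extraneous f-string prefixes) plus Ruff's formatter rewrapping long lines. A minimal sketch of how such a pass might be run from Python follows; the "data/src" path and the choice to drive Ruff via subprocess are assumptions, not taken from this repository.

# Hypothetical reproduction of a lint-and-format pass like this commit's;
# the "data/src" path is an assumption about where the pipeline code lives.
import subprocess

# "ruff check --fix" applies autofixes such as F401 (unused import) and
# F541 (f-string without placeholders); "ruff format" rewraps long lines.
subprocess.run(["ruff", "check", "--fix", "data/src"], check=True)
subprocess.run(["ruff", "format", "data/src"], check=True)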
15 changes: 8 additions & 7 deletions data/src/main.py
@@ -3,7 +3,6 @@
from config.psql import conn
from sqlalchemy import text
import traceback
from sqlalchemy.types import DateTime

from new_etl.classes.slack_pg_reporter import send_pg_stats_to_slack

@@ -143,7 +142,7 @@
conn,
if_exists="replace", # Replace the table if it already exists
)

# Ensure the `create_date` column exists
conn.execute(
text("""
@@ -159,7 +158,7 @@
END $$;
""")
)

# Convert the table to a hypertable
try:
conn.execute(
@@ -173,7 +172,7 @@
print("Table is already a hypertable.")
else:
raise

# Set chunk interval to 1 month
try:
conn.execute(
@@ -189,14 +188,14 @@
# Enable compression on the hypertable
try:
conn.execute(
text(f"""
text("""
ALTER TABLE vacant_properties_end SET (
timescaledb.compress,
timescaledb.compress_segmentby = 'opa_id'
);
""")
)
print(f"Compression enabled on table vacant_properties_end.")
print("Compression enabled on table vacant_properties_end.")
except Exception as e:
print(f"Error enabling compression on table vacant_properties_end: {e}")

@@ -214,7 +213,9 @@

# Commit the transaction
conn.commit()
print("Data successfully saved and table prepared with partitioning and compression.")
print(
"Data successfully saved and table prepared with partitioning and compression."
)

except Exception as e:
print(f"Error during the table operation: {e}")
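For reference, the TimescaleDB calls touched in this file follow the usual setup order: convert the table to a hypertable, set its chunk interval, then enable compression. A condensed, hypothetical sketch of that sequence with SQLAlchemy follows; the connection URL is a placeholder, while the table name, column, and intervals mirror the diff above.

# Hypothetical sketch of the TimescaleDB setup sequence seen in main.py;
# the connection URL is a placeholder, not the project's configuration.
from sqlalchemy import create_engine, text

engine = create_engine("postgresql://user:pass@localhost:5432/vacantlots")

with engine.connect() as conn:
    # Partition the table on its timestamp column, keeping existing rows.
    conn.execute(text(
        "SELECT create_hypertable('vacant_properties_end', 'create_date', "
        "migrate_data => true);"
    ))
    # Store one chunk per month of data.
    conn.execute(text(
        "SELECT set_chunk_time_interval('vacant_properties_end', INTERVAL '1 month');"
    ))
    # Enable native compression, segmenting compressed chunks by parcel id.
    conn.execute(text(
        "ALTER TABLE vacant_properties_end SET ("
        "timescaledb.compress, timescaledb.compress_segmentby = 'opa_id');"
    ))
    conn.commit()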
59 changes: 33 additions & 26 deletions data/src/new_etl/classes/featurelayer.py
@@ -19,12 +19,7 @@
from google.cloud import storage
from google.cloud.storage.bucket import Bucket
from shapely import wkb
import pandas as pd
import geopandas as gpd
from sqlalchemy.sql import text
from sqlalchemy.types import DateTime
import traceback
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm

@@ -122,25 +117,25 @@ def check_psql(self):

def load_data(self):
log.info(f"Loading data for {self.name} from {self.type}...")

if self.type == "gdf":
return # Skip processing for gdf type

try:
if self.type == "esri":
if not self.esri_rest_urls:
raise ValueError("Must provide a URL to load data from Esri")
gdfs = []
for url in self.esri_rest_urls:
print(f"Processing URL: {url}")

# Use EsriDumper to get features
dumper = EsriDumper(url)
features = [feature for feature in dumper]
if not features:
log.error(f"No features returned for URL: {url}")
continue

geojson_features = {
"type": "FeatureCollection",
"features": features,
@@ -149,36 +144,38 @@ def load_data(self):
geojson_features, crs=self.input_crs
)
gdf = gdf.to_crs(self.crs)

# Add parcel_type based on the URL
if "Vacant_Indicators_Land" in url:
gdf["parcel_type"] = "Land"
elif "Vacant_Indicators_Bldg" in url:
gdf["parcel_type"] = "Building"

gdfs.append(gdf)

# Concatenate all dataframes
self.gdf = pd.concat(gdfs, ignore_index=True)

elif self.type == "carto":
self._load_carto_data()

# Convert all column names to lowercase
if not self.gdf.empty:
self.gdf.columns = [col.lower() for col in self.gdf.columns]

# Drop columns not in self.cols, if specified
if self.cols:
self.cols = [col.lower() for col in self.cols]
self.cols.append("geometry")
self.gdf = self.gdf[[col for col in self.cols if col in self.gdf.columns]]

self.gdf = self.gdf[
[col for col in self.cols if col in self.gdf.columns]
]

# Save GeoDataFrame to PostgreSQL
self.gdf.to_postgis(
name=self.psql_table,
con=conn,
if_exists="replace", # Replace the table if it already exists
if_exists="replace", # Replace the table if it already exists
chunksize=1000,
)

@@ -206,7 +203,9 @@ def load_data(self):
SELECT create_hypertable('{self.psql_table}', 'create_date', migrate_data => true);
""")
)
print(f"Table {self.psql_table} successfully converted to a hypertable.")
print(
f"Table {self.psql_table} successfully converted to a hypertable."
)
except Exception as e:
if "already a hypertable" in str(e):
print(f"Table {self.psql_table} is already a hypertable.")
@@ -220,11 +219,15 @@
SELECT set_chunk_time_interval('{self.psql_table}', INTERVAL '1 month');
""")
)
print(f"Chunk time interval set to 1 month for table {self.psql_table}.")
print(
f"Chunk time interval set to 1 month for table {self.psql_table}."
)
except Exception as e:
print(f"Error setting chunk interval for table {self.psql_table}: {e}")
print(
f"Error setting chunk interval for table {self.psql_table}: {e}"
)

# Enable compression on the hypertable
# Enable compression on the hypertable
try:
conn.execute(
text(f"""
@@ -236,25 +239,29 @@ def load_data(self):
print(f"Compression enabled on table {self.psql_table}.")
except Exception as e:
print(f"Error enabling compression on table {self.psql_table}: {e}")

# Add compression policy for chunks older than 3 months
try:
conn.execute(
text(f"""
SELECT add_compression_policy('{self.psql_table}', INTERVAL '3 months');
""")
)
print(f"Compression policy added for chunks older than 3 months on table {self.psql_table}.")
print(
f"Compression policy added for chunks older than 3 months on table {self.psql_table}."
)
except Exception as e:
print(f"Error adding compression policy for table {self.psql_table}: {e}")
print(
f"Error adding compression policy for table {self.psql_table}: {e}"
)

# Commit the transaction
conn.commit()

except Exception as e:
log.error(f"Error loading data for {self.name}: {e}")
traceback.print_exc()
conn.rollback() # Rollback the transaction in case of failure
conn.rollback() # Rollback the transaction in case of failure
self.gdf = gpd.GeoDataFrame()

def _load_carto_data(self):
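The load path touched in this file pulls features from an Esri REST endpoint with esridump, builds a GeoDataFrame, and writes it to PostGIS before the hypertable setup shown above. A minimal, hypothetical sketch of that pattern follows; the endpoint URL, CRS values, table name, and connection URL are placeholders rather than the project's actual configuration.

# Hypothetical sketch of the Esri -> GeoDataFrame -> PostGIS path in
# FeatureLayer.load_data(); URL, CRS values, and table name are placeholders.
import geopandas as gpd
from esridump.dumper import EsriDumper
from sqlalchemy import create_engine

url = "https://example.com/arcgis/rest/services/Vacant_Indicators_Land/FeatureServer/0"
features = list(EsriDumper(url))  # EsriDumper yields GeoJSON feature dicts

gdf = gpd.GeoDataFrame.from_features(
    {"type": "FeatureCollection", "features": features},
    crs="EPSG:4326",            # assumed input CRS
).to_crs("EPSG:2272")           # assumed target CRS

gdf.columns = [col.lower() for col in gdf.columns]  # normalize column names

engine = create_engine("postgresql://user:pass@localhost:5432/vacantlots")
gdf.to_postgis("vacant_properties", engine, if_exists="replace", chunksize=1000)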
23 changes: 14 additions & 9 deletions data/src/new_etl/classes/slack_pg_reporter.py
@@ -2,6 +2,7 @@
import os
from slack_sdk import WebClient


def send_pg_stats_to_slack(conn):
"""
Report total sizes for all hypertables using hypertable_detailed_size
@@ -22,22 +23,26 @@ def send_pg_stats_to_slack(conn):
size_result = conn.execute(text(size_query))
for row in size_result:
# Append the total size (row[3] = total_bytes)
detailed_sizes.append({
'hypertable': hypertable,
'total_bytes': row[3],
})
detailed_sizes.append(
{
"hypertable": hypertable,
"total_bytes": row[3],
}
)

# Step 3: Format the message for Slack
message = "*Hypertable Total Sizes:*\n"
for size in detailed_sizes:
total_bytes = size['total_bytes']
total_bytes = size["total_bytes"]
total_size = (
f"{total_bytes / 1073741824:.2f} GB" if total_bytes >= 1073741824 else
f"{total_bytes / 1048576:.2f} MB" if total_bytes >= 1048576 else
f"{total_bytes / 1024:.2f} KB"
f"{total_bytes / 1073741824:.2f} GB"
if total_bytes >= 1073741824
else f"{total_bytes / 1048576:.2f} MB"
if total_bytes >= 1048576
else f"{total_bytes / 1024:.2f} KB"
)
message += f"- {size['hypertable']}: {total_size}\n"

# Step 4: Send to Slack
token = os.getenv("CAGP_SLACK_API_TOKEN")
if token:
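Most of this file's diff is Ruff reformatting the nested size-formatting conditional. A small, hypothetical helper expresses the same GB/MB/KB thresholds more plainly, with posting via slack_sdk's WebClient; the channel name and sample byte count are assumptions, while the token variable comes from the file above.

# Hypothetical equivalent of the size formatting and posting done in
# send_pg_stats_to_slack(); the channel name and sample value are assumptions.
import os
from slack_sdk import WebClient


def format_bytes(total_bytes: int) -> str:
    # Mirror the GB/MB/KB thresholds used in the reformatted conditional.
    if total_bytes >= 1_073_741_824:
        return f"{total_bytes / 1_073_741_824:.2f} GB"
    if total_bytes >= 1_048_576:
        return f"{total_bytes / 1_048_576:.2f} MB"
    return f"{total_bytes / 1024:.2f} KB"


token = os.getenv("CAGP_SLACK_API_TOKEN")
if token:
    client = WebClient(token=token)
    client.chat_postMessage(
        channel="#pipeline-reports",  # assumed channel name
        text=f"- vacant_properties_end: {format_bytes(2_500_000_000)}",
    )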
