Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port Northern Ireland #98

Merged
merged 42 commits into from
Jun 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
57d0957
Initial set-up for port of Northern Ireland to dagster
sgreenbury May 15, 2024
1524f30
Merge remote-tracking branch 'origin/belgium_spec_new' into ni-tmp
sgreenbury May 16, 2024
649ed1e
Update NI port
sgreenbury May 16, 2024
bf3ea60
Adding transformations to match updated Belgium
sgreenbury May 16, 2024
1992bc8
Merge branch 'main' into 36-port-ni-update
sgreenbury May 17, 2024
eb883ba
Complete and revise DAG for NI, metadata for source table
sgreenbury May 20, 2024
6374fb4
Add remaining required assets, update cloud outputs
sgreenbury May 20, 2024
5ad8d47
Fix metrics asset
sgreenbury May 22, 2024
145166d
Add super data zones
sgreenbury May 22, 2024
7927b16
Refactor, fix source data release
sgreenbury May 22, 2024
017ea20
Fix asset job definition
sgreenbury May 22, 2024
e400dce
Initial implementation using a country base class
sgreenbury May 22, 2024
83668b9
Add README for NI
sgreenbury May 23, 2024
69f74d9
Add country outputs class
sgreenbury May 23, 2024
852c8f8
Add dep
sgreenbury May 23, 2024
b489dfb
Begin update to cover all census tables
sgreenbury May 23, 2024
0d42d27
Add processing for pivoting arbitrary census tables
sgreenbury May 24, 2024
ad64406
Add new metrics asset across all partitions
sgreenbury May 28, 2024
1ef97ea
Change aggfunc to sum to prevent cast as float
sgreenbury May 28, 2024
3661f68
Fix hxltag construction
sgreenbury May 28, 2024
ee18520
Merge branch 'main' into 36-port-ni
sgreenbury May 28, 2024
f069cea
Move metrics into abc
sgreenbury May 28, 2024
01836c0
Refactor to remove obsolete geo levels dict
sgreenbury May 29, 2024
6c33d26
Remove unused reshape metrics method
sgreenbury May 29, 2024
26bf2be
Remove source table from metric specification
sgreenbury May 29, 2024
1428420
Update README
sgreenbury May 29, 2024
ba5d001
Rename module, add doc strings for country class
sgreenbury May 29, 2024
49bb759
Add openpyxl dep for reading Excel files
sgreenbury Jun 6, 2024
2462c09
Fix job description
sgreenbury Jun 6, 2024
01f1ecb
Add key_prefix to init method
sgreenbury Jun 6, 2024
2387f75
Add methods for adding an removing partition keys
sgreenbury Jun 6, 2024
49948df
Remove obsolete attributes
sgreenbury Jun 6, 2024
eab49b6
Remove update to python version in notebook
sgreenbury Jun 6, 2024
adf5456
Complete comment
sgreenbury Jun 6, 2024
b801e0e
Remove all partitions during materialization
sgreenbury Jun 6, 2024
4667efd
Use country method to add partitions
sgreenbury Jun 6, 2024
0ca5bb4
Add key_prefix to init
sgreenbury Jun 6, 2024
f2fbe85
Update key_prefix type and add assignment in init
sgreenbury Jun 6, 2024
baaecbb
Fix condition for partition keys included in metadata construction
sgreenbury Jun 7, 2024
56fff8a
Revert key_prefix to ClassVar over arg of init
sgreenbury Jun 6, 2024
a58a086
Add jobs list towards simplifying popgetter init
sgreenbury Jun 6, 2024
10c2126
Fix ruff lint
sgreenbury Jun 7, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ dependencies = [
"icecream >=2.1.3", # General debugging tool
"python-slugify >=8.0.4", # Required for generating asset names from GBR Ordnance Survey OpenData Product names
"jcs >=0.2.1", # For generating IDs from class attributes
"beautifulsoup4 >=4.12.3", # For extracting catalogs from web pages
sgreenbury marked this conversation as resolved.
Show resolved Hide resolved
"openpyxl >=3.1.3", # For reading Excel files
]


Expand Down
44 changes: 25 additions & 19 deletions python/popgetter/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
*load_assets_from_package_module(assets.us, group_name="us"),
*load_assets_from_package_module(assets.be, group_name="be"),
*load_assets_from_package_module(assets.uk, group_name="uk"),
*load_assets_from_package_module(assets.ni, group_name="ni"),
*load_assets_from_package_module(cloud_outputs, group_name="cloud_outputs"),
*(
load_assets_from_modules([azure_test], group_name="azure_test")
Expand All @@ -62,24 +63,29 @@
),
]

job_be: UnresolvedAssetJobDefinition = define_asset_job(
name="job_be",
selection=AssetSelection.groups("be"),
description="Downloads Belgian data.",
partitions_def=assets.be.census_tables.dataset_node_partition,
)

job_us: UnresolvedAssetJobDefinition = define_asset_job(
name="job_us",
selection=AssetSelection.groups("us"),
description="Downloads USA data.",
)

job_uk: UnresolvedAssetJobDefinition = define_asset_job(
name="job_uk",
selection=AssetSelection.groups("uk"),
description="Downloads UK data.",
)
jobs: list[UnresolvedAssetJobDefinition] = [
define_asset_job(
name="job_be",
selection=AssetSelection.groups("be"),
description="Downloads Belgian data.",
partitions_def=assets.be.census_tables.dataset_node_partition,
),
define_asset_job(
name="job_us",
selection=AssetSelection.groups("us"),
description="Downloads USA data.",
),
define_asset_job(
name="job_uk",
selection=AssetSelection.groups("uk"),
description="Downloads UK data.",
),
define_asset_job(
name="job_ni",
selection=AssetSelection.groups("ni"),
description="Downloads Northern Ireland data.",
),
]


def resources_by_env():
Expand Down Expand Up @@ -120,5 +126,5 @@ def resources_by_env():
cloud_outputs.metrics_sensor,
],
resources=resources,
jobs=[job_be, job_us, job_uk],
jobs=jobs,
)
2 changes: 1 addition & 1 deletion python/popgetter/assets/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from __future__ import annotations

from . import be, uk, us # noqa: F401
from . import be, ni, uk, us # noqa: F401
239 changes: 239 additions & 0 deletions python/popgetter/assets/country.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import ClassVar

import geopandas as gpd
import pandas as pd
from dagster import AssetDep, DynamicPartitionsDefinition, asset

from popgetter.cloud_outputs import (
send_to_geometry_sensor,
send_to_metadata_sensor,
send_to_metrics_sensor,
)
from popgetter.metadata import (
CountryMetadata,
DataPublisher,
GeometryMetadata,
MetricMetadata,
SourceDataRelease,
)


class Country(ABC):
"""
A general class that can be implemented for a given country providing asset
factories and abstract methods to provide a template for a given country.

Attributes:
key_prefix (str): the prefix for the asset keys (e.g. "be" for Belgium)
dataset_node_partition (DynamicPartitionsDefinition): a dynamic partitions
definition populated at runtime with a partition per census table.

"""

key_prefix: ClassVar[str]
partition_name: str
dataset_node_partition: DynamicPartitionsDefinition

sgreenbury marked this conversation as resolved.
Show resolved Hide resolved
def __init__(self):
self.partition_name = f"{self.key_prefix}_nodes"
self.dataset_node_partition = DynamicPartitionsDefinition(
name=self.partition_name
)

def add_partition_keys(self, context, keys: list[str]):
context.instance.add_dynamic_partitions(
partitions_def_name=self.partition_name,
partition_keys=keys,
)

def remove_all_partition_keys(self, context):
for partition_key in context.instance.get_dynamic_partitions(
self.partition_name
):
context.instance.delete_dynamic_partition(
self.partition_name, partition_key
)

def create_catalog(self):
"""Creates an asset providing a census metedata catalog."""

@asset(key_prefix=self.key_prefix)
def catalog(context) -> pd.DataFrame:
return self._catalog(context)

return catalog

@abstractmethod
def _catalog(self, context) -> pd.DataFrame:
...

def create_country_metadata(self):
"""Creates an asset providing the country metadata."""

@send_to_metadata_sensor
@asset(key_prefix=self.key_prefix)
def country_metadata(context):
return self._country_metadata(context)

return country_metadata

@abstractmethod
def _country_metadata(self, context) -> CountryMetadata:
...

def create_data_publisher(self):
"""Creates an asset providing the data publisher metadata."""

@send_to_metadata_sensor
@asset(key_prefix=self.key_prefix)
def data_publisher(context, country_metadata: CountryMetadata):
return self._data_publisher(context, country_metadata)

return data_publisher

@abstractmethod
def _data_publisher(
self, context, country_metdata: CountryMetadata
) -> DataPublisher:
...

def create_geometry(self):
"""
Creates an asset providing a list of geometries, metadata and names
at different resolutions.
"""

@send_to_geometry_sensor
@asset(key_prefix=self.key_prefix)
def geometry(context):
return self._geometry(context)

return geometry

@abstractmethod
def _geometry(
self, context
) -> list[tuple[GeometryMetadata, gpd.GeoDataFrame, pd.DataFrame]]:
...

def create_source_data_releases(self):
"""
Creates an asset providing the corresponding source data release metadata for
each geometry.
"""

@send_to_metadata_sensor
@asset(key_prefix=self.key_prefix)
def source_data_releases(
context,
geometry: list[tuple[GeometryMetadata, gpd.GeoDataFrame, pd.DataFrame]],
data_publisher: DataPublisher,
) -> dict[str, SourceDataRelease]:
return self._source_data_releases(context, geometry, data_publisher)

return source_data_releases

@abstractmethod
def _source_data_releases(
self,
context,
geometry: list[tuple[GeometryMetadata, gpd.GeoDataFrame, pd.DataFrame]],
data_publisher: DataPublisher,
# TODO: consider version without inputs so only output type specified
# **kwargs,
) -> dict[str, SourceDataRelease]:
...

def create_census_tables(self):
"""
Creates an asset providing each census table as a dataframe for each
partition.
"""

@asset(partitions_def=self.dataset_node_partition, key_prefix=self.key_prefix)
def census_tables(context, catalog: pd.DataFrame) -> pd.DataFrame:
return self._census_tables(context, catalog)

return census_tables

@abstractmethod
def _census_tables(self, context, catalog: pd.DataFrame) -> pd.DataFrame:
...

def create_source_metric_metadata(self):
"""
Creates an asset providing the metadata required for downstream metric
derivation.
"""

@asset(partitions_def=self.dataset_node_partition, key_prefix=self.key_prefix)
def source_metric_metadata(
context, catalog, source_data_releases: dict[str, SourceDataRelease]
) -> MetricMetadata:
return self._source_metric_metadata(context, catalog, source_data_releases)

return source_metric_metadata

@abstractmethod
def _source_metric_metadata(
self,
context,
catalog: pd.DataFrame,
source_data_releases: dict[str, SourceDataRelease],
) -> MetricMetadata:
...

def create_derived_metrics(self):
"""
Creates an asset providing the metrics derived from the census tables and the
corresponding source metric metadata.
"""

@asset(partitions_def=self.dataset_node_partition, key_prefix=self.key_prefix)
def derived_metrics(
context,
census_tables: pd.DataFrame,
source_metric_metadata: MetricMetadata,
) -> tuple[list[MetricMetadata], pd.DataFrame]:
return self._derived_metrics(context, census_tables, source_metric_metadata)

return derived_metrics

@abstractmethod
def _derived_metrics(
self,
context,
census_tables: pd.DataFrame,
source_metric_metadata: MetricMetadata,
) -> tuple[list[MetricMetadata], pd.DataFrame]:
...

def create_metrics(self):
"""
Creates an asset combining all partitions across census tables into a combined
list of metric data file names (for output), list of metadata and metric
dataframe.
"""

@send_to_metrics_sensor
# Note: does not seem possible to specify a StaticPartition derived from a DynamicPartition:
# See: https://discuss.dagster.io/t/16717119/i-want-to-be-able-to-populate-a-dagster-staticpartitionsdefi
sgreenbury marked this conversation as resolved.
Show resolved Hide resolved
@asset(deps=[AssetDep("derived_metrics")], key_prefix=self.key_prefix)
def metrics(
context,
catalog: pd.DataFrame,
) -> list[tuple[str, list[MetricMetadata], pd.DataFrame]]:
return self._metrics(context, catalog)

return metrics

@abstractmethod
def _metrics(
self,
context,
catalog: pd.DataFrame,
) -> list[tuple[str, list[MetricMetadata], pd.DataFrame]]:
...
22 changes: 22 additions & 0 deletions python/popgetter/assets/ni/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Northern Ireland

## Summary

Census 2021 is available from
[https://build.nisra.gov.uk](https://build.nisra.gov.uk/en/).

The processing pipeline involves the following steps, achieved by implementing
the [`Country`](../country.py) base class:

- Retrieve the geography data and outputs with standard geometry formats
(`geometry` asset)
- Generate metadata associated with country, data publisher and source data
releases (`country_metadata`, `data_publisher` and `source_data_releases`
assets)
- Generate a catalog by identifying all tables
[available](https://build.nisra.gov.uk/en/standard) (`catalog` asset)
- Read table metadata and census tables, across different geography levels,
currently for Data Zone 2021, Super Data Zone 2021 and Local Government
District 2014 (`census_tables` and `source_metric_metadata` assets)
- Process census tables into metrics per geography ID and any other pre-defined
derived metrics (`metrics` asset)
Loading
Loading