Improvements to publishing pipeline #108

Merged · 14 commits · Jun 26, 2024
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
@@ -69,6 +69,8 @@ jobs:
         run: >-
           python -m pytest -ra --cov --cov-report=xml --cov-report=term
           --durations=20
+        env:
+          DAGSTER_HOME: ${{ runner.temp }}
 
       - name: Upload coverage report
         uses: codecov/codecov-action@v4.5.0
162 changes: 162 additions & 0 deletions docs/new_country.md
@@ -0,0 +1,162 @@
# Defining a new country

## Overview

In general, each self-contained set of census data will correspond to a single
territory, which we loosely refer to as a 'country'. Each country in popgetter
will have a group of assets that it publishes. To add a new country to
popgetter, you will need to:

- Create a new subdirectory `python/popgetter/assets/{COUNTRY_ID}`.
  `{COUNTRY_ID}` here is a lowercase, unique identifier for the country. (In
  principle, we would like this to correspond to the
  `popgetter.metadata.CountryMetadata.id` computed field: for an actual
  country, this would be its ISO 3166-1 alpha-3 code (e.g. 'bel' for Belgium),
  and for a subdivision of a country, its ISO 3166-2 code. This is not yet the
  case.)

- Inside `python/popgetter/assets/__init__.py`, import the new country
  subdirectory as a module and add this module to the list of `countries`.
  This allows Dagster to detect the assets belonging to the new country (see
  the sketch after this list).

- Define the requisite assets inside the country subdirectory, as described
  below. Note that you can structure the code inside the country subdirectory
  however you like (e.g. across multiple files), as Dagster will load all the
  assets in that subdirectory.
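
For instance, the registration step might look like the following (a minimal
sketch: the existing module names and the exact contents of the `countries`
list in the real file will differ):

```python
# python/popgetter/assets/__init__.py (sketch)
from . import bel  # an existing country module (illustrative)
from . import mycountry  # the new country subdirectory

# Dagster discovers the assets belonging to each module in this list.
countries = [bel, mycountry]
```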

## Required assets

There are, fundamentally, five assets which must be defined for each country.
Their return types are fixed, but they can have any input types (i.e. you can
construct the asset graph in any way you like).

Three of these are metadata assets:

- **Country metadata:** an asset which returns
`popgetter.metadata.CountryMetadata` object(s).
- **Data publisher metadata:** an asset which returns
`popgetter.metadata.DataPublisher` object(s).
- **Source release metadata:** an asset which returns
`popgetter.metadata.SourceDataRelease` object(s).

These metadata assets can return either a single object, a list of objects, or a
dictionary where the values are objects. This flexibility makes it easier to
construct dependencies between assets in Dagster depending on your needs.
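
For example, all of the following signatures are acceptable for metadata
assets (a sketch: the function names are illustrative, and the `@asset` and
sensor decorators described later are omitted):

```python
from popgetter.metadata import CountryMetadata, DataPublisher, SourceDataRelease


def country_metadata() -> CountryMetadata: ...  # a single object


def publisher_metadata() -> list[DataPublisher]: ...  # a list of objects


def source_data_releases() -> dict[str, SourceDataRelease]: ...  # a dict of objects
```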

- **Geometries:** an asset which returns a list of
  `popgetter.cloud_outputs.GeometryOutput` objects.

A `GeometryOutput` is essentially a named tuple of
`popgetter.metadata.GeometryMetadata` (which provides metadata about the
geometry), a `geopandas.GeoDataFrame` object (which contains geoIDs and the
geometries themselves), and a `pandas.DataFrame` object (which contains geoIDs
and the names of the regions).

Note that the GeoDataFrame must _only_ contain the geometries and the geoIDs,
and the DataFrame must _only_ contain the geoIDs and the names of the regions.
Additionally, the geoID column in both of these must be named `GEO_ID`, and the
remaining column names in the DataFrame must be
[lowercase ISO 639-3 codes](https://iso639-3.sil.org/code_tables/639/data)
identifying the language of the names.
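
A minimal sketch of constructing such an output (metadata fields are elided in
the `(...)` style used elsewhere in this document; the column contents are
illustrative):

```python
import geopandas as gpd
import pandas as pd

from popgetter.cloud_outputs import GeometryOutput
from popgetter.metadata import GeometryMetadata

geometry_metadata = GeometryMetadata(...)

# The GeoDataFrame contains only the geoIDs and the geometries...
gdf = gpd.GeoDataFrame({"GEO_ID": ["001", "002"], "geometry": [...]})

# ...and the DataFrame contains only the geoIDs and the region names, with one
# column per language, named by its lowercase ISO 639-3 code.
names_df = pd.DataFrame({"GEO_ID": ["001", "002"], "eng": ["North", "South"]})

geometry_output = GeometryOutput(metadata=geometry_metadata, gdf=gdf, names_df=names_df)
```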

- **Metrics:** an asset which returns a list of
  `popgetter.cloud_outputs.MetricsOutput` objects.

A `MetricsOutput` in turn comprises a list of
`popgetter.metadata.MetricMetadata` objects (which provide metadata about the
metrics) and a `pandas.DataFrame` object (which contains the metric data). Each
element of the metadata list corresponds to one of the columns in the
DataFrame. The DataFrame must also contain a `GEO_ID` column, which contains
the geoIDs that correspond to the geometries.

This asset returns a _list_ of `MetricsOutput` objects because each of the
individual outputs will be serialised to a separate parquet file. The location
of this parquet file is specified as part of the `MetricMetadata` object.

(Note that because a `MetricMetadata` object includes an ID for the
`SourceDataRelease` that it corresponds to, which _in turn_ contains an ID for
the `GeometryMetadata`, each set of metrics can be tied to one geometry level.)
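
Putting this together, a sketch of a single `MetricsOutput` (metadata fields
elided; column names illustrative):

```python
import pandas as pd

from popgetter.cloud_outputs import MetricsOutput
from popgetter.metadata import MetricMetadata

# One MetricMetadata per metric column; among other things, each records the
# parquet path that the serialised output will be written to.
metadata = [MetricMetadata(...), MetricMetadata(...)]

metrics_df = pd.DataFrame(
    {
        "GEO_ID": ["001", "002"],
        "population": [1000, 2000],  # described by metadata[0]
        "households": [400, 800],  # described by metadata[1]
    }
)

metrics_output = MetricsOutput(metadata=metadata, metrics=metrics_df)
```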

## Publishing the assets

Defining the assets and importing them should allow you to view the asset graph
in the Dagster UI and materialise the assets. When the assets are materialised,
Dagster will serialise their return values by pickling them and storing them
inside the `$DAGSTER_HOME/storage` directory. However, these files are not
suitable for consumption by downstream tasks such as the popgetter CLI: the CLI
expects data and metadata to be provided in a specific format (see
[Output structure](output_structure.md)).

In the popgetter library, the pipeline which publishes (meta)data in the correct
format is constructed using
[sensors](https://docs.dagster.io/concepts/partitions-schedules-sensors/sensors).
These sensors monitor a list of specified assets for materialisations, and will
publish their return values in the correct format when new materialisations are
observed. (As a bonus, if any of your assets do not have the correct return
types or do not satisfy any of the extra stipulations above, the sensor will
raise an error.)

If the `ENV` environment variable is set to `prod`, the sensors will publish the
data to an Azure blob storage container; otherwise, the data will be published
to `$DAGSTER_HOME/cloud_outputs`. To publish to Azure you will also need to set
the additional environment variable `SAS_TOKEN`.

To attach your newly defined assets to the sensors, all you need to do is
import the following decorators:

```python
from popgetter.cloud_outputs import (
    send_to_metadata_sensor,
    send_to_geometry_sensor,
    send_to_metrics_sensor,
)
```

and decorate your assets with these. The three metadata assets use the
`send_to_metadata_sensor` decorator; the geometry and metrics assets use
`send_to_geometry_sensor` and `send_to_metrics_sensor` respectively. Note that
each decorator expects an asset as input, so it must be applied as the
outermost decorator, i.e. _above_ Dagster's `@asset` decorator. For example:

```python
@send_to_metadata_sensor
@asset(...)
def country_metadata():
    return CountryMetadata(...)
```

## `Country` base class

To simplify the process of defining the assets and the associated relationships
between them, we provide a `Country` base class which you can inherit from.
This abstracts away most of Dagster's implementation details, meaning that you
only need to write the actual Python functions that process the data. For
example, instead of the `country_metadata` asset above, you could write:

```python
from popgetter.assets.country import Country


class MyCountry(Country):
    def _country_metadata(self, context):
        return CountryMetadata(...)


my_country = MyCountry()
country_metadata_asset = my_country.create_country_metadata()
```

The `create_country_metadata` method generates a Dagster asset and registers it
with the metadata sensor for you. The final line is necessary because Dagster
can only detect assets which are defined at the top level of a module: calling
this method binds the generated asset to a top-level name, where it can then be
picked up.

For an example of this, see the implementation of Northern Ireland data in
`python/popgetter/assets/ni`.

Naturally, the implementation of this base class makes some assumptions about
the structure of the data and the relationships between the assets. We have
found these assumptions to hold across the countries we have worked with so
far. However, if they are not suitable for your data, you can still define the
assets manually as shown above! It is also possible to override part of the
base class with manual asset implementations, to accommodate variations from
the structure the base class assumes; a sketch of this follows.
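
For instance, a hybrid setup might look like this (a sketch: it assumes the
remaining required assets are defined elsewhere, and the manually defined
geometry asset bypasses whatever structure the base class would otherwise
impose):

```python
from dagster import asset

from popgetter.assets.country import Country
from popgetter.cloud_outputs import GeometryOutput, send_to_geometry_sensor
from popgetter.metadata import CountryMetadata


class MyCountry(Country):
    def _country_metadata(self, context):
        return CountryMetadata(...)


my_country = MyCountry()
# Asset generated and registered with the metadata sensor by the base class
country_metadata_asset = my_country.create_country_metadata()


# A manually defined geometry asset, attached to the same sensor as usual
@send_to_geometry_sensor
@asset(...)
def geometry() -> list[GeometryOutput]:
    return [GeometryOutput(...)]
```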
4 changes: 3 additions & 1 deletion pyproject.toml
@@ -36,7 +36,7 @@ dependencies = [
     "dagster-cloud >=1.6.11,<1.7.0", # Version matched to dagster (see above)
     "dagster-azure >=0.22.11", # Only required for production deployments, could be moved to an optional
     "pandas >=2.1,<2.2", # Pinned to 2.1 as 2.2 might be causing the failure here https://github.com/Urban-Analytics-Technology-Platform/popgetter/actions/runs/7593850248/job/20684737578
-    "geopandas >=0.14.1", # This probably won't resolve to the latest version of geopandas, due to version restrictions pandas package (see above).
+    "geopandas >=0.14.1,<1", # Compatibility with 1.0 to be determined
     "docker", # Use and version to be confirmed, see https://github.com/Urban-Analytics-Technology-Platform/popgetter/issues/38#issuecomment-2009350512
     "lxml >=4.9.3", # Used by `download_from_wfs` function
     "pyarrow", # Used interface with polars-arrow
@@ -50,6 +50,8 @@ dependencies = [
     "jcs >=0.2.1", # For generating IDs from class attributes
     "beautifulsoup4 >=4.12.3", # For extracting catalogs from web pages
     "openpyxl >=3.1.3", # For reading Excel files
+    "iso639-lang >=2.2.3", # For checking ISO639-3 language codes
+    "aiohttp >=3.9.5", # Async HTTP
 ]
 
 
9 changes: 5 additions & 4 deletions python/popgetter/__init__.py
@@ -7,6 +7,11 @@
 
 from dagster import ExperimentalWarning
 
+# Has to be placed before imports as otherwise importing other modules will
+# trigger ExperimentalWarnings themselves...
+if "IGNORE_EXPERIMENTAL_WARNINGS" in os.environ:
+    warnings.filterwarnings("ignore", category=ExperimentalWarning)
+
 from popgetter.io_managers.azure import (
     AzureGeneralIOManager,
     AzureGeoIOManager,
@@ -25,10 +25,6 @@
 __all__ = ["__version__"]
 
 
-if "IGNORE_EXPERIMENTAL_WARNINGS" in os.environ:
-    warnings.filterwarnings("ignore", category=ExperimentalWarning)
-
-
 import os
 
 from dagster import (
19 changes: 8 additions & 11 deletions python/popgetter/assets/be/census_derived.py
@@ -15,7 +15,7 @@
 )
 from icecream import ic
 
-from popgetter.cloud_outputs import send_to_metrics_sensor
+from popgetter.cloud_outputs import MetricsOutput, send_to_metrics_sensor
 from popgetter.metadata import MetricMetadata, SourceDataRelease, metadata_to_dataframe
 
 from .belgium import asset_prefix
@@ -252,7 +252,7 @@ def source_metrics_by_partition(
 def derived_metrics_by_partition(
     context,
     source_metrics_by_partition: tuple[MetricMetadata, pd.DataFrame],
-) -> tuple[list[MetricMetadata], pd.DataFrame]:
+) -> MetricsOutput:
     node = context.partition_key
 
     source_mmd, source_table = source_metrics_by_partition
@@ -312,7 +312,7 @@ def derived_metrics_by_partition(
         },
     )
 
-    return derived_mmd, joined_metrics
+    return MetricsOutput(metadata=derived_mmd, metrics=joined_metrics)
 
 
 @send_to_metrics_sensor
@@ -328,23 +328,20 @@ def derived_metrics_by_partition(
     key_prefix=asset_prefix,
 )
 def metrics(
-    context, derived_metrics_by_partition: tuple[list[MetricMetadata], pd.DataFrame]
-) -> list[tuple[str, list[MetricMetadata], pd.DataFrame]]:
+    context,
+    derived_metrics_by_partition: MetricsOutput,
+) -> list[MetricsOutput]:
     """
     This asset exists solely to aggregate all the derived tables into one
     single unpartitioned asset, which the downstream publishing tasks can use.
 
     Right now it is a bit boring because it only relies on one partition, but
     it could be extended when we have more data products.
     """
-    mmds, table = derived_metrics_by_partition
-    filepath = mmds[0].metric_parquet_path
-
     context.add_output_metadata(
         metadata={
-            "num_metrics": len(mmds),
+            "num_metrics": len(derived_metrics_by_partition.metadata),
             "num_parquets": 1,
         },
     )
 
-    return [(filepath, mmds, table)]
+    return [derived_metrics_by_partition]
39 changes: 23 additions & 16 deletions python/popgetter/assets/be/census_geometry.py
@@ -3,9 +3,7 @@
 from dataclasses import dataclass
 from datetime import date
 
-import geopandas as gpd
 import matplotlib.pyplot as plt
-import pandas as pd
 from dagster import (
     AssetIn,
     MetadataValue,
@@ -14,7 +12,11 @@
 )
 from icecream import ic
 
-from popgetter.cloud_outputs import send_to_geometry_sensor, send_to_metadata_sensor
+from popgetter.cloud_outputs import (
+    GeometryOutput,
+    send_to_geometry_sensor,
+    send_to_metadata_sensor,
+)
 from popgetter.metadata import (
     GeometryMetadata,
     SourceDataRelease,
@@ -99,9 +101,7 @@ class BelgiumGeometryLevel:
     },
     key_prefix=asset_prefix,
 )
-def geometry(
-    context, sector_geometries
-) -> list[tuple[GeometryMetadata, gpd.GeoDataFrame, pd.DataFrame]]:
+def geometry(context, sector_geometries) -> list[GeometryOutput]:
     """
     Produces the full set of data / metadata associated with Belgian
     municipalities. The outputs, in order, are:
@@ -140,26 +140,33 @@ def geometry(
             )
             .loc[:, ["GEO_ID", "nld", "fra", "deu"]]
             .drop_duplicates()
+            .astype({"GEO_ID": str})
         )
         ic(region_names.head())
 
         geometries_to_return.append(
-            (geometry_metadata, region_geometries, region_names)
+            GeometryOutput(
+                metadata=geometry_metadata, gdf=region_geometries, names_df=region_names
+            )
         )
 
     # Add output metadata
-    first_metadata, first_gdf, first_names = geometries_to_return[0]
-    first_joined_gdf = first_gdf.merge(first_names, on="GEO_ID")
+    first_output = geometries_to_return[0]
+    first_joined_gdf = first_output.gdf.merge(first_output.names_df, on="GEO_ID")
     ax = first_joined_gdf.plot(column="nld", legend=False)
-    ax.set_title(f"Belgium 2023 {first_metadata.level}")
+    ax.set_title(f"Belgium 2023 {first_output.metadata.level}")
     md_plot = markdown_from_plot(plt)
     context.add_output_metadata(
         metadata={
             "all_geom_levels": MetadataValue.md(
-                ",".join([metadata.level for metadata, _, _ in geometries_to_return])
+                ",".join(
+                    [geom_output.metadata.level for geom_output in geometries_to_return]
+                )
             ),
             "first_geometry_plot": MetadataValue.md(md_plot),
-            "first_names_preview": MetadataValue.md(first_names.head().to_markdown()),
+            "first_names_preview": MetadataValue.md(
+                first_output.names_df.head().to_markdown()
+            ),
         }
     )

@@ -169,13 +176,13 @@ def geometry(
 @send_to_metadata_sensor
 @asset(key_prefix=asset_prefix)
 def source_data_releases(
-    geometry: list[tuple[GeometryMetadata, gpd.GeoDataFrame, pd.DataFrame]]
+    geometry: list[GeometryOutput],
 ) -> dict[str, SourceDataRelease]:
     """
     Returns all SourceDataReleases for each geometry level.
     """
     return {
-        geo_metadata.level: SourceDataRelease(
+        geo_output.metadata.level: SourceDataRelease(
             name="StatBel Open Data",
             date_published=date(2015, 10, 22),
             reference_period_start=date(2015, 10, 22),
@@ -186,7 +193,7 @@ def source_data_releases(
             url="https://statbel.fgov.be/en/open-data",
             description="TBC",
             data_publisher_id=publisher.id,
-            geometry_metadata_id=geo_metadata.id,
+            geometry_metadata_id=geo_output.metadata.id,
         )
-        for geo_metadata, _, _ in geometry
+        for geo_output in geometry
     }