-
Notifications
You must be signed in to change notification settings - Fork 0
11 fetch preprocess gdp #16
Changes from all commits
7cb5b60
838e6f6
4c968c3
2843d29
8af724b
9a60bab
3ca3447
167b7be
e9cf871
f4035a1
6943b12
1e030b7
9466005
c4cf499
4bbd42a
c926220
9e707f2
1df0313
c861fd9
0fddf29
ffb109a
c33036b
23b2790
bdc1113
c56b41c
2f3a5a5
a34945a
8a37546
2d08e93
1f19cdc
2c72203
8c7950b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,143 @@ | ||
# Data getters for official data | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Don't forget to run ❯ flake8 industrial_taxonomy
industrial_taxonomy/getters/official.py:4:1: F401 'typing.Dict' imported but unused
industrial_taxonomy/getters/official.py:4:1: F401 'typing.Optional' imported but unused
industrial_taxonomy/getters/official.py:6:1: F401 'metaflow.Run' imported but unused
industrial_taxonomy/getters/official.py:15:1: DAR201 Missing "Returns" in Docstring: - return
industrial_taxonomy/getters/official.py:15:1: DAR401 Missing exception(s) in Raises section: -r MetaflowNotFound
industrial_taxonomy/getters/official.py:27:1: DAR201 Missing "Returns" in Docstring: - return
industrial_taxonomy/getters/official.py:43:1: DAR201 Missing "Returns" in Docstring: - return
industrial_taxonomy/getters/official.py:59:1: DAR201 Missing "Returns" in Docstring: - return
industrial_taxonomy/getters/official.py:80:1: DAR201 Missing "Returns" in Docstring: - return
industrial_taxonomy/pipeline/official/utils.py:5:1: F401 'pandas as pd' imported but unused
industrial_taxonomy/pipeline/official/population/utils.py:2:1: F401 'pandas as df' imported but unused
industrial_taxonomy/pipeline/official/population/flow.py:4:1: F401 'metaflow.decorators.StepDecorator' imported but unused
industrial_taxonomy/pipeline/official/population/flow.py:7:112: B950 line too long (111 > 80 characters)
industrial_taxonomy/pipeline/official/nomis/utils.py:14:1: DAR101 Missing parameter(s) in Docstring: - **kwargs
industrial_taxonomy/pipeline/official/nomis/flow.py:6:1: F401 'metaflow.decorators.StepDecorator' imported but unused
industrial_taxonomy/pipeline/official/nomis/flow.py:12:90: B950 line too long (89 > 80 characters)
industrial_taxonomy/pipeline/official/nomis/flow.py:13:101: B950 line too long (100 > 80 characters)
industrial_taxonomy/pipeline/official/nomis/flow.py:14:101: B950 line too long (100 > 80 characters)
industrial_taxonomy/pipeline/official/nomis/flow.py:15:102: B950 line too long (101 > 80 characters)
industrial_taxonomy/pipeline/official/nomis/flow.py:16:109: B950 line too long (108 > 80 characters)
industrial_taxonomy/pipeline/official/nomis/flow.py:17:113: B950 line too long (112 > 80 characters)
industrial_taxonomy/pipeline/official/nomis/flow.py:18:90: B950 line too long (89 > 80 characters)
industrial_taxonomy/pipeline/official/nomis/flow.py:22:90: B950 line too long (89 > 80 characters)
industrial_taxonomy/pipeline/official/nomis/flow.py:23:114: B950 line too long (113 > 80 characters)
industrial_taxonomy/pipeline/official/nomis/flow.py:24:112: B950 line too long (111 > 80 characters)
industrial_taxonomy/pipeline/official/nomis/flow.py:25:114: B950 line too long (113 > 80 characters)
industrial_taxonomy/pipeline/official/nomis/flow.py:26:110: B950 line too long (109 > 80 characters)
industrial_taxonomy/pipeline/official/nomis/flow.py:27:111: B950 line too long (110 > 80 characters)
industrial_taxonomy/pipeline/official/gdp/utils.py:4:1: F401 're' imported but unused There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For a top-level description use doc-string style rather than a |
||
|
||
from functools import lru_cache | ||
|
||
from metaflow import Flow, Run | ||
from metaflow.exception import MetaflowNotFound | ||
from typing import Optional | ||
|
||
try: # Hack for type-hints on attributes | ||
import pandas as pd | ||
except ImportError: | ||
pass | ||
|
||
|
||
@lru_cache()
def get_run(flow_name: str) -> Run:
    """Gets last successful run executed with `--production`.

    Args:
        flow_name: Name of the metaflow `Flow` to look up.

    Returns:
        The most recent successful `Run` of `flow_name` tagged with the
        `prod` project branch.

    Raises:
        MetaflowNotFound: If no successful production run exists.
    """
    runs = Flow(flow_name).runs("project_branch:prod")
    try:
        return next(filter(lambda run: run.successful, runs))
    except StopIteration as exc:
        raise MetaflowNotFound("Matching run not found") from exc
|
||
|
||
def gva_lad(run: Optional[Run] = None):
    """Get the GVA for each local authority.

    Arguments:
        run: Run to fetch data from (if None, the latest production run).

    Returns:
        Columns:
            Name: nuts1_name, dtype: str, NUTS1 region (e.g. Scotland, South East etc)
            Name: la_code, dtype: str, local authority code
            Name: la_name, dtype: str, local authority name
            Name: year, dtype: int, year (ranges between 1998 and 2019)
            Name: gva, dtype: float, £M Gross value added
    """
    if run is None:
        run = get_run("LocalGdpData")

    # Years are stored as one column per year; melt them into long format.
    wide = pd.DataFrame(run.data.gva)
    long = wide.melt(
        id_vars=["itl1_region", "la_code", "la_name"],
        var_name="year",
        value_name="gva",
    )
    return long.rename(columns={"itl1_region": "nuts1_name"})
|
||
|
||
def population_lad(run: Optional[Run] = None):
    """Get the population of each local authority.

    Arguments:
        run: Run to fetch data from (if None, the latest production run).

    Returns:
        Columns:
            Name: nuts1_name, dtype: str, NUTS1 region (e.g. Scotland, South East etc)
            Name: la_code, dtype: str, local authority code
            Name: la_name, dtype: str, local authority name
            Name: year, dtype: int, year (ranges between 1998 and 2019)
            Name: pop, dtype: float, population
    """
    if run is None:
        run = get_run("LocalGdpData")

    # Years are stored as one column per year; melt them into long format.
    wide = pd.DataFrame(run.data.pop)
    long = wide.melt(
        id_vars=["itl1_region", "la_code", "la_name"],
        var_name="year",
        value_name="pop",
    )
    return long.rename(columns={"itl1_region": "nuts1_name"})
|
||
|
||
def gva_pc_lad():
    """Get the GVA per capita in a local authority.

    Returns:
        Columns:
            Name: nuts1_name, dtype: str, NUTS1 region (e.g. Scotland, South East etc)
            Name: la_code, dtype: str, local authority code
            Name: la_name, dtype: str, local authority name
            Name: year, dtype: int, year (ranges between 1998 and 2019)
            Name: gva_pc, dtype: float, GDP per capita
    """
    # Join GVA (in £M) with population on the shared geography/year keys.
    combined = gva_lad().merge(
        population_lad(), on=["nuts1_name", "la_code", "la_name", "year"]
    )
    # 1e6 converts £M to £ before dividing by headcount.
    combined["gva_pc"] = (1e6 * combined["gva"] / combined["pop"]).round(2)
    return combined.drop(columns=["gva", "pop"])
|
||
|
||
def nomis(run: Optional[Run] = None):
    """Get nomis tables including variables from the
    Annual Population Survey (APS) and the
    Annual Survey of Hours and Earnings (ASHE).

    Arguments:
        run: Run to fetch data from (if None, the latest production run).

    Returns:
        Columns:
            Name: year, dtype: int, year when the data was collected
                (in the case of APS it will refer to the last
                month of the year when education information is available)
            Name: la_code, dtype: str, local authority code
            Name: la_name, dtype: str, local authority name
            Name: variable, dtype: str, variable including:
                Economic activity rate (APS)
                Employment rate (APS)
                % with tertiary education (APS)
                % with no qualification (APS)
                Annual pay (gross) £ (ASHE)
            Name: value, dtype: float, value for the variable
            Name: source, dtype: str, aps or ashe
    """
    if run is None:
        run = get_run("NomisTables")

    table = pd.DataFrame(run.data.nomis_dict)
    # Standardise column names with the other getters in this module.
    return table.rename(
        columns={
            "date": "year",
            "geography_name": "la_name",
            "geography_code": "la_code",
        }
    )
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
"""Fetch GDP data""" | ||
|
||
from typing import Dict, List, Union | ||
from metaflow import FlowSpec, project, step | ||
|
||
try: # Hack for type-hints on attributes | ||
from pandas import DataFrame | ||
except ImportError: | ||
pass | ||
|
||
GDP_URL = ( | ||
"https://www.ons.gov.uk/file?uri=/economy/grossdomesticproductgdp/datasets/" | ||
"regionalgrossdomesticproductlocalauthorities/1998to2019/" | ||
"regionalgrossdomesticproductlocalauthorities.xlsx" | ||
) | ||
# Excel spreadsheets with the data we are interested in | ||
SHEETS = [6, 7] | ||
|
||
|
||
@project(name="industrial_taxonomy")
class LocalGdpData(FlowSpec):
    """Fetch local GDP (including population and GVA) data from the ONS website.

    Attributes:
        url: location of the original file
        pop_clean: population table
        gva_clean: GVA table
        gva: gva table as a list of records
        pop: pop table as a list of records
    """

    # Type hints for the artifacts produced by the steps below.
    url: str
    gva_clean: "DataFrame"
    pop_clean: "DataFrame"
    pop: List[Dict[str, Union[str, float]]]
    gva: List[Dict[str, Union[str, float]]]

    @step
    def start(self):
        """Fetch the GDP data from the ONS"""
        import pandas as pd
        from industrial_taxonomy.pipeline.official.utils import get

        self.url = GDP_URL
        gdp_table = get(self.url).content

        # Create dfs for the sheets with relevant information (population and GVA).
        # SHEETS lists the GVA sheet first, then the population sheet.
        # NOTE(review): the leading underscore presumably marks these as
        # intermediate values rather than public artifacts — confirm how
        # metaflow treats underscore-prefixed attributes.
        self._gva_raw, self._pop_raw = [
            pd.read_excel(gdp_table, sheet_name=sh, skiprows=1) for sh in SHEETS
        ]

        self.next(self.transform)

    @step
    def transform(self):
        """Clean up the data"""
        from industrial_taxonomy.pipeline.official.gdp.utils import process_gdp_table

        self.pop_clean = process_gdp_table(self._pop_raw)
        self.gva_clean = process_gdp_table(self._gva_raw)

        self.next(self.end)

    @step
    def end(self):
        """Save the tables as dicts"""
        # Store each cleaned table as a list of records so downstream getters
        # can rebuild DataFrames without relying on pickled pandas objects.
        for table, name in zip([self.gva_clean, self.pop_clean], ["gva", "pop"]):

            table_dict = table.to_dict(orient="records")
            setattr(self, name, table_dict)


if __name__ == "__main__":
    LocalGdpData()
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
"""Clean and process GDP data""" | ||
|
||
import pandas as pd | ||
|
||
# Years covered by the ONS regional GDP tables (1998..2019 inclusive).
YEAR_RANGE = range(1998, 2020)


def process_gdp_table(table: pd.DataFrame):
    """Removes table footnotes and renames columns.

    Arguments:
        table: Raw sheet read from the ONS regional GDP spreadsheet.

    Returns:
        Table without footnote rows and with normalised column names.
    """
    # Footnote rows at the bottom of the sheet carry no local authority code.
    without_footnotes = table.dropna(axis=0, subset=["LA code"])

    # Strip the "[note 3]" suffix, then turn integer year headers into strings.
    year_names = {year: str(year) for year in YEAR_RANGE}
    renamed = without_footnotes.rename(columns={"2019\n[note 3]": "2019"}).rename(
        columns=year_names
    )

    # Finally snake_case every remaining header (e.g. "LA code" -> "la_code").
    return renamed.rename(columns=lambda name: name.lower().replace(" ", "_"))
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,94 @@ | ||||||||
"""Flow to collect NOMIS data with the exception of BRES""" | ||||||||
from io import BytesIO | ||||||||
from metaflow import FlowSpec, project, step | ||||||||
from typing import List, Dict, Union | ||||||||
|
||||||||
try: # Hack for type-hints on attributes | ||||||||
from pandas import DataFrame | ||||||||
except ImportError: | ||||||||
pass | ||||||||
|
||||||||
# NOMIS API query for Annual Population Survey indicators at local authority
# level. The geography parameter is a comma-separated list of ids and
# `a...b` ranges, split across adjacent string literals that Python
# concatenates. BUG FIX: the original was missing the comma after
# "1811939647", so concatenation produced the malformed id
# "18119396471811939649" (compare _ASHE_URL below, which has the comma).
_APS_URL = (
    "https://www.nomisweb.co.uk/api/v01/dataset/NM_17_5.data.csv?"
    "geography=1811939329...1811939332,1811939334...1811939336,1811939338..."
    "1811939497,1811939499...1811939501,1811939503,"
    "1811939505...1811939507,1811939509...1811939517,"
    "1811939519,1811939520,1811939524...1811939570,1811939575...1811939599,"
    "1811939601...1811939628,1811939630...1811939634,1811939636...1811939647,"
    "1811939649,1811939655...1811939664,1811939667...1811939680"
    ",1811939682,1811939683,1811939685,1811939687...1811939704,1811939707,1811939708"
    ",1811939710,1811939712...1811939717,1811939719,1811939720,1811939722..."
    "1811939730&date=2019-12&variable=18,45,290,335,344"
    "&measures=20599,21001,21002,21003"
)

# NOMIS API query for Annual Survey of Hours and Earnings (gross annual pay)
# over the same local authority geography list.
_ASHE_URL = (
    "https://www.nomisweb.co.uk/api/v01/dataset/NM_30_1.data.csv?"
    "geography=1811939329...1811939332,1811939334...1811939336,1811939338..."
    "1811939497,1811939499...1811939501,1811939503,1811939505..."
    "1811939507,1811939509...1811939517,1811939519,1811939520,1811939524..."
    "1811939570,1811939575...1811939599,1811939601...1811939628,1811939630..."
    "1811939634,1811939636...1811939647,1811939649,1811939655...1811939664,"
    "1811939667...1811939680,1811939682,1811939683,1811939685,1811939687..."
    "1811939704,1811939707,1811939708,1811939710,1811939712...1811939717,"
    "1811939719,1811939720,1811939722...1811939730&date=latest&sex=8&item="
    "2&pay=7&measures=20100,20701"
)

# Parameters passed to `process_nomis` for each source.
_APS_PARAMS = {
    "indicator_name": "Variable",
    "value_column": "VARIABLE_NAME",
    "source": "aps",
}
_ASHE_PARAMS = {"indicator_name": "Value", "value_column": "PAY_NAME", "source": "ashe"}
|
||||||||
|
||||||||
@project(name="industrial_taxonomy")
class NomisTables(FlowSpec):
    """Flow to collect APS / ASHE data from NOMIS.

    Attributes:
        urls: list of urls to collect and process
        params: list of parameters to use when collecting and processing the data
        nomis_table: clean dataset combining all nomis data
        nomis_dict: nomis data as a list of records
    """

    # Type hints — names now match the artifacts actually set in `start`
    # (the originals declared `url_list`/`params_list`, which are never set).
    urls: List[str]
    params: List[Dict[str, str]]
    nomis_table: "DataFrame"
    nomis_dict: List[Dict[str, Union[str, float]]]

    @step
    def start(self):
        """Read the urls and parameters for fetching and processing"""

        self.urls = [_APS_URL, _ASHE_URL]
        self.params = [_APS_PARAMS, _ASHE_PARAMS]

        self.next(self.fetch_process)

    @step
    def fetch_process(self):
        """Fetch and process the data"""

        import pandas as pd

        # Absolute import so the flow also works when not executed from this
        # directory (the original `from utils import process_nomis` relied on
        # the CWD being on sys.path), consistent with the import below.
        from industrial_taxonomy.pipeline.official.nomis.utils import process_nomis
        from industrial_taxonomy.pipeline.official.utils import get

        self.nomis_table = pd.concat(
            [
                process_nomis(pd.read_csv(BytesIO(get(url).content)), **params)
                for url, params in zip(self.urls, self.params)
            ]
        )

        self.next(self.end)

    @step
    def end(self):
        """Save nomis table as a dict"""
        self.nomis_dict = self.nomis_table.to_dict(orient="records")


if __name__ == "__main__":
    NomisTables()
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,38 @@ | ||||||||
"""Utilities to fetch Nomis data""" | ||||||||
|
||||||||
import pandas as pd | ||||||||
|
||||||||
|
||||||||
def process_nomis(
    df: pd.DataFrame,
    indicator_name: str,
    value_column: str,
    source: str,
    indicator_column: str = "MEASURES_NAME",
) -> pd.DataFrame:
    """Process nomis data.

    Arguments:
        df: nomis table
        indicator_name: name of the indicator to keep (rows where
            `indicator_column` equals this value)
        value_column: column holding the variable name
        source: data source label ("aps" or "ashe") added to the output
        indicator_column: column that contains the indicator

    Returns:
        A clean table with secondary data: columns `date`, `geography_name`,
        `geography_code`, `variable`, `value` and `source`, with `date`
        normalised to an integer year.
    """
    return (
        df.query(f"{indicator_column}=='{indicator_name}'")[
            ["DATE", "GEOGRAPHY_NAME", "GEOGRAPHY_CODE", value_column, "OBS_VALUE"]
        ]
        .reset_index(drop=True)
        .rename(columns={"OBS_VALUE": "VALUE", value_column: "VARIABLE"})
        .assign(source=source)
        .rename(columns=str.lower)
        .assign(
            # APS dates arrive as "YYYY-MM" strings; other sources may yield
            # plain integers. Going through `str` handles both, and — unlike
            # the original `type(d) == int` check — also numpy integer dtypes
            # produced by `read_csv`, which are not Python `int` and have no
            # `.split` method.
            date=lambda df: [int(str(d).split("-")[0]) for d in df["date"]]
        )
    )
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add equivalents for nomis, gdp and population?