Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support several variables #1056

Merged
merged 9 commits into from
Oct 8, 2019
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 68 additions & 21 deletions cartoframes/data/enrichment/enrichment_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

_ENRICHMENT_ID = 'enrichment_id'
_WORKING_PROJECT = 'carto-do-customers'
_PUBLIC_PROJECT = 'carto-do-public-data'
_PUBLIC_DATASET = 'open_data'


def enrich(query_function, **kwargs):
Expand All @@ -22,9 +24,9 @@ def enrich(query_function, **kwargs):
data_copy = _prepare_data(kwargs['data'], kwargs['data_geom_column'])
tablename = _upload_dataframe(bq_client, user_dataset, data_copy, kwargs['data_geom_column'])

query = _enrichment_query(user_dataset, tablename, query_function, **kwargs)
queries = _enrichment_queries(user_dataset, tablename, query_function, **kwargs)

return _execute_enrichment(bq_client, query, data_copy, kwargs['data_geom_column'])
return _execute_enrichment(bq_client, queries, data_copy, kwargs['data_geom_column'])


def _get_credentials(credentials=None):
Expand Down Expand Up @@ -54,18 +56,29 @@ def _upload_dataframe(bq_client, user_dataset, data_copy, data_geom_column):
return data_tablename


def _enrichment_queries(user_dataset, tablename, query_function, **kwargs):
    """Build the enrichment queries for the requested variables.

    The variables in ``kwargs['variables']`` are resolved into per-table mappings
    (geography table, variable names, project and dataset) and ``kwargs['filters']``
    is turned into a filter string. Everything is then handed to ``query_function``,
    whose result is returned unchanged.
    """
    tables_info = __get_tables_and_variables(kwargs['variables'], user_dataset)
    geotable_by_table, variables_by_table, project_by_table, dataset_by_table = tables_info

    processed_filters = __process_filters(kwargs['filters'])

    return query_function(
        _ENRICHMENT_ID,
        processed_filters,
        geotable_by_table,
        variables_by_table,
        project_by_table,
        dataset_by_table,
        user_dataset,
        _WORKING_PROJECT,
        tablename,
        **kwargs
    )


def _execute_enrichment(bq_client, queries, data_copy, data_geom_column):
    """Run every enrichment query and join the results onto ``data_copy``.

    All queries are executed through ``bq_client`` and converted to dataframes
    before any merging starts. Each result is then left-joined on the enrichment
    id column, that column is dropped, and the geometry column is converted with
    ``geojson_to_wkt``.
    """
    enriched_frames = [bq_client.query(sql).to_dataframe() for sql in queries]

    for enriched in enriched_frames:
        data_copy = data_copy.merge(enriched, on=_ENRICHMENT_ID, how='left')

    # The enrichment id only exists to join the results; it is not part of the output.
    data_copy.drop(_ENRICHMENT_ID, axis=1, inplace=True)
    data_copy[data_geom_column] = data_copy[data_geom_column].apply(geojson_to_wkt)

    return data_copy
Expand Down Expand Up @@ -111,7 +124,7 @@ def __process_filters(filters_dict):
return filters


def __get_tables_and_variables(variables):
def __get_tables_and_variables(variables, user_dataset):

if isinstance(variables, pd.Series):
variables_id = [variables['id']]
Expand All @@ -120,29 +133,63 @@ def __get_tables_and_variables(variables):
else:
raise EnrichmentException('Variable(s) to enrich should be an instance of Series or DataFrame')

table_to_variables = __process_enrichment_variables(variables_id)
table_data_enrichment = list(table_to_variables.keys()).pop()
table_geo_enrichment = __get_name_geotable_from_datatable(table_data_enrichment)
variables_list = list(table_to_variables.values()).pop()
table_to_geotable, table_to_variables, table_to_project, table_to_dataset =\
__process_enrichment_variables(variables_id, user_dataset)

return table_data_enrichment, table_geo_enrichment, variables_list
return table_to_geotable, table_to_variables, table_to_project, table_to_dataset
simon-contreras-deel marked this conversation as resolved.
Show resolved Hide resolved


def __process_enrichment_variables(variables_id, user_dataset):
    """Group variable ids by enrichment table and collect per-table metadata.

    Every id must have exactly four dot-separated parts:
    ``project.dataset.table.variable``. Tables that do not belong to the public
    project are renamed to ``<dataset>_<table>`` (and their geography table to
    ``<dataset>_<geotable>``), and are mapped to ``user_dataset`` inside the
    working project. Public tables are mapped to the public dataset and project.

    Returns a tuple ``(table_to_geotable, table_to_variables, table_to_project,
    table_to_dataset)``, all keyed by the (possibly renamed) table name.
    """
    geotable_by_table = {}
    variables_by_table = defaultdict(list)
    project_by_table = {}
    dataset_by_table = {}

    for variable_id in variables_id:
        project, dataset, table, variable = variable_id.split('.')
        is_public = project == _PUBLIC_PROJECT

        if not is_public:
            table = '{dataset}_{table}'.format(dataset=dataset, table=table)

        # The three metadata mappings are always filled together, so a single
        # membership check tells whether this table has been seen before.
        if table not in dataset_by_table:
            geotable = __get_name_geotable_from_datatable(table)

            if is_public:
                dataset_by_table[table] = _PUBLIC_DATASET
                project_by_table[table] = _PUBLIC_PROJECT
            else:
                geotable = '{dataset}_{geotable}'.format(dataset=dataset, geotable=geotable)
                dataset_by_table[table] = user_dataset
                project_by_table[table] = _WORKING_PROJECT

            geotable_by_table[table] = geotable

        variables_by_table[table].append(variable)

    return geotable_by_table, variables_by_table, project_by_table, dataset_by_table


def __get_name_geotable_from_datatable(datatable):
    """Derive the geography table name from a data table name.

    The data table name is split on underscores and three consecutive parts are
    joined behind a ``geography_`` prefix: the parts at positions 3-5 when the
    name has exactly eight parts, the parts at positions 2-4 otherwise.
    """
    parts = datatable.split('_')
    first = 3 if len(parts) == 8 else 2
    geo_parts = parts[first:first + 3]

    return 'geography_{geo_information_joined}'.format(geo_information_joined='_'.join(geo_parts))
49 changes: 28 additions & 21 deletions cartoframes/data/enrichment/points_enrichment.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,31 @@ def enrich_points(data, variables, data_geom_column='geometry', filters=dict(),
return data_enriched


def _prepare_sql(enrichment_id, filters_processed, table_to_geotable, table_to_variables,
                 table_to_project, table_to_dataset, user_dataset, working_project, data_table, **kwargs):
    """Build one point-enrichment query per enrichment table.

    For every ``table -> variables`` entry of ``table_to_variables`` a query is
    generated that joins the enrichment table with its geography table (both
    located through ``table_to_project`` / ``table_to_dataset``) and with the
    uploaded user table, matching user geometries that lie within an enrichment
    geometry. Besides the requested variables, each query selects the geography
    area and a NULL population column, both named after the underscore-joined
    variables.

    Requires ``kwargs['data_geom_column']``. Returns the list of SQL strings.
    """
    template = '''
        SELECT data_table.{enrichment_id},
            {variables},
            ST_Area(enrichment_geo_table.geom) AS {variables_underscored}_area,
            NULL AS {variables_underscored}_population
        FROM `{project}.{dataset}.{enrichment_table}` enrichment_table
        JOIN `{project}.{dataset}.{enrichment_geo_table}` enrichment_geo_table
            ON enrichment_table.geoid = enrichment_geo_table.geoid
        JOIN `{working_project}.{user_dataset}.{data_table}` data_table
            ON ST_Within(data_table.{data_geom_column}, enrichment_geo_table.geom)
        {filters};
        '''

    return [
        template.format(
            enrichment_id=enrichment_id,
            variables=', '.join(table_variables),
            variables_underscored='_'.join(table_variables),
            enrichment_table=table,
            enrichment_geo_table=table_to_geotable[table],
            user_dataset=user_dataset,
            working_project=working_project,
            data_table=data_table,
            data_geom_column=kwargs['data_geom_column'],
            filters=filters_processed,
            project=table_to_project[table],
            dataset=table_to_dataset[table],
        )
        for table, table_variables in table_to_variables.items()
    ]
77 changes: 42 additions & 35 deletions cartoframes/data/enrichment/polygons_enrichment.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,43 +11,50 @@ def enrich_polygons(data, variables, agg_operators, data_geom_column='geometry',
return data_enriched


def _prepare_sql(enrichment_id, filters_processed, table_to_geotable, table_to_variables,
                 table_to_project, table_to_dataset, user_dataset, working_project, data_table, **kwargs):
    """Build one polygon-enrichment query per enrichment table.

    For every ``table -> variables`` entry of ``table_to_variables`` a query is
    generated that joins the enrichment table with its geography table (both
    located through ``table_to_project`` / ``table_to_dataset``) and with the
    uploaded user table, matching geometries that intersect.

    When ``kwargs`` contains ``agg_operators``, every variable is weighted by the
    share of the user polygon covered by the enrichment geometry, aggregated with
    its operator and grouped by the enrichment id. ``agg_operators`` may be a
    single operator name applied to all variables, or a mapping
    ``variable -> operator`` (a missing variable raises ``KeyError``).

    Without ``agg_operators`` the raw variables are selected together with the
    covered share as ``measures_proportion`` and no grouping is applied.

    Requires ``kwargs['data_geom_column']``. Returns the list of SQL strings.
    """
    aggregate = 'agg_operators' in kwargs

    # Grouping only makes sense for aggregated queries; decide it once instead
    # of mutating it from inside the per-table loop.
    if aggregate:
        grouper = 'group by data_table.{enrichment_id}'.format(enrichment_id=enrichment_id)
    else:
        grouper = ''

    # Share of the user polygon covered by the enrichment geometry. The alias
    # must be `enrichment_geo_table`, the one declared in the JOIN below.
    proportion_template = ('ST_Area(ST_Intersection(enrichment_geo_table.geom, data_table.{data_geom_column}))'
                           ' / ST_area(data_table.{data_geom_column})')

    sql_template = '''
        SELECT data_table.{enrichment_id}, {variables}
        FROM `{project}.{dataset}.{enrichment_table}` enrichment_table
        JOIN `{project}.{dataset}.{enrichment_geo_table}` enrichment_geo_table
            ON enrichment_table.geoid = enrichment_geo_table.geoid
        JOIN `{working_project}.{user_dataset}.{data_table}` data_table
            ON ST_Intersects(data_table.{data_geom_column}, enrichment_geo_table.geom)
        {filters}
        {grouper};
        '''

    sqls = list()

    for table, variables in table_to_variables.items():
        data_geom_column = kwargs['data_geom_column']
        proportion = proportion_template.format(data_geom_column=data_geom_column)

        if aggregate:
            agg_operators = kwargs['agg_operators']
            if isinstance(agg_operators, str):
                # A single operator name applies to every variable of the table.
                agg_operators = {variable: agg_operators for variable in variables}

            variables_sql = [
                '{operator}({variable} * ({proportion})) as {variable}'.format(
                    operator=agg_operators[variable], variable=variable, proportion=proportion)
                for variable in variables
            ]
        else:
            variables_sql = list(variables) + [
                '{proportion} AS measures_proportion'.format(proportion=proportion)
            ]

        sqls.append(sql_template.format(
            enrichment_id=enrichment_id,
            variables=', '.join(variables_sql),
            enrichment_table=table,
            enrichment_geo_table=table_to_geotable[table],
            user_dataset=user_dataset,
            working_project=working_project,
            data_table=data_table,
            data_geom_column=data_geom_column,
            filters=filters_processed,
            grouper=grouper,
            project=table_to_project[table],
            dataset=table_to_dataset[table],
        ))

    return sqls
Loading