Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use DODataset to download a dataset from DO #1594

31 changes: 9 additions & 22 deletions cartoframes/data/observatory/catalog/entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from abc import ABC

from ...clients.bigquery_client import BigQueryClient
from carto.do_dataset import DODataset
from ....utils.logger import log
from ....exceptions import DOError

Expand Down Expand Up @@ -126,35 +127,21 @@ def _download(self, credentials, file_path=None, limit=None, order_by=None):
if not self._is_available_in('bq'):
raise DOError('{} is not ready for Download. Please, contact us for more information.'.format(self))

bq_client = _get_bigquery_client(credentials)
dgaubert marked this conversation as resolved.
Show resolved Hide resolved

full_remote_table_name = self._get_remote_full_table_name(
bq_client.bq_project,
bq_client.bq_dataset,
bq_client.bq_public_project
)

project, dataset, table = full_remote_table_name.split('.')

column_names = bq_client.get_table_column_names(project, dataset, table)

query = 'SELECT * FROM `{}`'.format(full_remote_table_name)
if order_by:
query = '{} ORDER BY {}'.format(query, order_by)
if limit:
query = '{} LIMIT {}'.format(query, limit)

job = bq_client.query(query)

auth_client = credentials.get_api_key_auth_client()
rows = DODataset(auth_client=auth_client).name(self.id).download_stream(limit=limit, order_by=order_by)
if file_path:
bq_client.download_to_file(job, file_path, column_names=column_names)
with open(file_path, 'w') as csvfile:
for row in rows:
csvfile.write(row.decode('utf-8'))

log.info('Data saved: {}'.format(file_path))
if self.__class__.__name__ == 'Dataset':
log.info(_DATASET_READ_MSG.format(file_path))
elif self.__class__.__name__ == 'Geography':
log.info(_GEOGRAPHY_READ_MSG.format(file_path))
else:
return bq_client.download_to_dataframe(job)
dataframe = pd.read_csv(rows)
return dataframe

def _is_available_in(self, platform=_PLATFORM_BQ):
    """Tell whether this entity is listed as available on ``platform``.

    Looks at the ``'available_in'`` entry of ``self.data``. When that entry
    is falsy it is returned unchanged (so the result is falsy but not
    necessarily ``False``); otherwise the result is the outcome of the
    membership test ``platform in available_in``.
    """
    platforms = self.data['available_in']
    # Mirror the short-circuit of ``a and b``: hand back the falsy operand itself.
    if not platforms:
        return platforms
    return platform in platforms
Expand Down