diff --git a/CHANGELOG.md b/CHANGELOG.md index 5f1de2121..bbc0beecc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ## Changed - Remove pandas extension in catalog classes (#1038) +- Download dataset and geographies (#1050) ## [1.0b3] - 2019-08-27 ### Added @@ -282,4 +283,4 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Adds a compression option for write operations ### Fixed -- Fixes file system path creation to be generic to OS \ No newline at end of file +- Fixes file system path creation to be generic to OS diff --git a/cartoframes/auth/credentials.py b/cartoframes/auth/credentials.py index 27033b410..c3b03ea6e 100644 --- a/cartoframes/auth/credentials.py +++ b/cartoframes/auth/credentials.py @@ -12,6 +12,9 @@ from ..__version__ import __version__ +from warnings import filterwarnings +filterwarnings("ignore", category=FutureWarning, module="carto") + if sys.version_info >= (3, 0): from urllib.parse import urlparse else: @@ -208,6 +211,9 @@ def get_do_token(self): return token.access_token + def get_do_dataset(self): + return self._username.replace('-', '_') + def get_api_key_auth_client(self): if not self._api_key_auth_client: self._api_key_auth_client = APIKeyAuthClient( diff --git a/cartoframes/data/clients/bigquery_client.py b/cartoframes/data/clients/bigquery_client.py index 25b0a44ef..b3e608685 100644 --- a/cartoframes/data/clients/bigquery_client.py +++ b/cartoframes/data/clients/bigquery_client.py @@ -1,5 +1,10 @@ from __future__ import absolute_import +import os +import appdirs +import csv +import tqdm + from google.cloud import bigquery from google.oauth2.credentials import Credentials as GoogleCredentials from google.auth.exceptions import RefreshError @@ -8,6 +13,8 @@ from ...auth import get_default_credentials +_USER_CONFIG_DIR = appdirs.user_config_dir('cartoframes') + def refresh_client(func): def 
wrapper(self, *args, **kwargs): @@ -52,6 +59,54 @@ def upload_dataframe(self, dataframe, schema, tablename, project, dataset): @refresh_client def query(self, query, **kwargs): - response = self.client.query(query, **kwargs) + return self.client.query(query, **kwargs) + + @refresh_client + def get_table(self, project, dataset, table): + full_table_name = '{}.{}.{}'.format(project, dataset, table) + return self.client.get_table(full_table_name) + + def get_table_column_names(self, project, dataset, table): + table_info = self.get_table(project, dataset, table) + return [field.name for field in table_info.schema] + + def download_to_file(self, project, dataset, table, limit=None, offset=None, + file_path=None, fail_if_exists=False, progress_bar=True): + if not file_path: + file_name = '{}.{}.{}.csv'.format(project, dataset, table) + file_path = os.path.join(_USER_CONFIG_DIR, file_name) + + if fail_if_exists and os.path.isfile(file_path): + raise CartoException('The file `{}` already exists.'.format(file_path)) + + column_names = self.get_table_column_names(project, dataset, table) + + query = _download_query(project, dataset, table, limit, offset) + rows_iter = self.query(query).result() + + if progress_bar: + pb = tqdm.tqdm_notebook(total=rows_iter.total_rows) + + with open(file_path, 'w') as csvfile: + csvwriter = csv.writer(csvfile) + + csvwriter.writerow(column_names) + + for row in rows_iter: + csvwriter.writerow(row.values()) + if progress_bar: + pb.update(1) + + return file_path + + +def _download_query(project, dataset, table, limit=None, offset=None): + full_table_name = '`{}.{}.{}`'.format(project, dataset, table) + query = 'SELECT * FROM {}'.format(full_table_name) + + if limit: + query += ' LIMIT {}'.format(limit) + if offset: + query += ' OFFSET {}'.format(offset) - return response + return query diff --git a/cartoframes/data/enrichment/enrichment_service.py b/cartoframes/data/enrichment/enrichment_service.py index 3a1d711a1..c6349b0d5 100644 --- 
a/cartoframes/data/enrichment/enrichment_service.py +++ b/cartoframes/data/enrichment/enrichment_service.py @@ -16,7 +16,7 @@ def enrich(query_function, **kwargs): credentials = _get_credentials(kwargs['credentials']) - user_dataset = credentials.username.replace('-', '_') + user_dataset = credentials.get_do_dataset() bq_client = _get_bigquery_client(_WORKING_PROJECT, credentials) data_copy = _prepare_data(kwargs['data'], kwargs['data_geom_column']) diff --git a/cartoframes/data/observatory/dataset.py b/cartoframes/data/observatory/dataset.py index 101a25641..7c4407cd2 100644 --- a/cartoframes/data/observatory/dataset.py +++ b/cartoframes/data/observatory/dataset.py @@ -73,3 +73,16 @@ def is_public_data(self): @property def summary(self): return self.data['summary_jsonb'] + + def download(self, credentials=None): + """Download Dataset data. + + Args: + credentials (:py:class:`Credentials `, optional): + credentials of CARTO user account. If not provided, + a default credentials (if set with :py:meth:`set_default_credentials + `) will be attempted to be + used. 
+ """ + + return self._download(credentials) diff --git a/cartoframes/data/observatory/entity.py b/cartoframes/data/observatory/entity.py index d5daa0619..2e6af024e 100644 --- a/cartoframes/data/observatory/entity.py +++ b/cartoframes/data/observatory/entity.py @@ -1,4 +1,12 @@ import pandas as pd +from warnings import warn + +from google.api_core.exceptions import NotFound + +from carto.exceptions import CartoException + +from ..clients.bigquery_client import BigQueryClient +from ...auth import get_default_credentials try: from abc import ABC, abstractmethod @@ -6,6 +14,8 @@ from abc import ABCMeta, abstractmethod ABC = ABCMeta('ABC', (object,), {'__slots__': ()}) +_WORKING_PROJECT = 'carto-do-customers' + class CatalogEntity(ABC): @@ -45,6 +55,32 @@ def __str__(self): def __repr__(self): return '{classname}({entity_id})'.format(classname=self.__class__.__name__, entity_id=self.id) + def _download(self, credentials=None): + credentials = _get_credentials(credentials) + user_dataset = credentials.get_do_dataset() + bq_client = _get_bigquery_client(_WORKING_PROJECT, credentials) + + project, dataset, table = self.id.split('.') + view = 'view_{}_{}'.format(dataset.replace('-', '_'), table) + + try: + file_path = bq_client.download_to_file(_WORKING_PROJECT, user_dataset, view) + except NotFound: + raise CartoException('You have not purchased the dataset `{}` yet'.format(self.id)) + + warn('Data saved: {}.'.format(file_path)) + warn("To read it you can do: `pandas.read_csv('{}')`.".format(file_path)) + + return file_path + + +def _get_credentials(credentials=None): + return credentials or get_default_credentials() + + +def _get_bigquery_client(project, credentials): + return BigQueryClient(project, credentials) + class CatalogList(list): diff --git a/cartoframes/data/observatory/geography.py b/cartoframes/data/observatory/geography.py index 7530a9817..8162e3ad4 100644 --- a/cartoframes/data/observatory/geography.py +++ b/cartoframes/data/observatory/geography.py @@ 
-52,3 +52,16 @@ def is_public_data(self): @property def summary(self): return self.data['summary_jsonb'] + + def download(self, credentials=None): + """Download Geography data. + + Args: + credentials (:py:class:`Credentials `, optional): + credentials of CARTO user account. If not provided, + a default credentials (if set with :py:meth:`set_default_credentials + `) will be attempted to be + used. + """ + + return self._download(credentials) diff --git a/examples/08_data_observatory/download.ipynb b/examples/08_data_observatory/download.ipynb new file mode 100644 index 000000000..a6720e796 --- /dev/null +++ b/examples/08_data_observatory/download.ipynb @@ -0,0 +1,137 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from cartoframes.auth import Credentials\n", + "credentials = Credentials.from_file()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Catalog Dataset" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from cartoframes.data.observatory.catalog import Catalog\n", + "dataset = Catalog().categories.get('financial').datasets.get('{dataset_id}')\n", + "dataset.to_series()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "dataset.download(credentials)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Catalog Geography " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from cartoframes.data.observatory.geography import Geography\n", + "geography = Geography.get(dataset.geography)\n", + "geography.to_series()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "file_path = geography.download(credentials)\n", + "file_path" + ] + }, + { + "cell_type": "markdown", + "metadata": 
{}, + "source": [ + "### Upload downloaded csv file to CARTO " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import pandas as pd\n", + "\n", + "df = pd.read_csv(file_path)\n", + "df.head()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from cartoframes.data import Dataset\n", + "\n", + "Dataset(df).upload(table_name='test_do_geography', credentials=credentials, if_exists='replace')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Visualize it" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from cartoframes.viz import Map, Layer\n", + "Map(Layer('test_do_geography', credentials=credentials))" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.3" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/test/data/client/test_bigquery_client.py b/test/data/client/test_bigquery_client.py index ce4be7fe2..37d8f452b 100644 --- a/test/data/client/test_bigquery_client.py +++ b/test/data/client/test_bigquery_client.py @@ -1,15 +1,21 @@ import unittest import os import json +import csv from google.auth.exceptions import RefreshError from carto.exceptions import CartoException from cartoframes.auth import Credentials -from cartoframes.data.clients.bigquery_client import BigQueryClient +from cartoframes.data.clients.bigquery_client import BigQueryClient, _download_query from google.cloud import bigquery +try: + from unittest.mock import Mock +except ImportError: + from mock import Mock + _WORKING_PROJECT = 
'carto-do-customers' @@ -27,6 +33,20 @@ def query_raiser(self, query, **kwargs): raise RefreshError() +class ResponseMock(list): + def __init__(self, data, **kwargs): + super(ResponseMock, self).__init__(data, **kwargs) + self.total_rows = len(data) + + +class QueryJobMock(object): + def __init__(self, response): + self.response = response + + def result(self): + return ResponseMock(self.response) + + class TestBigQueryClient(unittest.TestCase): def setUp(self): if (os.environ.get('APIKEY') is None or os.environ.get('USERNAME') is None): @@ -38,6 +58,11 @@ def setUp(self): self.username = os.environ['USERNAME'] self.credentials = Credentials(self.username, self.apikey) + self.file_path = '/tmp/test_download.csv' + + def tearDown(self): + if os.path.isfile(self.file_path): + os.remove(self.file_path) def test_instantiation(self): bq_client = BigQueryClient(_WORKING_PROJECT, self.credentials) @@ -65,3 +90,97 @@ def test_refresh_token(self): self.assertEqual(response, expected_response) bigquery.Client.query = original_query_method + + def test_download_full(self): + data = [{'0': 'word', '1': 'word word'}] + columns = ['column1', 'column2'] + + original_query = BigQueryClient.query + BigQueryClient.query = Mock(return_value=QueryJobMock(data)) + original_get_table_column_names = BigQueryClient.get_table_column_names + BigQueryClient.get_table_column_names = Mock(return_value=columns) + + project = _WORKING_PROJECT + dataset = 'fake_dataset' + table = 'fake_table' + file_path = self.file_path + + bq_client = BigQueryClient(project, self.credentials) + bq_client.download_to_file(project, dataset, table, file_path=file_path, progress_bar=False) + + self.assertTrue(os.path.isfile(file_path)) + + rows = [] + with open(file_path) as csvfile: + csvreader = csv.reader(csvfile) + rows.append(next(csvreader)) + rows.append(next(csvreader)) + + self.assertEqual(rows[0], columns) + self.assertEqual(rows[1], list(data[0].values())) + + BigQueryClient.query = original_query + 
BigQueryClient.get_table_column_names = original_get_table_column_names + + def test_download_using_if_exists(self): + project = _WORKING_PROJECT + dataset = 'fake_dataset' + table = 'fake_table' + file_path = self.file_path + + bq_client = BigQueryClient(project, self.credentials) + + with open(file_path, 'w'): + with self.assertRaises(CartoException): + bq_client.download_to_file(project, dataset, table, file_path=file_path, + fail_if_exists=True, progress_bar=False) + + +class TestBigQueryClientUnit(unittest.TestCase): + def test_download_query_simple(self): + project = 'fake_project' + dataset = 'fake_dataset' + table = 'fake_table' + limit = None + offset = None + expected_query = 'SELECT * FROM `{}.{}.{}`'.format(project, dataset, table) + + query = _download_query(project, dataset, table, limit, offset) + + self.assertEqual(query, expected_query) + + def test_download_query_limit(self): + project = 'fake_project' + dataset = 'fake_dataset' + table = 'fake_table' + limit = 10 + offset = None + expected_query = 'SELECT * FROM `{}.{}.{}` LIMIT {}'.format(project, dataset, table, limit) + + query = _download_query(project, dataset, table, limit, offset) + + self.assertEqual(query, expected_query) + + def test_download_query_offset(self): + project = 'fake_project' + dataset = 'fake_dataset' + table = 'fake_table' + limit = None + offset = 10 + expected_query = 'SELECT * FROM `{}.{}.{}` OFFSET {}'.format(project, dataset, table, offset) + + query = _download_query(project, dataset, table, limit, offset) + + self.assertEqual(query, expected_query) + + def test_download_query_limit_offset(self): + project = 'fake_project' + dataset = 'fake_dataset' + table = 'fake_table' + limit = 10 + offset = 20 + expected_query = 'SELECT * FROM `{}.{}.{}` LIMIT {} OFFSET {}'.format(project, dataset, table, limit, offset) + + query = _download_query(project, dataset, table, limit, offset) + + self.assertEqual(query, expected_query) diff --git a/test/data/observatory/examples.py 
b/test/data/observatory/examples.py index 0324736b2..8a0bd1924 100644 --- a/test/data/observatory/examples.py +++ b/test/data/observatory/examples.py @@ -56,7 +56,7 @@ test_geographies = CatalogList([test_geography1, test_geography2]) db_dataset1 = { - 'id': 'basicstats-census', + 'id': 'project.dataset.basicstats-census', 'name': 'Basic Stats - Census', 'description': 'Basic stats on 2019 Spanish census', 'provider_id': 'bbva', diff --git a/test/data/observatory/mocks.py b/test/data/observatory/mocks.py new file mode 100644 index 000000000..0465e4260 --- /dev/null +++ b/test/data/observatory/mocks.py @@ -0,0 +1,17 @@ +class BigQueryClientMock(object): + def __init__(self, response): + self.response = response + + def download_to_file(self, _1, _2, _3): + if isinstance(self.response, Exception): + raise self.response + else: + return self.response + + +class CredentialsMock(object): + def __init__(self, username): + self.username = username + + def get_do_dataset(self): + return self.username.replace('-', '_') diff --git a/test/data/observatory/test_dataset.py b/test/data/observatory/test_dataset.py index 661003cc7..d85c6e0b1 100644 --- a/test/data/observatory/test_dataset.py +++ b/test/data/observatory/test_dataset.py @@ -1,6 +1,10 @@ import unittest import pandas as pd +from google.api_core.exceptions import NotFound + +from carto.exceptions import CartoException + from cartoframes.data.observatory.entity import CatalogList from cartoframes.data.observatory.dataset import Dataset from cartoframes.data.observatory.repository.variable_repo import VariableRepository @@ -8,6 +12,7 @@ from cartoframes.data.observatory.repository.dataset_repo import DatasetRepository from .examples import test_dataset1, test_datasets, test_variables, test_variables_groups, db_dataset1, test_dataset2, \ db_dataset2 +from .mocks import BigQueryClientMock, CredentialsMock try: from unittest.mock import Mock, patch @@ -206,3 +211,39 @@ def test_datasets_are_exported_as_dataframe(self): 
assert isinstance(dataset_df, pd.DataFrame) assert isinstance(sliced_dataset, pd.Series) assert sliced_dataset.equals(dataset.to_series()) + + @patch.object(DatasetRepository, 'get_by_id') + @patch('cartoframes.data.observatory.entity._get_bigquery_client') + def test_dataset_download(self, mocked_bq_client, mocked_repo): + # mock dataset + mocked_repo.return_value = test_dataset1 + + # mock big query client + file_path = 'fake_path' + mocked_bq_client.return_value = BigQueryClientMock(file_path) + + # test + username = 'fake_user' + credentials = CredentialsMock(username) + + dataset = Dataset.get(test_dataset1.id) + response = dataset.download(credentials) + + assert response == file_path + + @patch.object(DatasetRepository, 'get_by_id') + @patch('cartoframes.data.observatory.entity._get_bigquery_client') + def test_dataset_download_raises_with_nonpurchased(self, mocked_bq_client, mocked_repo): + # mock dataset + mocked_repo.return_value = test_dataset1 + + # mock big query client + mocked_bq_client.return_value = BigQueryClientMock(NotFound('Fake error')) + + # test + username = 'fake_user' + credentials = CredentialsMock(username) + + dataset = Dataset.get(test_dataset1.id) + with self.assertRaises(CartoException): + dataset.download(credentials) diff --git a/test/data/observatory/test_geography.py b/test/data/observatory/test_geography.py index 8eedbc7aa..2e6905c75 100644 --- a/test/data/observatory/test_geography.py +++ b/test/data/observatory/test_geography.py @@ -1,11 +1,16 @@ import unittest import pandas as pd +from google.api_core.exceptions import NotFound + +from carto.exceptions import CartoException + from cartoframes.data.observatory.entity import CatalogList from cartoframes.data.observatory.geography import Geography from cartoframes.data.observatory.repository.geography_repo import GeographyRepository from cartoframes.data.observatory.repository.dataset_repo import DatasetRepository from .examples import test_geography1, test_geographies, 
test_datasets, db_geography1, test_geography2, db_geography2 +from .mocks import BigQueryClientMock, CredentialsMock try: from unittest.mock import Mock, patch @@ -185,3 +190,39 @@ def test_geographies_are_exported_as_dataframe(self): assert isinstance(geography_df, pd.DataFrame) assert isinstance(sliced_geography, pd.Series) assert sliced_geography.equals(geography.to_series()) + + @patch.object(GeographyRepository, 'get_by_id') + @patch('cartoframes.data.observatory.entity._get_bigquery_client') + def test_dataset_download(self, mocked_bq_client, mocked_repo): + # mock geography + mocked_repo.return_value = test_geography1 + + # mock big query client + file_path = 'fake_path' + mocked_bq_client.return_value = BigQueryClientMock(file_path) + + # test + username = 'fake_user' + credentials = CredentialsMock(username) + + dataset = Geography.get(test_geography1.id) + response = dataset.download(credentials) + + assert response == file_path + + @patch.object(GeographyRepository, 'get_by_id') + @patch('cartoframes.data.observatory.entity._get_bigquery_client') + def test_dataset_download_raises_with_nonpurchased(self, mocked_bq_client, mocked_repo): + # mock geography + mocked_repo.return_value = test_geography1 + + # mock big query client + mocked_bq_client.return_value = BigQueryClientMock(NotFound('Fake error')) + + # test + username = 'fake_user' + credentials = CredentialsMock(username) + + dataset = Geography.get(test_geography1.id) + with self.assertRaises(CartoException): + dataset.download(credentials)