From 735609c38d5acd1a04f9c986002c9617331c4af7 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Daniel=20Garc=C3=ADa=20Aubert?=
Date: Fri, 27 Mar 2020 15:00:23 +0100
Subject: [PATCH] Remove unused BigQueryClient and related tests

---
 cartoframes/data/clients/bigquery_client.py   | 207 ------------------
 tests/e2e/data/client/test_bigquery_client.py | 102 ---------
 .../unit/data/client/test_bigquery_client.py  |  80 -------
 tests/unit/data/observatory/catalog/mocks.py  |  18 --
 4 files changed, 407 deletions(-)
 delete mode 100644 cartoframes/data/clients/bigquery_client.py
 delete mode 100644 tests/e2e/data/client/test_bigquery_client.py
 delete mode 100644 tests/unit/data/client/test_bigquery_client.py
 delete mode 100644 tests/unit/data/observatory/catalog/mocks.py

diff --git a/cartoframes/data/clients/bigquery_client.py b/cartoframes/data/clients/bigquery_client.py
deleted file mode 100644
index f9710432b..000000000
--- a/cartoframes/data/clients/bigquery_client.py
+++ /dev/null
@@ -1,207 +0,0 @@
-import os
-import csv
-import tqdm
-import pandas as pd
-
-from google.auth.exceptions import RefreshError
-from google.cloud import bigquery, storage, bigquery_storage_v1beta1 as bigquery_storage
-from google.oauth2.credentials import Credentials as GoogleCredentials
-from google.api_core.exceptions import DeadlineExceeded
-
-from ...auth import get_default_credentials
-from ...utils.logger import log
-from ...utils.utils import timelogger, is_ipython_notebook
-from ...exceptions import DOError
-
-_GCS_CHUNK_SIZE = 25 * 1024 * 1024  # 25MB. This must be a multiple of 256 KB per the API specification.
-_BQS_TIMEOUT = 2 * 3600  # 2 hours in seconds
-
-
-def refresh_clients(func):
-    def wrapper(self, *args, **kwargs):
-        try:
-            return func(self, *args, **kwargs)
-        except RefreshError:
-            self._init_clients()
-            try:
-                return func(self, *args, **kwargs)
-            except RefreshError:
-                raise DOError('Something went wrong accessing data. '
-                              'Please, try again in a few seconds or contact support for help.')
-    return wrapper
-
-
-class BigQueryClient:
-
-    def __init__(self, credentials):
-        self._credentials = credentials or get_default_credentials()
-        self.bq_client = None
-        self.gcs_client = None
-        self.bq_storage_client = None
-
-        self._gcp_execution_project = None
-        self.bq_public_project = None
-        self.bq_project = None
-        self.bq_dataset = None
-        self.instant_licensing = None
-        self._gcs_bucket = None
-
-        self._init_clients()
-
-    def _init_clients(self):
-        do_credentials = self._credentials.get_do_credentials()
-        google_credentials = GoogleCredentials(do_credentials.access_token)
-
-        self.bq_client = bigquery.Client(
-            project=do_credentials.gcp_execution_project,
-            credentials=google_credentials
-        )
-
-        self.gcs_client = storage.Client(
-            project=do_credentials.bq_project,
-            credentials=google_credentials
-        )
-
-        self.bq_storage_client = bigquery_storage.BigQueryStorageClient(
-            credentials=google_credentials
-        )
-
-        self._gcp_execution_project = do_credentials.gcp_execution_project
-        self.bq_public_project = do_credentials.bq_public_project
-        self.bq_project = do_credentials.bq_project
-        self.bq_dataset = do_credentials.bq_dataset
-        self.instant_licensing = do_credentials.instant_licensing
-        self._gcs_bucket = do_credentials.gcs_bucket
-
-    @refresh_clients
-    def query(self, query, **kwargs):
-        return self.bq_client.query(query, **kwargs)
-
-    def upload_dataframe(self, dataframe, schema, tablename):
-        self._upload_dataframe_to_GCS(dataframe, tablename)
-        self._import_from_GCS_to_BQ(schema, tablename)
-
-    @timelogger
-    def download_to_file(self, job, file_path, fail_if_exists=False, column_names=None, progress_bar=True):
-        if fail_if_exists and os.path.isfile(file_path):
-            raise OSError('The file `{}` already exists.'.format(file_path))
-
-        try:
-            rows = self._download_by_bq_storage_api(job)
-        except Exception:
-            log.debug('Cannot download using BigQuery Storage API, fallback to standard')
-            rows = _get_job_result(job, 'Error downloading data')
-
-        try:
-            _rows_to_file(rows, file_path, column_names, progress_bar)
-        except DeadlineExceeded:
-            log.debug('Cannot download using BigQuery Storage API, fallback to standard')
-            rows = _get_job_result(job, 'Error downloading data')
-            _rows_to_file(rows, file_path, column_names, progress_bar)
-
-    @timelogger
-    def download_to_dataframe(self, job):
-        try:
-            rows = self._download_by_bq_storage_api(job)
-            data = list(rows)
-            return pd.DataFrame(data)
-        except Exception:
-            log.debug('Cannot download using BigQuery Storage API, fallback to standard')
-
-            try:
-                return job.to_dataframe()
-            except Exception:
-                if job.errors:
-                    log.error([error['message'] for error in job.errors if 'message' in error])
-
-                raise DOError('Error downloading data')
-
-    def _download_by_bq_storage_api(self, job, timeout=_BQS_TIMEOUT):
-        table_ref = job.destination.to_bqstorage()
-
-        parent = 'projects/{}'.format(self._gcp_execution_project)
-        session = self.bq_storage_client.create_read_session(
-            table_ref,
-            parent,
-            requested_streams=1,
-            format_=bigquery_storage.enums.DataFormat.AVRO,
-            # We use a LIQUID strategy because we only read from a
-            # single stream. Consider BALANCED if requested_streams > 1
-            sharding_strategy=(bigquery_storage.enums.ShardingStrategy.LIQUID)
-        )
-
-        reader = self.bq_storage_client.read_rows(
-            bigquery_storage.types.StreamPosition(stream=session.streams[0]),
-            timeout=timeout
-        )
-
-        return reader.rows(session)
-
-    @refresh_clients
-    @timelogger
-    def _upload_dataframe_to_GCS(self, dataframe, tablename):
-        log.debug('Uploading to GCS')
-        bucket = self.gcs_client.get_bucket(self._gcs_bucket)
-        blob = bucket.blob(tablename, chunk_size=_GCS_CHUNK_SIZE)
-        dataframe.to_csv(tablename, index=False, header=False)
-        try:
-            blob.upload_from_filename(tablename)
-        finally:
-            os.remove(tablename)
-
-    @refresh_clients
-    @timelogger
-    def _import_from_GCS_to_BQ(self, schema, tablename):
-        log.debug('Importing to BQ from GCS')
-
-        dataset_ref = self.bq_client.dataset(self.bq_dataset, project=self.bq_project)
-        table_ref = dataset_ref.table(tablename)
-        schema_wrapped = [bigquery.SchemaField(column, dtype) for column, dtype in schema.items()]
-
-        job_config = bigquery.LoadJobConfig()
-        job_config.schema = schema_wrapped
-        job_config.source_format = bigquery.SourceFormat.CSV
-        uri = 'gs://{bucket}/{tablename}'.format(bucket=self._gcs_bucket, tablename=tablename)
-
-        job = self.bq_client.load_table_from_uri(
-            uri, table_ref, job_config=job_config
-        )
-
-        _get_job_result(job, 'Error uploading data')
-
-    def get_table_column_names(self, project, dataset, table):
-        table_info = self._get_table(project, dataset, table)
-        return [field.name for field in table_info.schema]
-
-    @refresh_clients
-    def _get_table(self, project, dataset, table):
-        full_table_name = '{}.{}.{}'.format(project, dataset, table)
-        return self.bq_client.get_table(full_table_name)
-
-
-def _rows_to_file(rows, file_path, column_names=None, progress_bar=True):
-    show_progress_bar = progress_bar and is_ipython_notebook()
-
-    if show_progress_bar:
-        pb = tqdm.tqdm_notebook(total=rows.total_rows)
-
-    with open(file_path, 'w') as csvfile:
-        csvwriter = csv.writer(csvfile)
-
-        if column_names:
-            csvwriter.writerow(column_names)
-
-        for row in rows:
-            csvwriter.writerow(row.values())
-            if show_progress_bar:
-                pb.update(1)
-
-
-def _get_job_result(job, error_message):
-    try:
-        return job.result()
-    except Exception:
-        if job.errors:
-            log.error([error['message'] for error in job.errors if 'message' in error])
-
-        raise DOError(error_message)
diff --git a/tests/e2e/data/client/test_bigquery_client.py b/tests/e2e/data/client/test_bigquery_client.py
deleted file mode 100644
index 4b1b308e4..000000000
--- a/tests/e2e/data/client/test_bigquery_client.py
+++ /dev/null
@@ -1,102 +0,0 @@
-import os
-import json
-import pytest
-import unittest
-
-from google.auth.exceptions import RefreshError
-from google.cloud import bigquery
-
-from cartoframes.auth import Credentials
-from cartoframes.exceptions import DOError
-from cartoframes.data.clients.bigquery_client import BigQueryClient
-
-
-_WORKING_PROJECT = 'carto-do-customers'
-
-
-class RefreshTokenChecker(object):
-    def __init__(self, response, raise_after=1):
-        self.number_of_calls = 0
-        self.response = response
-        self.raise_after = raise_after
-
-    def query_raiser(self, query, **kwargs):
-        self.number_of_calls += 1
-        if self.number_of_calls < self.raise_after:
-            return self.response
-        else:
-            raise RefreshError()
-
-
-class ResponseMock(list):
-    def __init__(self, data, **kwargs):
-        super(ResponseMock, self).__init__(data, **kwargs)
-        self.total_rows = len(data)
-
-
-class QueryJobMock(object):
-    def __init__(self, response):
-        self.response = response
-
-    def result(self):
-        return ResponseMock(self.response)
-
-
-class TestBigQueryClient(unittest.TestCase):
-    def setUp(self):
-        if (os.environ.get('APIKEY') is None or os.environ.get('USERNAME') is None):
-            creds = json.loads(open('tests/e2e/secret.json').read())
-            self.apikey = creds['APIKEY']
-            self.username = creds['USERNAME']
-        else:
-            self.apikey = os.environ['APIKEY']
-            self.username = os.environ['USERNAME']
-
-        self.credentials = Credentials(self.username, self.apikey)
-        self.file_path = '/tmp/test_download.csv'
-
-    def tearDown(self):
-        if os.path.isfile(self.file_path):
-            os.remove(self.file_path)
-
-    def test_instantiation(self):
-        bq_client = BigQueryClient(self.credentials)
-        assert isinstance(bq_client, BigQueryClient)
-
-    def test_refresh_token_raises_cartoexception(self):
-        refresh_token_checker = RefreshTokenChecker('', 0)
-        original_query_method = bigquery.Client.query
-        bigquery.Client.query = refresh_token_checker.query_raiser
-
-        bq_client = BigQueryClient(self.credentials)
-        with pytest.raises(DOError):
-            bq_client.query('select * from')
-
-        bigquery.Client.query = original_query_method
-
-    def test_refresh_token(self):
-        expected_response = 'ok'
-        refresh_token_checker = RefreshTokenChecker(expected_response, 2)
-        original_query_method = bigquery.Client.query
-        bigquery.Client.query = refresh_token_checker.query_raiser
-
-        bq_client = BigQueryClient(self.credentials)
-        response = bq_client.query('select * from')
-        assert response == expected_response
-
-        bigquery.Client.query = original_query_method
-
-    def test_download_using_if_exists(self):
-        project = _WORKING_PROJECT
-        dataset = 'fake_dataset'
-        table = 'fake_table'
-        file_path = self.file_path
-
-        bq_client = BigQueryClient(self.credentials)
-
-        query = 'SELECT * FROM `{}.{}.{}`'.format(project, dataset, table)
-        job = bq_client.query(query)
-
-        with open(file_path, 'w'):
-            with self.assertRaises(OSError):
-                bq_client.download_to_file(job, file_path, fail_if_exists=True, progress_bar=False)
diff --git a/tests/unit/data/client/test_bigquery_client.py b/tests/unit/data/client/test_bigquery_client.py
deleted file mode 100644
index b27783179..000000000
--- a/tests/unit/data/client/test_bigquery_client.py
+++ /dev/null
@@ -1,80 +0,0 @@
-import os
-import csv
-import pandas as pd
-
-from unittest.mock import Mock, patch
-
-from cartoframes.auth import Credentials
-from cartoframes.data.clients.bigquery_client import BigQueryClient
-
-
-class ResponseMock(list):
-    def __init__(self, data, **kwargs):
-        super(ResponseMock, self).__init__(data, **kwargs)
-        self.total_rows = len(data)
-
-
-class QueryJobMock(object):
-    def __init__(self, response):
-        self.response = response
-
-    def result(self):
-        return ResponseMock(self.response)
-
-
-class TestBigQueryClient(object):
-    def setup_method(self):
-        self.original_init_clients = BigQueryClient._init_clients
-        BigQueryClient._init_clients = Mock(return_value=(True, True, True))
-        self.username = 'username'
-        self.apikey = 'apikey'
-        self.credentials = Credentials(self.username, self.apikey)
-        self.file_path = '/tmp/test_download.csv'
-
-    def teardown_method(self):
-        self.credentials = None
-        BigQueryClient._init_clients = self.original_init_clients
-
-        if os.path.isfile(self.file_path):
-            os.remove(self.file_path)
-
-    @patch.object(BigQueryClient, 'get_table_column_names')
-    @patch.object(BigQueryClient, '_download_by_bq_storage_api')
-    def test_download_to_file_full(self, download_mock, column_names_mock):
-        data = [{'0': 'word', '1': 'word word'}]
-        columns = ['column1', 'column2']
-
-        column_names_mock.return_value = Mock(return_value=columns)
-        download_mock.return_value = data
-
-        file_path = self.file_path
-
-        bq_client = BigQueryClient(self.credentials)
-        job = QueryJobMock(data)
-        bq_client.download_to_file(job, file_path, column_names=columns, progress_bar=False)
-
-        rows = []
-        with open(file_path) as csvfile:
-            csvreader = csv.reader(csvfile)
-            rows.append(next(csvreader))
-            rows.append(next(csvreader))
-
-        assert rows[0] == columns
-        assert rows[1] == list(data[0].values())
-
-    @patch.object(BigQueryClient, 'get_table_column_names')
-    @patch.object(BigQueryClient, '_download_by_bq_storage_api')
-    def test_download_to_dataframe_full(self, download_mock, column_names_mock):
-        data = [{'column1': 'word', 'column2': 'word word'}]
-        columns = ['column1', 'column2']
-
-        column_names_mock.return_value = Mock(return_value=columns)
-        download_mock.return_value = data
-
-        expected_df = pd.DataFrame(data, columns=columns)
-
-        bq_client = BigQueryClient(self.credentials)
-        job = QueryJobMock(data)
-        df = bq_client.download_to_dataframe(job)
-
-        assert df.equals(expected_df)
diff --git a/tests/unit/data/observatory/catalog/mocks.py b/tests/unit/data/observatory/catalog/mocks.py
deleted file mode 100644
index e561086d4..000000000
--- a/tests/unit/data/observatory/catalog/mocks.py
+++ /dev/null
@@ -1,18 +0,0 @@
-class BigQueryClientMock(object):
-    def __init__(self, exception=None):
-        self.exception = exception
-
-        self.bq_public_project = 'public_data_project'
-        self.bq_project = 'user_data_project'
-        self.bq_dataset = 'username'
-        self._gcs_bucket = 'bucket_name'
-
-    def query(self, _1):
-        return True
-
-    def download_to_file(self, _1, _2, column_names=None):
-        if isinstance(self.exception, Exception):
-            raise self.exception
-
-    def get_table_column_names(self, _1, _2, _3):
-        return True