Bulk download #171

Closed
wants to merge 3 commits into from
Changes from 1 commit
126 changes: 126 additions & 0 deletions pandas_gbq/gbq.py
@@ -2,11 +2,15 @@
import logging
import os
import time
import uuid
import warnings
from datetime import datetime, timedelta
from io import BytesIO
Collaborator:
I believe we need to use the six package for Python 2.7 support.

Contributor Author:
I'll check. I had thought BytesIO was the same in both
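(For context, a sketch only: io.BytesIO is available on both Python 2.7 and 3, and six.BytesIO is simply io.BytesIO re-exported, so a guard like the following would only matter if the project standardizes on six.)

# Sketch: io.BytesIO exists on Python 2.7 and 3 alike;
# six.BytesIO is just an alias for it.
try:
    from six import BytesIO  # preferred if the codebase already uses six
except ImportError:
    from io import BytesIO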

from time import sleep

import numpy as np
import pandas as pd
from google.cloud.bigquery.job import ExtractJobConfig
Collaborator:
This should go inside the method where it's used so that _test_google_api_imports() has a chance to run.
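A sketch of the suggested change, assuming the import is only needed by export_table_to_gcs:

    def export_table_to_gcs(self, dataset, table, timeout_in_seconds=None,
                            bucket=None, blob=None, zipped=True):
        # Importing here lets _test_google_api_imports() report a missing or
        # outdated google-cloud-bigquery before this line ever runs.
        from google.cloud.bigquery.job import ExtractJobConfig

        job_config = ExtractJobConfig()
        # ...rest of the method unchanged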

from pandas import DataFrame, compat
from pandas.compat import lzip

@@ -566,6 +570,104 @@ def run_query(self, query, **kwargs):

        return schema, result_rows

    def export_table_to_gcs(
        self,
        dataset,
        table,
        timeout_in_seconds=None,
        bucket=None,
        blob=None,
        zipped=True,
    ):
        """
        Export table to GCS. Returns tuple of (bucket, blob).
        """
        client = self.client
        table_ref = client.dataset(dataset).table(table)
        job_config = ExtractJobConfig()
        job_config.compression = 'GZIP' if zipped else 'NONE'
        bucket = bucket or '{}-temp'.format(client.project)
        blob = blob or '{}/{}.csv'.format(dataset, table)
        if zipped and not blob.endswith('.gz'):
            blob += '.gz'
        if not isinstance(bucket, str):
            bucket = bucket.name
        destination_uri = 'gs://{}/{}'.format(bucket, blob)
        extract_job = client.extract_table(
            table_ref, destination_uri, job_config=job_config)
        wait_for_job(extract_job, timeout_in_seconds=timeout_in_seconds)
        logger.info('Exported {}.{} -> {}'.format(
            dataset, table, destination_uri))
        return bucket, blob

    def read_gbq_table(
        self,
        dataset,
        table,
        bucket,
        timeout_in_seconds=600,
    ):
        """
        Reads an entire table from BigQuery into a DataFrame.
        """
        from google.cloud import storage
Collaborator:
This is a new dependency. We need to

  • add google-cloud-storage as an explicit dependency to setup.py
  • add an import check to _test_google_api_imports()
  • update the scopes in the credential fetching to allow Google Cloud Storage (or possibly expand to full GCP access)
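A rough sketch of the second bullet — the existing _test_google_api_imports() wording may differ, and google-cloud-storage would also need the setup.py and scope changes above:

def _test_google_api_imports():
    try:
        from google.cloud import bigquery  # noqa
        from google.cloud import storage  # noqa
    except ImportError as ex:
        raise ImportError(
            'pandas-gbq requires google-cloud-bigquery and '
            'google-cloud-storage: {0}'.format(ex))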


        storage_client = storage.Client(self.project, self.credentials)
        prefix = 'gbq-exports/{}/{}/'.format(dataset, table)

Could we add an optional directory prefix parameter to this method that defaults to 'gbq-exports'?
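Something like this, as a sketch of the optional parameter (names are illustrative):

    def read_gbq_table(self, dataset, table, bucket,
                       timeout_in_seconds=600, prefix='gbq-exports'):
        # ...rest of the body unchanged, except the hard-coded
        # 'gbq-exports' directory now comes from the parameter:
        prefix = '{}/{}/{}/'.format(prefix, dataset, table)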

        bucket = storage_client.get_bucket(bucket)

        for old_blob in bucket.list_blobs(prefix=prefix):
            old_blob.delete()
            logger.info('Old Blob Deleted: {}'.format(old_blob.name))

        self.export_table_to_gcs(
            dataset=dataset,
            table=table,
            timeout_in_seconds=timeout_in_seconds,
            bucket=bucket,
            blob='{}*.csv.gz'.format(prefix),
        )

        frames = []

        downloads = tqdm(list(bucket.list_blobs(prefix=prefix)), unit='file')
        for blob in downloads:
            downloads.set_description('Processing {}, {}MB'.format(
                blob, blob.size / 2**20))
            s = BytesIO(blob.download_as_string())
            frames.append(pd.read_csv(s, compression='gzip'))
            blob.delete()

        return pd.concat(frames, ignore_index=True)

This reads all the data into memory at once, right? In cases where the data size is too large to read into memory at once, it may make sense to run some of the above logic (everything before 376 I think), but not construct a data frame from the resulting files. (Clients may instead want to read individual files, or use Dask.)

Would it be possible to refactor the code to make that possible?
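One possible shape for that refactor (hypothetical method names, not part of this PR): split the export-and-list step from the concatenation step, so callers with very large results can stop after the first and read files individually or hand them to Dask.

    def export_table_to_blobs(self, dataset, table, bucket,
                              prefix='gbq-exports'):
        # 'bucket' is assumed to be a google.cloud.storage Bucket here.
        prefix = '{}/{}/{}/'.format(prefix, dataset, table)
        self.export_table_to_gcs(
            dataset=dataset, table=table, bucket=bucket,
            blob='{}*.csv.gz'.format(prefix))
        return list(bucket.list_blobs(prefix=prefix))

    def blobs_to_dataframe(self, blobs):
        # Only this step materializes everything in memory; callers that
        # want to stream files one at a time can stop after
        # export_table_to_blobs().
        frames = [
            pd.read_csv(BytesIO(blob.download_as_string()),
                        compression='gzip')
            for blob in blobs
        ]
        return pd.concat(frames, ignore_index=True)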

Collaborator:
This would be interesting. Feel free to clone this PR and make this change. I agree this would be a good initial step for Dask integration.


    def read_gbq_bulk(
Collaborator:
There's going to be some threshold for query results size above which it will always make sense to do this (not sure exactly what that threshold is right now). We could look at the number of rows times the number of columns in the query results to figure out which method to use.

Though, since this requires a staging bucket and regular queries don't, you're probably right that it should be explicitly set. I'd like to see us refactor the read_gbq method a bit so that we could reuse the auth & query running portion without the query downloading portion.

One option might be: add a storage_bucket argument to read_gbq. If set, use the bulk download method, otherwise use current method.
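A sketch of that dispatch (the storage_bucket name and the kwargs pass-through are assumptions for illustration):

def read_gbq(query, project_id=None, storage_bucket=None, **kwargs):
    if storage_bucket is not None:
        # Bulk path: stage the results in GCS and pull down the export files.
        connector = GbqConnector(project_id, **kwargs)
        return connector.read_gbq_bulk(query, bucket=storage_bucket)
    # ...otherwise the existing read_gbq body (auth, run_query, dtype
    # conversion) continues unchanged here.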

Contributor Author:
Yes, agree on the tradeoffs. I think we should at least start by being explicit (maybe a user warning for big queries?), and could then think about defaulting

I think the cleavage is around 500k rows

        self,
        query,
        project=None,
        bucket=None,
        dataset='pandas_bulk',
    ):

        table_name = uuid.uuid4().hex[:6]
        create_job = self.create_table_from_query(
Collaborator:
I'm not seeing a "create_table_from_query" function? As far as I know all query results do end up making their way to a (temporary) destination table anyway, so I think this is unnecessary.

If we do want such logic, we should set a destination table with an expiration time so that BigQuery cleans it up for us.
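For reference, a sketch of setting an expiring destination table with google-cloud-bigquery (client, dataset, table_name and query are assumed to be in scope; API names per the 0.28+ client library):

from datetime import datetime, timedelta
from google.cloud import bigquery

job_config = bigquery.QueryJobConfig()
job_config.destination = client.dataset(dataset).table(table_name)
query_job = client.query(query, job_config=job_config)
query_job.result()  # wait for the query to finish

# Let BigQuery clean the table up on its own.
table = client.get_table(job_config.destination)
table.expires = datetime.utcnow() + timedelta(hours=1)
client.update_table(table, ['expires'])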

Contributor Author (@max-sixty, May 4, 2018):

Great! Do you happen to know whether the temp table is easily available from a QueryJob? It might just be .destination_table

Collaborator:
The BigQuery team doesn't want people querying the temporary tables (it used to be possible but they changed it to fail those queries). If all you are interested in is the number of rows, you can get that from the table resource or the getQueryResults response.

See:
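As a sketch of both row-count routes (query_job is assumed to be the finished QueryJob and client the bigquery.Client):

# Row count from the query results (backed by getQueryResults):
row_count = query_job.result().total_rows

# Or from the destination table resource, without querying it:
row_count = client.get_table(query_job.destination).num_rows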

            query=query,
            dataset=dataset,
            table=table_name,
            block=True,
        )

        df = self.read_gbq_table(
            dataset=dataset,
            table=table_name,

Should bucket be passed in here?

        )

        self.client.delete_table(create_job.destination)

        return df

    def load_data(
            self, dataframe, dataset_id, table_id, chunksize=None,
            schema=None, progress_bar=True):
@@ -1023,6 +1125,30 @@ def _generate_bq_schema(df, default_type='STRING'):
    return schema.generate_bq_schema(df, default_type=default_type)


def wait_for_job(job, timeout_in_seconds=None):
    # https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/bigquery/cloud-client/snippets.py
    if timeout_in_seconds:
        start = datetime.now()
        timeout = start + timedelta(seconds=timeout_in_seconds)

    with tqdm(
        bar_format='Waiting for {desc} Elapsed: {elapsed}',
        total=10000,
    ) as progress:
        while True:
Collaborator:
This loop can be modified to use .result() with a timeout.

Actually, I think we should refactor https://github.com/pydata/pandas-gbq/blob/master/pandas_gbq/gbq.py#L521-L539 to here, since the way we wait for a query job there is actually how we want to wait for any kind of job.
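A minimal sketch of that simplification — job.result() blocks until the job finishes, surfaces job errors as exceptions, and raises concurrent.futures.TimeoutError if the timeout elapses; the tqdm progress display would have to be layered back on top:

def wait_for_job(job, timeout_in_seconds=None):
    # Let the client library do the polling and error handling.
    job.result(timeout=timeout_in_seconds)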

            job.reload()  # Refreshes the state via a GET request.
            progress.set_description(str(job))
            if job.state == 'DONE':
                if job.error_result:
                    raise RuntimeError(job.errors)
                progress.bar_format = 'Completed {desc}. Elapsed: {elapsed}'
                return
            if timeout_in_seconds:
                if datetime.now() > timeout:
                    raise TimeoutError

F821 undefined name 'TimeoutError'

            time.sleep(1)


class _Table(GbqConnector):

    def __init__(self, project_id, dataset_id, reauth=False, private_key=None):