
Bulk download #171

Closed
wants to merge 3 commits into from

Conversation

max-sixty
Contributor

per #167

Needs some refining, iteration & tests, but please give feedback on approach

                return
            if timeout_in_seconds:
                if datetime.datetime.now() > timeout:
                    raise TimeoutError


F821 undefined name 'TimeoutError'

@max-sixty changed the title from "initial bulk functions" to "Bulk download" Apr 29, 2018
        bar_format='Waiting for {desc} Elapsed: {elapsed}',
        total=10000,
    ) as progress:
        while True:
Collaborator

This loop can be modified to use .result() with a timeout.

Actually, I think we should refactor https://github.com/pydata/pandas-gbq/blob/master/pandas_gbq/gbq.py#L521-L539 to here, since the way we wait for a query job there is actually how we want to wait for any kind of job.
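
A minimal sketch of that suggestion, assuming the google-cloud-bigquery job API (job.result() accepts a timeout and raises concurrent.futures.TimeoutError when it expires); the SystemError wrapping mirrors the existing wait_for_job and is illustrative only:

from concurrent.futures import TimeoutError


def wait_for_job(job, timeout_in_seconds=None):
    """Block until a BigQuery job finishes, re-raising any job error."""
    try:
        # result() polls the job for us and raises if the job itself failed
        job.result(timeout=timeout_in_seconds)
    except TimeoutError:
        raise SystemError('Job {} timed out after {} seconds'.format(
            job.job_id, timeout_in_seconds))
    return job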

):

    table_name = uuid.uuid4().hex[:6]
    create_job = self.create_table_from_query(
Collaborator

I'm not seeing a "create_table_from_query" function? As far as I know all query results do end up making their way to a (temporary) destination table anyway, so I think this is unnecessary.

If we do want such logic, we should set a destination table with an expiration time so that BigQuery cleans it up for us.
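
A hedged sketch of the expiration idea, using standard google-cloud-bigquery calls (Table.expires and Client.update_table); the dataset and table names below are placeholders:

import datetime

from google.cloud import bigquery

client = bigquery.Client()
table_ref = client.dataset('pandas_bulk').table('tmp_results')  # placeholder names
table = client.get_table(table_ref)
# give the staging table a one-hour lifetime so BigQuery deletes it
# even if our own cleanup never runs
table.expires = datetime.datetime.utcnow() + datetime.timedelta(hours=1)
client.update_table(table, ['expires'])  # patch only the expiration field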

Contributor Author

@max-sixty May 4, 2018

Great! Do you happen to know whether the temp table is easily available from a QueryJob? It might just be .destination_table

Collaborator

The BigQuery team doesn't want people querying the temporary tables (it used to be possible but they changed it to fail those queries). If all you are interested in is the number of rows, you can get that from the table resource or the getQueryResults response.
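
A hedged illustration of both row-count options (RowIterator.total_rows from getQueryResults, and Table.num_rows from the table resource); the query and table names are placeholders:

from google.cloud import bigquery

client = bigquery.Client()

# row count from the query results, without touching the temporary table
query_job = client.query('SELECT name FROM my_dataset.my_table')  # placeholder
print(query_job.result().total_rows)

# row count from the table resource itself
table = client.get_table(client.dataset('my_dataset').table('my_table'))
print(table.num_rows)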

See:

"""
reads an entire table from gbq into a dataframe
"""
from google.cloud import storage
Collaborator

This is a new dependency. We need to

  • add google-cloud-storage as an explicit dependency to setup.py
  • add an import check to _test_google_api_imports()
  • update the scopes in the credential fetching to allow Google Cloud Storage (or possibly expand to full GCP access) - see the sketch below
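
A hedged sketch of those changes; the scope list and the body of _test_google_api_imports() here are illustrative, not the project's actual implementation:

SCOPES = [
    'https://www.googleapis.com/auth/bigquery',
    # added so the exported blobs can be listed, downloaded and deleted
    'https://www.googleapis.com/auth/devstorage.read_write',
]


def _test_google_api_imports():
    # mirror the spirit of the existing check, extended with the new dependency
    try:
        from google.cloud import bigquery  # noqa: F401
        from google.cloud import storage  # noqa: F401
    except ImportError as ex:
        raise ImportError(
            'pandas-gbq bulk download requires google-cloud-bigquery '
            'and google-cloud-storage: {0}'.format(ex))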


return pd.concat(frames, ignore_index=True)

def read_gbq_bulk(
Collaborator

There's going to be some threshold of query result size above which it always makes sense to do this (I'm not sure exactly what that threshold is right now). For example, we could look at the number of rows times the number of columns in the query results to figure out which method to use.

Though, since this requires a staging bucket and regular queries don't, you're probably right that it should be explicitly set. I'd like to see us refactor the read_gbq method a bit so that we could reuse the auth & query running portion without the query downloading portion.

One option might be: add a storage_bucket argument to read_gbq. If set, use the bulk download method; otherwise use the current method.
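
A hedged sketch of that dispatch; storage_bucket, _read_gbq_bulk and _read_gbq_http are hypothetical names used only to illustrate the split:

def _read_gbq_bulk(query, project_id=None, bucket=None, **kwargs):
    raise NotImplementedError  # placeholder for the GCS export path


def _read_gbq_http(query, project_id=None, **kwargs):
    raise NotImplementedError  # placeholder for the current download path


def read_gbq(query, project_id=None, storage_bucket=None, **kwargs):
    if storage_bucket is not None:
        # a bucket was supplied: stage results in GCS and download the exports
        return _read_gbq_bulk(query, project_id=project_id,
                              bucket=storage_bucket, **kwargs)
    # no bucket: keep the existing tabledata.list-based download
    return _read_gbq_http(query, project_id=project_id, **kwargs)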

Contributor Author

Yes, agree on the tradeoffs. I think we should at least start by being explicit (maybe a user warning for big queries?), and could then think about defaulting

I think the crossover point is around 500k rows
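
A hedged sketch of the warning idea; the threshold constant and helper name are made up, and 500k is only the rough figure from this thread:

import warnings

BULK_THRESHOLD_ROWS = 500000  # rough crossover point discussed above


def _maybe_suggest_bulk(total_rows):
    # nudge users with very large results toward the GCS export path
    if total_rows > BULK_THRESHOLD_ROWS:
        warnings.warn(
            'Query returned {} rows; consider the bulk (GCS export) '
            'download for results this large.'.format(total_rows),
            UserWarning)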

from time import sleep

import numpy as np
import pandas as pd
from google.cloud.bigquery.job import ExtractJobConfig
Collaborator

This should go inside the method where it's used so that _test_google_api_imports() has a chance to run.
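
A hedged illustration of the deferred import; the helper name is made up:

def _make_extract_config(zipped=True):
    # deferred import: importing the module still succeeds without the
    # optional dependency, so _test_google_api_imports() can report it first
    from google.cloud.bigquery.job import ExtractJobConfig

    config = ExtractJobConfig()
    config.compression = 'GZIP' if zipped else 'NONE'
    return config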

import warnings
from datetime import datetime
from io import BytesIO
Collaborator

I believe we need to use the six package for Python 2.7 support.

Contributor Author

I'll check. I had thought BytesIO was the same in both
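
For reference, io.BytesIO exists on both Python 2.7 and 3, and six.BytesIO is an alias for it, so either import should behave the same:

import io

import six

# six.BytesIO is defined as io.BytesIO, so both names refer to the same class
assert six.BytesIO is io.BytesIO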

@max-sixty
Contributor Author

Also worth noting that this is a bit of a hack - ideally there would be an API method that would return some efficient blob of columnar data - parquet / arrow (or even CSV / efficient JSON)

...rather than us composing a three-step pipeline with ~30s of overhead

@max-sixty
Contributor Author

@tswast Sorry for leaving this hanging, especially after all your feedback.

Do you think it's worth finishing off, vs improving the HTTP downloads further, or working on the latency issues?

We run this internally very successfully, but I'm not sure if people are pulling 1m+ rows. And there are some advantages to waiting to see whether alternative download methods appear in the future.

@tswast
Collaborator

tswast commented Aug 22, 2018

My preference right now is to wait and see for alternatives.

@smith-m

smith-m commented Aug 30, 2018

@tswast What type of alternative to this process are you expecting to emerge? We also use a similar process internally (table to gcs csv - optionally gzipped - to local - to dataframe). I've been looking for existing alternatives for loading large queries to pandas/dask dataframes and there doesn't seem to be a well documented consensus solution. Meanwhile everyone loading large queries from BQ to pandas either waits for hours to process, has written their own internal solution, or just avoids large datasets.

One potential alternative I haven't seen documented, but which may be comparable, is using the GCP-recommended Simba ODBC / JDBC drivers with pyodbc / JayDeBeApi - has anyone explored the performance of this?

@tswast
Collaborator

tswast commented Aug 30, 2018

ODBC performance suffers the same issues. The problem is that the tabledata.list call is slow, and the ODBC driver and this module both use that method to get query results.

@tswast mentioned this pull request Aug 31, 2018

@danqrodney left a comment


@tswast @max-sixty This change looks very useful; I added a few suggestions.


df = self.read_gbq_table(
dataset=dataset,
table=table_name,


Should bucket be passed in here?

        frames.append(pd.read_csv(s, compression='gzip'))
        blob.delete()

    return pd.concat(frames, ignore_index=True)


This reads all the data into memory at once, right? In cases where the data is too large to read into memory at once, it may make sense to run some of the above logic (everything before line 376, I think) but not construct a DataFrame from the resulting files. (Clients may instead want to read individual files, or use Dask.)

Would it be possible to refactor the code to make that possible?

Collaborator

This would be interesting. Feel free to clone this PR and make this change. I agree this would be a good initial step for Dask integration.
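
A hedged sketch of that refactor, mirroring the shape of read_gbq_table in this PR; the function names are hypothetical and the date-parsing details are omitted:

from io import BytesIO

import pandas as pd


def iter_exported_frames(bucket, prefix):
    """Yield one DataFrame per exported CSV blob instead of concatenating."""
    for blob in bucket.list_blobs(prefix=prefix):
        buf = BytesIO(blob.download_as_string())
        yield pd.read_csv(buf, compression='gzip')
        blob.delete()


def read_gbq_table(bucket, prefix):
    # the current behaviour becomes a thin wrapper over the generator,
    # while Dask or file-by-file consumers can use the generator directly
    return pd.concat(iter_exported_frames(bucket, prefix), ignore_index=True)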

from google.cloud import storage

storage = storage.Client(self.project, self.credentials)
prefix = 'gbq-exports/{}/{}/'.format(dataset, table)


Could we add an optional directory prefix parameter to this method that defaults to 'gbq-exports'?

@max-sixty
Contributor Author

@danqrodney thanks for the suggestions.

@tswast and I decided that waiting for improvements on the BigQuery side was worthwhile; those would enable both large & small batches to be significantly faster

That said, if anyone needs this urgently, they are welcome to finish this off - it's fairly close.

@smith-m

smith-m commented Oct 6, 2018

@max-sixty what needs to be done to finish this off? I'm happy to contribute - I see this as a very valuable improvement. There is no documented timeline, or even acknowledgement by GCP, that improvements to the tabledata API are being worked on. For many datasets, loading through GCS is the difference between being able to use pandas-gbq as part of a workflow and not being able to use it at all.

@max-sixty
Contributor Author

@smith-m would definitely accept this as a PR

I think the main things to do are in @tswast's comments, particularly around adding the GCS dependencies and scopes.

I'll also paste all our internal code in a comment below in case there's anything I missed (I may have forgotten to upload create_table_from_query).

@max-sixty
Contributor Author

Our internal code, which may have some updates since I posted the PR draft:

import datetime
import logging
import os
import random
import tempfile
import time
import uuid
from collections import OrderedDict
from io import BytesIO

import google
import pandas as pd
import pandas_gbq.gbq as gbq
import xarray as xr
from google.cloud import bigquery
from google.cloud.bigquery import CopyJobConfig
from google.cloud.bigquery.job import ExtractJobConfig
from tqdm import tqdm

from sixty.decorators import memory_cache
from sixty.hooks.google_cloud import GOOGLE_CLOUD_PROJECT

logger = logging.getLogger(__name__)
# write disposition
if_exists_map = dict(
    replace='WRITE_TRUNCATE', append='WRITE_APPEND', fail='WRITE_EMPTY')


@memory_cache
def client(project=None, credentials=None):
    return bigquery.Client(project=project, credentials=credentials)


def table(table=None, dataset=None):
    # While it's generally bad practice to write thin wrappers around external
    # packages, this API is used frequently enough, and is awkward enough,
    # that a wrapper is worthwhile
    # ref https://github.com/GoogleCloudPlatform/google-cloud-python/issues/6055  # noqa
    c = client()
    table_ref = c.dataset(dataset).table(table)
    return c.get_table(table_ref)


def create_dataset(dataset, project=None):
    # shouldn't need wrapper for this but
    # https://github.com/GoogleCloudPlatform/google-cloud-python/issues/6055
    c = client(project=project)
    dataset = bigquery.Dataset(c.dataset(dataset))
    dataset.location = 'US'
    try:
        return c.create_dataset(dataset)
    except google.api_core.exceptions.Conflict:
        logger.debug("Dataset {} already created".format(dataset))


def copy_table(table, dataset, project_dest=None, block=True):
    """
    Copies a table from the current project to another, maintaining its name
    and dataset
    """

    # partly from
    # https://cloud.google.com/bigquery/docs/managing-tables#copying_a_single_source_table  # noqa
    c = client()

    source_table = c.dataset(dataset).table(table)
    dest_table = (
        c
        .dataset(dataset, project=project_dest)
        .table(table)
    )

    # annoying API
    config = CopyJobConfig()
    config.create_disposition = 'CREATE_IF_NEEDED'
    config.write_disposition = 'WRITE_TRUNCATE'

    job = c.copy_table(source_table, dest_table, job_config=config)
    if block:
        job.result()

    # can't return job in an airflow task because it's not pickle-able


def create_table_from_query(
        query, dataset, table, block=False,
        if_exists='fail', project=None, credentials=None, **kwargs):
    """
    Create a bigquery table from a query

    Parameters
    ----------
    query : str
        SQL-Like Query to return data values
    dataset : str
        dataset id
    table : str
        name of table
    block : bool (default False)
    if_exists : str (default: 'fail')
        append - Specifies that rows may be appended to an existing table
        fail - Specifies that the output table must be empty
        replace - Specifies that write should replace a table
    project : str (default to env var GOOGLE_CLOUD_PROJECT)
        Google BigQuery Account project ID.
    credentials : GoogleCredentials (optional)
        Credentials used to authenticate the BigQuery client

    Returns
    -------
    job: google.cloud.bigquery.job.QueryJob
        Returns the inserted QueryJob

    from https://stackoverflow.com/questions/14622526
    """
    c = client(project, credentials)
    create_dataset(dataset)
    config = bigquery.job.QueryJobConfig()
    config.use_legacy_sql = False
    config.allow_large_results = True
    config.destination = c.dataset(dataset).table(table)
    config.write_disposition = if_exists_map[if_exists]
    job = c.query(query=query, job_config=config)
    if block:
        wait_for_job(job)
    return job


def load_table(source, dataset, table, block=False,
               if_exists='fail', project=None, credentials=None):
    """
    Load a table into BigQuery

    Currently implemented only for loading a
    JSON file from disk; we'll extend to other cases as needed

    source : file-like
    dataset : str
        dataset name
    table : str
        destination table name
    if_exists : str (default: 'fail')
        append - Specifies that rows may be appended to an existing table
        fail - Specifies that the output table must be empty
        replace - Specifies that write should replace a table
    """
    c = client(project, credentials)
    create_dataset(dataset)
    config = bigquery.job.LoadJobConfig()
    # currently implemented for JSON; extend when we use for CSV
    config.autodetect = True
    config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
    config.write_disposition = if_exists_map[if_exists]

    destination = c.dataset(dataset).table(table)
    # currently implemented for files; extend when we use for GCS
    job = c.load_table_from_file(
        source, destination=destination, job_config=config)
    if block:
        wait_for_job(job)
    return job


def list_tables(dataset, project=None):
    """
    List tables in a BigQuery dataset.

    Parameters
    ----------
    dataset : str
        Google BigQuery dataset ID.
    project : str
        Google BigQuery Account project ID.

    Returns
    -------
    list
    """
    c = client(project)
    dataset = c.dataset(dataset)
    return [table.table_id for table in c.list_tables(dataset)]


def read_gbq(query, project=None, private_key=None, **kwargs):
    """
    Execute SQL on BigQuery, returning a DataFrame.

    Parameters
    ----------
    query : str
        SQL-Like Query to return data values
    project : str (default to env var GOOGLE_CLOUD_PROJECT)
        Google BigQuery Account project ID.
    index_col : str (optional)
        Name of result column to use for index in results DataFrame
    col_order : list(str) (optional)
        List of BigQuery column names in the desired order for results
        DataFrame
    **kwargs : Arbitrary keyword arguments
        configuration (dict): query config parameters for job processing.
        For example:
            configuration = {'query': {'useQueryCache': False}}
        For more information see `BigQuery SQL Reference
        <https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query>`
        .. versionadded:: 0.20.0
    Returns
    -------
    df: DataFrame
        DataFrame representing results of query
    """

    return pd.read_gbq(query, project_id=project, dialect='standard',
                       private_key=private_key, **kwargs)


def _write_temp_file(df, filename='df.csv'):
    path = tempfile.mkdtemp()
    file_path = os.path.join(path, filename)
    df.to_csv(file_path, index=False, header=False,
              encoding='utf-8', date_format='%Y-%m-%d %H:%M')
    return file_path


def _bq_schema(df, overrides=None):
    fields = gbq._generate_bq_schema(df)['fields']

    #  maintain ordering but update with overrides
    fields = OrderedDict([(x['name'], x['type']) for x in fields])
    fields.update(overrides or {})

    schema = [bigquery.schema.SchemaField(x, y)
              for x, y in fields.items()]
    return schema


def wait_for_job(job, timeout_in_seconds=None):
    # https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/bigquery/cloud-client/snippets.py
    if timeout_in_seconds:
        start = datetime.datetime.now()
        timeout = start + datetime.timedelta(0, timeout_in_seconds)

    with tqdm(
        bar_format='Waiting for {desc} Elapsed: {elapsed}',
        total=10000,
    ) as progress:
        while True:
            job.reload()  # Refreshes the state via a GET request.
            progress.set_description(str(job))
            if job.state == 'DONE':
                if job.error_result:
                    raise RuntimeError(job.errors)
                progress.bar_format = 'Completed {desc}. Elapsed: {elapsed}'
                return
            if timeout_in_seconds:
                if datetime.datetime.now() > timeout:
                    raise SystemError('Timed out after {} seconds'.format(
                        timeout_in_seconds))
            time.sleep(1)


def write_gbq(df, dataset=None, table=None, project=None,
              credentials=None, block=False, if_exists='fail',
              schema_overrides=None, suffix=None, dataset_id=None,
              table_name=None):
    """Write a DataFrame to a Google BigQuery table.
    Parameters
    ----------
    df : DataFrame
        DataFrame to be written
    dataset : str
        Dataset ID to contain the table
    table : str
        Name of table to be written
    project : str (default to env var GOOGLE_CLOUD_PROJECT)
        Google BigQuery Account project ID.
    credentials : GoogleCredentials
    block : boolean (optional)
        Return after completed writing into BigQuery
    if_exists : {'fail', 'replace', 'append'}, default 'fail'
        'fail': If table exists, raise.
        'replace': If table exists, drop it, recreate it, and insert data.
        'append': If table exists, insert data. Create if does not exist.
    schema_overrides: dict
        Mapping of column name to BQ data type for overriding auto-gen schema
    suffix : str
        Table suffix, e.g. '20180417'
    """
    assert isinstance(df, pd.DataFrame)

    dataset = dataset or dataset_id
    table = table or table_name

    if suffix is not None:
        table = '{}_{}'.format(table, suffix)

    c = client(credentials=credentials, project=project)
    dataset = c.dataset(dataset)
    table = dataset.table(table)

    # drop the index if it is unnamed
    drop_index = df.index.name is None
    df = df.reset_index(drop=drop_index)

    # Sometimes the column names are coming back as string,
    # numpy.string_, unicode, or numpy.unicode_.  Some of those
    # break parquet so ensure it's all string.
    df.columns = df.columns.astype(str)

    config = bigquery.job.LoadJobConfig()
    config.write_disposition = if_exists_map[if_exists]
    config.schema = _bq_schema(df, schema_overrides)

    job = c.load_table_from_dataframe(
        dataframe=df,
        destination=table,
        job_config=config)

    if block:
        wait_for_job(job)

    return job


DEFAULT_BUCKET = '{}-temp'.format(GOOGLE_CLOUD_PROJECT)


def export_table_to_gcs(
    dataset,
    table,
    project=None,
    credentials=None,
    timeout_in_seconds=None,
    bucket=None,
    blob=None,
    zipped=True,
):
    """
    Export a table to GCS. Returns a tuple of (bucket, blob).
    """
    c = client(project, credentials)
    table_ref = c.dataset(dataset).table(table)
    job_config = ExtractJobConfig()
    job_config.compression = 'GZIP' if zipped else 'NONE'
    bucket = bucket or '{}-temp'.format(c.project)
    blob = blob or '{}/{}.csv'.format(dataset, table)
    if zipped and not blob.endswith('.gz'):
        blob += '.gz'
    if not isinstance(bucket, str):
        bucket = bucket.name
    destination_uri = 'gs://{}/{}'.format(bucket, blob)
    extract_job = c.extract_table(
        table_ref, destination_uri, job_config=job_config)
    wait_for_job(extract_job, timeout_in_seconds=timeout_in_seconds)
    logger.debug('Exported {}.{} -> {}'.format(
        dataset, table, destination_uri))
    return bucket, blob


def read_gbq_table(
        dataset,
        table,
        project=None,
        credentials=None,
        timeout_in_seconds=600,
        bucket=DEFAULT_BUCKET,
):
    """
    reads an entire table from gbq into a dataframe
    """
    from sixty.hooks import gcs

    c = gcs.client(project, credentials)
    rnd = random.randint(0, 1e9)
    prefix = 'gbq-exports/{}/{}/{}/'.format(dataset, table, rnd)
    bucket = c.get_bucket(bucket)

    for old_blob in bucket.list_blobs(prefix=prefix):
        old_blob.delete()
        logger.warning('Blob {} unexpectedly exists'.format(old_blob.name))

    export_table_to_gcs(
        dataset=dataset,
        table=table,
        timeout_in_seconds=timeout_in_seconds,
        bucket=bucket,
        blob='{}*.csv.gz'.format(prefix),
        project=project,
    )

    def date_columns(schema):
        date_types = ['DATE', 'DATETIME', 'TIMESTAMP', 'TIME']
        for i, field in enumerate(schema):
            if field.field_type in date_types:
                yield i

    schema = get_table(table, dataset).schema
    date_column_n = list(date_columns(schema))

    frames = []

    downloads = tqdm(list(bucket.list_blobs(prefix=prefix)), unit='file')
    for blob in downloads:
        downloads.set_description('Processing {}, {}MB'.format(
            blob, blob.size / 2**20))
        s = BytesIO(blob.download_as_string())
        frames.append(pd.read_csv(s, compression='gzip',
                                  parse_dates=date_column_n))
        blob.delete()

    return pd.concat(frames, ignore_index=True)


def read_gbq_bulk(
    query,
    project=None,
    bucket=None,
    dataset='pandas_bulk',
    credentials=None,
):

    table = uuid.uuid4().hex[:6]
    create_job = create_table_from_query(
        query=query,
        dataset=dataset,
        table=table,
        project=project,
        credentials=credentials,
        block=True,
    )

    df = read_gbq_table(
        dataset=dataset,
        table=table,
        project=project,
    )

    c = client(project=project, credentials=credentials)
    c.delete_table(create_job.destination)

    return df

@tswast
Collaborator

tswast commented Apr 9, 2019

I'm closing this in favor of #270. I'm hoping that the BigQuery Storage API means exporting to GCS is now unnecessary.
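
For anyone landing here later: recent pandas-gbq releases expose the BigQuery Storage API path directly (it also needs the google-cloud-bigquery-storage package installed); availability depends on the installed version, and the project and table below are placeholders:

import pandas_gbq

df = pandas_gbq.read_gbq(
    'SELECT * FROM `my-project.my_dataset.big_table`',  # placeholder query
    project_id='my-project',                            # placeholder project
    use_bqstorage_api=True,  # stream results via the BigQuery Storage API
    dialect='standard',
)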

@tswast closed this Apr 9, 2019
@max-sixty deleted the bulk-load branch April 10, 2019 03:59