Add 'Table.upload_from_file'. #1318

Merged: 9 commits, Jan 29, 2016
Changes from 2 commits
15 changes: 6 additions & 9 deletions gcloud/bigquery/client.py
@@ -102,20 +102,18 @@ def dataset(self, dataset_name):
"""
return Dataset(dataset_name, client=self)

-    def _job_from_resource(self, resource):
+    def job_from_resource(self, resource):

"""Detect correct job type from resource and instantiate.

Helper for :meth:`list_jobs`.

:type resource: dict
:param resource: one job resource from API response

:rtype; One of:
:class:`gcloud.bigquery.job.LoadTableFromStorageJob`,
:class:`gcloud.bigquery.job.CopyJob,
:class:`gcloud.bigquery.job.ExtractTableToStorageJob,
:class:`gcloud.bigquery.job.QueryJob,
:class:`gcloud.bigquery.job.RunSyncQueryJob
:class:`gcloud.bigquery.job.CopyJob`,
:class:`gcloud.bigquery.job.ExtractTableToStorageJob`,
:class:`gcloud.bigquery.job.QueryJob`,
:class:`gcloud.bigquery.job.RunSyncQueryJob`
:returns: the job instance, constructed via the resource
"""
config = resource['configuration']
@@ -176,8 +174,7 @@ def list_jobs(self, max_results=None, page_token=None, all_users=None,
        path = '/projects/%s/jobs' % (self.project,)
        resp = self.connection.api_request(method='GET', path=path,
                                           query_params=params)
-        jobs = [self._job_from_resource(resource)
-                for resource in resp['jobs']]
+        jobs = [self.job_from_resource(resource) for resource in resp['jobs']]
        return jobs, resp.get('nextPageToken')

    def load_table_from_storage(self, job_name, destination, *source_uris):
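
With the leading underscore dropped, 'job_from_resource' is now public API: a caller holding a raw job resource (for example, one fetched with a bare API request) can rehydrate the matching job class itself. A minimal usage sketch, not part of the diff; the project, dataset, and job names are placeholders and the resource is abbreviated:

    from gcloud import bigquery

    client = bigquery.Client(project='my-project')
    resource = {
        'jobReference': {'projectId': 'my-project', 'jobId': 'job-123'},
        'configuration': {
            'load': {
                'sourceUris': ['gs://my-bucket/people.csv'],
                'destinationTable': {
                    'projectId': 'my-project',
                    'datasetId': 'my_dataset',
                    'tableId': 'my_table',
                },
            },
        },
    }
    job = client.job_from_resource(resource)
    # The 'load' key selects gcloud.bigquery.job.LoadTableFromStorageJob.
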
243 changes: 243 additions & 0 deletions gcloud/bigquery/table.py
@@ -15,13 +15,19 @@
"""Define API Datasets."""

import datetime
import json
import os

import six

from gcloud._helpers import _datetime_from_microseconds
from gcloud._helpers import _microseconds_from_datetime
from gcloud._helpers import _millis_from_datetime
from gcloud.exceptions import NotFound
from gcloud.streaming.http_wrapper import Request
from gcloud.streaming.http_wrapper import make_api_request
from gcloud.streaming.transfer import RESUMABLE_UPLOAD
from gcloud.streaming.transfer import Upload
from gcloud.bigquery._helpers import _rows_from_json


@@ -693,6 +699,225 @@ def insert_data(self,

        return errors

    @staticmethod
    def _configure_job_metadata(metadata,  # pylint: disable=R0913
                                allow_jagged_rows,
                                allow_quoted_newlines,
                                create_disposition,
                                encoding,
                                field_delimiter,
                                ignore_unknown_values,
                                max_bad_records,
                                quote_character,
                                skip_leading_rows,
                                write_disposition):
        """Helper for :meth:`upload_from_file`."""
        load_config = metadata['configuration']['load']

        if allow_jagged_rows is not None:
            load_config['allowJaggedRows'] = allow_jagged_rows

        if allow_quoted_newlines is not None:
            load_config['allowQuotedNewlines'] = allow_quoted_newlines

        if create_disposition is not None:
            load_config['createDisposition'] = create_disposition

        if encoding is not None:
            load_config['encoding'] = encoding

        if field_delimiter is not None:
            load_config['fieldDelimiter'] = field_delimiter

        if ignore_unknown_values is not None:
            load_config['ignoreUnknownValues'] = ignore_unknown_values

        if max_bad_records is not None:
            load_config['maxBadRecords'] = max_bad_records

        if quote_character is not None:
            load_config['quote'] = quote_character

        if skip_leading_rows is not None:
            load_config['skipLeadingRows'] = skip_leading_rows

        if write_disposition is not None:
            load_config['writeDisposition'] = write_disposition
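
The helper's only job is to copy each keyword argument that is not None into the load configuration, renaming from snake_case to the camelCase keys the BigQuery REST API expects. An illustrative sketch of the resulting payload (not part of the diff):

    metadata = {'configuration': {'load': {}}}
    Table._configure_job_metadata(
        metadata,
        allow_jagged_rows=None,          # None -> key omitted entirely
        allow_quoted_newlines=None,
        create_disposition='CREATE_IF_NEEDED',
        encoding=None,
        field_delimiter=',',
        ignore_unknown_values=None,
        max_bad_records=10,
        quote_character=None,
        skip_leading_rows=1,
        write_disposition=None,
    )
    # metadata['configuration']['load'] is now:
    # {'createDisposition': 'CREATE_IF_NEEDED', 'fieldDelimiter': ',',
    #  'maxBadRecords': 10, 'skipLeadingRows': 1}
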

    def upload_from_file(self,  # pylint: disable=R0913,R0914
                         file_obj,
                         source_format,
                         rewind=False,
                         size=None,
                         num_retries=6,
                         allow_jagged_rows=None,
                         allow_quoted_newlines=None,
                         create_disposition=None,
                         encoding=None,
                         field_delimiter=None,
                         ignore_unknown_values=None,
                         max_bad_records=None,
                         quote_character=None,
                         skip_leading_rows=None,
                         write_disposition=None,
                         client=None):
"""Upload the contents of this table from a file-like object.

The content type of the upload will either be
- The value passed in to the function (if any)
- ``text/csv``.

:type file_obj: file
:param file_obj: A file handle open for reading.

:type source_format: string
:param source_format: one of 'CSV' or 'NEWLINE_DELIMITED_JSON'.
job configuration option; see
:meth:`gcloud.bigquery.job.LoadJob`

:type rewind: boolean
:param rewind: If True, seek to the beginning of the file handle before
writing the file to Cloud Storage.

:type size: int
:param size: The number of bytes to read from the file handle.
If not provided, we'll try to guess the size using
:func:`os.fstat`. (If the file handle is not from the
filesystem this won't be possible.)

:type num_retries: integer
:param num_retries: Number of upload retries. Defaults to 6.

:type allow_jagged_rows: boolean
:param allow_jagged_rows: job configuration option; see
:meth:`gcloud.bigquery.job.LoadJob`

:type allow_quoted_newlines: boolean
:param allow_quoted_newlines: job configuration option; see
:meth:`gcloud.bigquery.job.LoadJob`

:type create_disposition: string
:param create_disposition: job configuration option; see
:meth:`gcloud.bigquery.job.LoadJob`

:type encoding: string
:param encoding: job configuration option; see
:meth:`gcloud.bigquery.job.LoadJob`

:type field_delimiter: string
:param field_delimiter: job configuration option; see
:meth:`gcloud.bigquery.job.LoadJob`

:type ignore_unknown_values: boolean
:param ignore_unknown_values: job configuration option; see
:meth:`gcloud.bigquery.job.LoadJob`

:type max_bad_records: integer
:param max_bad_records: job configuration option; see
:meth:`gcloud.bigquery.job.LoadJob`

:type quote_character: string
:param quote_character: job configuration option; see
:meth:`gcloud.bigquery.job.LoadJob`

:type skip_leading_rows: integer
:param skip_leading_rows: job configuration option; see
:meth:`gcloud.bigquery.job.LoadJob`

:type write_disposition: string
:param write_disposition: job configuration option; see
:meth:`gcloud.bigquery.job.LoadJob`

:type client: :class:`gcloud.storage.client.Client` or ``NoneType``
:param client: Optional. The client to use. If not passed, falls back
to the ``client`` stored on the current dataset.

:rtype: :class:`gcloud.bigquery.jobs.LoadTableFromStorageJob`
:returns: the job instance used to load the data (e.g., for
querying status)
:raises: :class:`ValueError` if size is not passed in and can not be
determined
"""

        client = self._require_client(client)
        connection = client.connection
        content_type = 'application/octet-stream'

        # Rewind the file if desired.
        if rewind:
            file_obj.seek(0, os.SEEK_SET)

        # Get the basic stats about the file.
        total_bytes = size
        if total_bytes is None:
            if hasattr(file_obj, 'fileno'):
                total_bytes = os.fstat(file_obj.fileno()).st_size
            else:
                raise ValueError('total bytes could not be determined. '
                                 'Please pass an explicit size.')

        headers = {
            'Accept': 'application/json',
            'Accept-Encoding': 'gzip, deflate',
            'User-Agent': connection.USER_AGENT,
            'content-type': 'application/json',
        }

        metadata = {
            'configuration': {
                'load': {
                    'sourceFormat': source_format,
                    'schema': {
                        'fields': _build_schema_resource(self._schema),
                    },
                    'destinationTable': {
                        'projectId': self._dataset.project,
                        'datasetId': self._dataset.name,
                        'tableId': self.name,
                    }
                }
            }
        }

        self._configure_job_metadata(metadata, allow_jagged_rows,
                                     allow_quoted_newlines,
                                     create_disposition,
                                     encoding, field_delimiter,
                                     ignore_unknown_values, max_bad_records,
                                     quote_character, skip_leading_rows,
                                     write_disposition)

        upload = Upload(file_obj, content_type, total_bytes,
                        auto_transfer=False)

        url_builder = _UrlBuilder()
        upload_config = _UploadConfig()

        # Temporary URL, until we know simple vs. resumable.
        base_url = connection.API_BASE_URL + '/upload'
        path = '/projects/%s/jobs' % self._dataset.project
        upload_url = connection.build_api_url(api_base_url=base_url,
                                              path=path)

        # Use apitools 'Upload' facility.
        request = Request(upload_url, 'POST', headers,
                          body=json.dumps(metadata))

        upload.configure_request(upload_config, request, url_builder)
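
        # Added annotation (based on apitools' documented behavior):
        # 'configure_request' rewrites the request for the strategy it
        # picks (simple/multipart vs. resumable, decided from the upload
        # size and the faux config objects above) and records query
        # parameters such as 'uploadType' on 'url_builder'; those are
        # folded into the final URL below.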
        query_params = url_builder.query_params
        base_url = connection.API_BASE_URL + '/upload'
        request.url = connection.build_api_url(api_base_url=base_url,
                                               path=path,
                                               query_params=query_params)
        upload.initialize_upload(request, connection.http)

        if upload.strategy == RESUMABLE_UPLOAD:
            http_response = upload.stream_file(use_chunks=True)
        else:
            http_response = make_api_request(connection.http, request,
                                             retries=num_retries)
        response_content = http_response.content
        if not isinstance(response_content,
                          six.string_types):  # pragma: NO COVER  Python3
            response_content = response_content.decode('utf-8')
        return client.job_from_resource(json.loads(response_content))
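
End-to-end, a caller gets a one-call load from a local file. A hedged usage sketch (dataset, table, schema, and file names are all placeholders):

    from gcloud import bigquery
    from gcloud.bigquery.table import SchemaField

    client = bigquery.Client()
    dataset = client.dataset('my_dataset')
    table = dataset.table('people', schema=[
        SchemaField('full_name', 'STRING', mode='REQUIRED'),
        SchemaField('age', 'INTEGER', mode='REQUIRED'),
    ])

    with open('people.csv', 'rb') as csv_file:
        job = table.upload_from_file(
            csv_file, source_format='CSV', skip_leading_rows=1,
            write_disposition='WRITE_APPEND')

    job.reload()        # poll the server-side load job for status
    print(job.state)
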


def _parse_schema_resource(info):
"""Parse a resource fragment into a schema field.
@@ -739,3 +964,21 @@ def _build_schema_resource(fields):
            info['fields'] = _build_schema_resource(field.fields)
        infos.append(info)
    return infos


class _UploadConfig(object):
    """Faux message FBO apitools' 'configure_request'."""
    accept = ['*/*']
    max_size = None
    resumable_multipart = True
    resumable_path = u'/upload/bigquery/v2/projects/{project}/jobs'
    simple_multipart = True
    simple_path = u'/upload/bigquery/v2/projects/{project}/jobs'


class _UrlBuilder(object):
    """Faux builder FBO apitools' 'configure_request'."""
    def __init__(self):
        self.query_params = {}
        self._relative_path = ''
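
These two faux classes stand in for the apitools-generated service configuration and URL builder that 'configure_request' normally receives. A hedged sketch of how they are consumed, mirroring the diff's own call sequence (the CSV payload and project name are placeholders, and the 'uploadType' value shown is an assumption about apitools' behavior):

    import io
    import json

    from gcloud.bigquery.table import _UploadConfig, _UrlBuilder
    from gcloud.streaming.http_wrapper import Request
    from gcloud.streaming.transfer import Upload

    data = b'full_name,age\nAlice,30\n'
    upload = Upload(io.BytesIO(data), 'application/octet-stream', len(data),
                    auto_transfer=False)
    request = Request('https://www.googleapis.com/upload/bigquery/v2/'
                      'projects/my-project/jobs', 'POST',
                      {'content-type': 'application/json'},
                      body=json.dumps({'configuration': {'load': {}}}))

    url_builder = _UrlBuilder()
    upload.configure_request(_UploadConfig(), request, url_builder)
    # A small in-memory payload should select the simple strategy; the
    # builder now carries the query parameters for the final URL:
    print(url_builder.query_params)   # e.g. {'uploadType': 'multipart'}
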
4 changes: 2 additions & 2 deletions gcloud/bigquery/test_client.py
@@ -128,12 +128,12 @@ def test_dataset(self):
        self.assertEqual(dataset.name, DATASET)
        self.assertTrue(dataset._client is client)

-    def test__job_from_resource_unknown_type(self):
+    def test_job_from_resource_unknown_type(self):
        PROJECT = 'PROJECT'
        creds = _Credentials()
        client = self._makeOne(PROJECT, creds)
        with self.assertRaises(ValueError):
-            client._job_from_resource({'configuration': {'nonesuch': {}}})
+            client.job_from_resource({'configuration': {'nonesuch': {}}})

    def test_list_jobs_defaults(self):
        from gcloud.bigquery.job import LoadTableFromStorageJob