Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure that multipart bodies are always bytes. #1779

Merged
merged 6 commits into from
May 12, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions gcloud/bigquery/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -742,7 +742,7 @@ def upload_from_file(self,
- ``text/csv``.

:type file_obj: file
:param file_obj: A file handle open for reading.
:param file_obj: A file handle opened in binary mode for reading.

:type source_format: str
:param source_format: one of 'CSV' or 'NEWLINE_DELIMITED_JSON'.
Expand Down Expand Up @@ -809,8 +809,9 @@ def upload_from_file(self,
:rtype: :class:`gcloud.bigquery.jobs.LoadTableFromStorageJob`
:returns: the job instance used to load the data (e.g., for
querying status)
:raises: :class:`ValueError` if size is not passed in and can not be
determined
:raises: :class:`ValueError` if ``size`` is not passed in and can not
be determined, or if the ``file_obj`` can be detected to be
a file opened in text mode.
"""
client = self._require_client(client)
connection = client.connection
Expand All @@ -820,6 +821,12 @@ def upload_from_file(self,
if rewind:
file_obj.seek(0, os.SEEK_SET)

mode = getattr(file_obj, 'mode', None)
if mode is not None and mode != 'rb':
raise ValueError(
"Cannot upload files opened in text mode: use "
"open(filename, mode='rb')")

# Get the basic stats about the file.
total_bytes = size
if total_bytes is None:
Expand Down
24 changes: 19 additions & 5 deletions gcloud/bigquery/test_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -1289,6 +1289,19 @@ def _row_data(row):
self.assertEqual(req['path'], '/%s' % PATH)
self.assertEqual(req['data'], SENT)

def test_upload_from_file_text_mode_file_failure(self):
    """upload_from_file raises ValueError for text-mode file objects."""

    class _TextFile(object):
        # Any mode other than 'rb' should be rejected by upload_from_file.
        mode = 'r'

    connection = _Connection()
    client = _Client(project=self.PROJECT, connection=connection)
    table = self._makeOne(self.TABLE_NAME, dataset=_Dataset(client))
    with self.assertRaises(ValueError):
        table.upload_from_file(_TextFile(), 'CSV', size=1234)

def test_upload_from_file_size_failure(self):
conn = _Connection()
client = _Client(project=self.PROJECT, connection=conn)
Expand Down Expand Up @@ -1351,12 +1364,14 @@ def _upload_from_file_helper(self, **kw):
return conn.http._requested, PATH, BODY

def test_upload_from_file_w_bound_client_multipart(self):
from email.parser import Parser
import json
from six.moves.urllib.parse import parse_qsl
from six.moves.urllib.parse import urlsplit
from gcloud._helpers import _to_bytes
from gcloud.streaming.test_transfer import _email_chunk_parser

requested, PATH, BODY = self._upload_from_file_helper()
parse_chunk = _email_chunk_parser()

self.assertEqual(len(requested), 1)
req = requested[0]
Expand All @@ -1369,18 +1384,17 @@ def test_upload_from_file_w_bound_client_multipart(self):
self.assertEqual(dict(parse_qsl(qs)),
{'uploadType': 'multipart'})

parser = Parser()
ctype, boundary = [x.strip()
for x in req['headers']['content-type'].split(';')]
self.assertEqual(ctype, 'multipart/related')
self.assertTrue(boundary.startswith('boundary="=='))
self.assertTrue(boundary.endswith('=="'))

divider = '--' + boundary[len('boundary="'):-1]
divider = b'--' + _to_bytes(boundary[len('boundary="'):-1])
chunks = req['body'].split(divider)[1:-1] # discard prolog / epilog
self.assertEqual(len(chunks), 2)

text_msg = parser.parsestr(chunks[0].strip())
text_msg = parse_chunk(chunks[0].strip())
self.assertEqual(dict(text_msg._headers),
{'Content-Type': 'application/json',
'MIME-Version': '1.0'})
Expand All @@ -1394,7 +1408,7 @@ def test_upload_from_file_w_bound_client_multipart(self):
self.assertEqual(load_config['destinationTable'], DESTINATION_TABLE)
self.assertEqual(load_config['sourceFormat'], 'CSV')

app_msg = parser.parsestr(chunks[1].strip())
app_msg = parse_chunk(chunks[1].strip())
self.assertEqual(dict(app_msg._headers),
{'Content-Type': 'application/octet-stream',
'Content-Transfer-Encoding': 'binary',
Expand Down
24 changes: 18 additions & 6 deletions gcloud/streaming/test_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -1029,7 +1029,7 @@ def test_configure_request_w_simple_wo_body(self):
self.assertEqual(request.loggable_body, '<media body>')

def test_configure_request_w_simple_w_body(self):
from email.parser import Parser
from gcloud._helpers import _to_bytes
from gcloud.streaming.transfer import SIMPLE_UPLOAD
CONTENT = b'CONTENT'
BODY = b'BODY'
Expand All @@ -1045,31 +1045,31 @@ def test_configure_request_w_simple_w_body(self):
self.assertEqual(url_builder.query_params, {'uploadType': 'multipart'})
self.assertEqual(url_builder.relative_path, config.simple_path)

parser = Parser()
self.assertEqual(list(request.headers), ['content-type'])
ctype, boundary = [x.strip()
for x in request.headers['content-type'].split(';')]
self.assertEqual(ctype, 'multipart/related')
self.assertTrue(boundary.startswith('boundary="=='))
self.assertTrue(boundary.endswith('=="'))

divider = '--' + boundary[len('boundary="'):-1]
divider = b'--' + _to_bytes(boundary[len('boundary="'):-1])
chunks = request.body.split(divider)[1:-1] # discard prolog / epilog
self.assertEqual(len(chunks), 2)

text_msg = parser.parsestr(chunks[0].strip())
parse_chunk = _email_chunk_parser()
text_msg = parse_chunk(chunks[0].strip())
self.assertEqual(dict(text_msg._headers),
{'Content-Type': 'text/plain',
'MIME-Version': '1.0'})
self.assertEqual(text_msg._payload, BODY.decode('ascii'))

app_msg = parser.parsestr(chunks[1].strip())
app_msg = parse_chunk(chunks[1].strip())
self.assertEqual(dict(app_msg._headers),
{'Content-Type': self.MIME_TYPE,
'Content-Transfer-Encoding': 'binary',
'MIME-Version': '1.0'})
self.assertEqual(app_msg._payload, CONTENT.decode('ascii'))
self.assertTrue('<media body>' in request.loggable_body)
self.assertTrue(b'<media body>' in request.loggable_body)

def test_configure_request_w_resumable_wo_total_size(self):
from gcloud.streaming.transfer import RESUMABLE_UPLOAD
Expand Down Expand Up @@ -1784,6 +1784,18 @@ def test__send_chunk_w_total_size_stream_exhausted(self):
self.assertEqual(end, SIZE)


def _email_chunk_parser():
import six
if six.PY3: # pragma: NO COVER Python3
from email.parser import BytesParser
parser = BytesParser()
return parser.parsebytes
else:
from email.parser import Parser
parser = Parser()
return parser.parsestr


class _Dummy(object):
def __init__(self, **kw):
self.__dict__.update(kw)
Expand Down
24 changes: 13 additions & 11 deletions gcloud/streaming/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import six
from six.moves import http_client

from gcloud._helpers import _to_bytes
from gcloud.streaming.buffered_stream import BufferedStream
from gcloud.streaming.exceptions import CommunicationError
from gcloud.streaming.exceptions import HttpError
Expand Down Expand Up @@ -849,24 +850,25 @@ def _configure_multipart_request(self, http_request):
msg.set_payload(self.stream.read())
msg_root.attach(msg)

# NOTE: We encode the body, but can't use
# `email.message.Message.as_string` because it prepends
# `> ` to `From ` lines.
# NOTE: We must use six.StringIO() instead of io.StringIO() since the
# `email` library uses cStringIO in Py2 and io.StringIO in Py3.
stream = six.StringIO()
generator = email_generator.Generator(stream, mangle_from_=False)
# NOTE: generate multipart message as bytes, not text
stream = six.BytesIO()
if six.PY3: # pragma: NO COVER Python3
generator_class = email_generator.BytesGenerator
else:
generator_class = email_generator.Generator
generator = generator_class(stream, mangle_from_=False)
generator.flatten(msg_root, unixfrom=False)
http_request.body = stream.getvalue()

multipart_boundary = msg_root.get_boundary()
http_request.headers['content-type'] = (
'multipart/related; boundary="%s"' % multipart_boundary)

body_components = http_request.body.split(multipart_boundary)
headers, _, _ = body_components[-2].partition('\n\n')
body_components[-2] = '\n\n'.join([headers, '<media body>\n\n--'])
http_request.loggable_body = multipart_boundary.join(body_components)
boundary_bytes = _to_bytes(multipart_boundary)
body_components = http_request.body.split(boundary_bytes)
headers, _, _ = body_components[-2].partition(b'\n\n')
body_components[-2] = b'\n\n'.join([headers, b'<media body>\n\n--'])
http_request.loggable_body = boundary_bytes.join(body_components)

def _configure_resumable_request(self, http_request):
"""Helper for 'configure_request': set up resumable request."""
Expand Down