Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Iterator.pages and simplify items iteration #2594

Merged
merged 8 commits into from
Oct 25, 2016
37 changes: 23 additions & 14 deletions bigquery/unit_tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def test_ctor(self):
self.assertIs(client.connection.http, http)

def test_list_projects_defaults(self):
import six
from google.cloud.bigquery.client import Project
PROJECT_1 = 'PROJECT_ONE'
PROJECT_2 = 'PROJECT_TWO'
Expand All @@ -60,8 +61,8 @@ def test_list_projects_defaults(self):
conn = client.connection = _Connection(DATA)

iterator = client.list_projects()
iterator.update_page()
projects = list(iterator.page)
page = six.next(iterator.pages)
projects = list(page)
token = iterator.next_page_token

self.assertEqual(len(projects), len(DATA['projects']))
Expand All @@ -78,6 +79,8 @@ def test_list_projects_defaults(self):
self.assertEqual(req['path'], '/%s' % PATH)

def test_list_projects_explicit_response_missing_projects_key(self):
import six

PROJECT = 'PROJECT'
PATH = 'projects'
TOKEN = 'TOKEN'
Expand All @@ -87,8 +90,8 @@ def test_list_projects_explicit_response_missing_projects_key(self):
conn = client.connection = _Connection(DATA)

iterator = client.list_projects(max_results=3, page_token=TOKEN)
iterator.update_page()
projects = list(iterator.page)
page = six.next(iterator.pages)
projects = list(page)
token = iterator.next_page_token

self.assertEqual(len(projects), 0)
Expand All @@ -102,6 +105,7 @@ def test_list_projects_explicit_response_missing_projects_key(self):
{'maxResults': 3, 'pageToken': TOKEN})

def test_list_datasets_defaults(self):
import six
from google.cloud.bigquery.dataset import Dataset
PROJECT = 'PROJECT'
DATASET_1 = 'dataset_one'
Expand All @@ -128,8 +132,8 @@ def test_list_datasets_defaults(self):
conn = client.connection = _Connection(DATA)

iterator = client.list_datasets()
iterator.update_page()
datasets = list(iterator.page)
page = six.next(iterator.pages)
datasets = list(page)
token = iterator.next_page_token

self.assertEqual(len(datasets), len(DATA['datasets']))
Expand All @@ -145,6 +149,8 @@ def test_list_datasets_defaults(self):
self.assertEqual(req['path'], '/%s' % PATH)

def test_list_datasets_explicit_response_missing_datasets_key(self):
import six

PROJECT = 'PROJECT'
PATH = 'projects/%s/datasets' % PROJECT
TOKEN = 'TOKEN'
Expand All @@ -155,8 +161,8 @@ def test_list_datasets_explicit_response_missing_datasets_key(self):

iterator = client.list_datasets(
include_all=True, max_results=3, page_token=TOKEN)
iterator.update_page()
datasets = list(iterator.page)
page = six.next(iterator.pages)
datasets = list(page)
token = iterator.next_page_token

self.assertEqual(len(datasets), 0)
Expand Down Expand Up @@ -189,6 +195,7 @@ def test_job_from_resource_unknown_type(self):
client.job_from_resource({'configuration': {'nonesuch': {}}})

def test_list_jobs_defaults(self):
import six
from google.cloud.bigquery.job import LoadTableFromStorageJob
from google.cloud.bigquery.job import CopyJob
from google.cloud.bigquery.job import ExtractTableToStorageJob
Expand Down Expand Up @@ -301,8 +308,8 @@ def test_list_jobs_defaults(self):
conn = client.connection = _Connection(DATA)

iterator = client.list_jobs()
iterator.update_page()
jobs = list(iterator.page)
page = six.next(iterator.pages)
jobs = list(page)
token = iterator.next_page_token

self.assertEqual(len(jobs), len(DATA['jobs']))
Expand All @@ -319,6 +326,7 @@ def test_list_jobs_defaults(self):
self.assertEqual(req['query_params'], {'projection': 'full'})

def test_list_jobs_load_job_wo_sourceUris(self):
import six
from google.cloud.bigquery.job import LoadTableFromStorageJob
PROJECT = 'PROJECT'
DATASET = 'test_dataset'
Expand Down Expand Up @@ -356,8 +364,8 @@ def test_list_jobs_load_job_wo_sourceUris(self):
conn = client.connection = _Connection(DATA)

iterator = client.list_jobs()
iterator.update_page()
jobs = list(iterator.page)
page = six.next(iterator.pages)
jobs = list(page)
token = iterator.next_page_token

self.assertEqual(len(jobs), len(DATA['jobs']))
Expand All @@ -374,6 +382,7 @@ def test_list_jobs_load_job_wo_sourceUris(self):
self.assertEqual(req['query_params'], {'projection': 'full'})

def test_list_jobs_explicit_missing(self):
import six
PROJECT = 'PROJECT'
PATH = 'projects/%s/jobs' % PROJECT
DATA = {}
Expand All @@ -384,8 +393,8 @@ def test_list_jobs_explicit_missing(self):

iterator = client.list_jobs(max_results=1000, page_token=TOKEN,
all_users=True, state_filter='done')
iterator.update_page()
jobs = list(iterator.page)
page = six.next(iterator.pages)
jobs = list(page)
token = iterator.next_page_token

self.assertEqual(len(jobs), 0)
Expand Down
184 changes: 60 additions & 124 deletions core/google/cloud/iterator.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,78 +43,44 @@
... break

When iterating, not every new item will send a request to the server.
To monitor these requests, track the current page of the iterator::
To iterate based on each page of items (where a page corresponds to
a request)::

>>> iterator = Iterator(...)
>>> iterator.page_number
0
>>> next(iterator)
<MyItemClass at 0x7f1d3cccf690>
>>> iterator.page_number
1
>>> iterator.page.remaining
1
>>> next(iterator)
<MyItemClass at 0x7f1d3cccfe90>
>>> iterator.page_number
1
>>> iterator.page.remaining
0
>>> next(iterator)
<MyItemClass at 0x7f1d3cccffd0>
>>> iterator.page_number
2
>>> iterator.page.remaining
19

It's also possible to consume an entire page and handle the paging process
manually::

>>> iterator = Iterator(...)
>>> # Manually pull down the first page.
>>> iterator.update_page()
>>> items = list(iterator.page)
>>> items
>>> for page in iterator.pages:
... print('=' * 20)
... print(' Page number: %d' % (iterator.page_number,))
... print(' Items in page: %d' % (page.num_items,))
... print(' First item: %r' % (next(page),))
... print('Items remaining: %d' % (page.remaining,))
... print('Next page token: %s' % (iterator.next_page_token,))
====================
Page number: 1
Items in page: 1
First item: <MyItemClass at 0x7f1d3cccf690>
Items remaining: 0
Next page token: eav1OzQB0OM8rLdGXOEsyQWSG
====================
Page number: 2
Items in page: 19
First item: <MyItemClass at 0x7f1d3cccffd0>
Items remaining: 18
Next page token: None

To consume an entire page::

>>> list(page)
[
<MyItemClass at 0x7fd64a098ad0>,
<MyItemClass at 0x7fd64a098ed0>,
<MyItemClass at 0x7fd64a098e90>,
]
>>> iterator.page.remaining
0
>>> iterator.page.num_items
3
>>> iterator.next_page_token
'eav1OzQB0OM8rLdGXOEsyQWSG'
>>>
>>> # Ask for the next page to be grabbed.
>>> iterator.update_page()
>>> list(iterator.page)
[
<MyItemClass at 0x7fea740abdd0>,
<MyItemClass at 0x7fea740abe50>,
]
>>>
>>> # When there are no more results
>>> iterator.next_page_token is None
True
>>> iterator.update_page()
>>> iterator.page is None
True
"""


import six


_UNSET = object()
_NO_MORE_PAGES_ERR = 'Iterator has no more pages.'
_UNSTARTED_ERR = (
'Iterator has not been started. Either begin iterating, '
'call next(my_iter) or call my_iter.update_page().')
_PAGE_ERR_TEMPLATE = (
'Tried to update the page while current page (%r) still has %d '
'items remaining.')
DEFAULT_ITEMS_KEY = 'items'
"""The dictionary key used to retrieve items from each response."""

Expand Down Expand Up @@ -261,7 +227,6 @@ def __init__(self, client, path, item_to_value,
self.page_number = 0
self.next_page_token = page_token
self.num_results = 0
self._page = _UNSET

def _verify_params(self):
"""Verifies the parameters don't use any reserved parameter.
Expand All @@ -274,82 +239,53 @@ def _verify_params(self):
raise ValueError('Using a reserved parameter',
reserved_in_use)

@property
def page(self):
"""The current page of results that has been retrieved.

If there are no more results, will return :data:`None`.
def _pages_iter(self):
"""Generator of pages of API responses.

:rtype: :class:`Page`
:returns: The page of items that has been retrieved.
:raises AttributeError: If the page has not been set.
Yields :class:`Page` instances.
"""
if self._page is _UNSET:
raise AttributeError(_UNSTARTED_ERR)
return self._page
while self._has_next_page():
response = self._get_next_page_response()
page = Page(self, response, self._items_key,
self._item_to_value)
self._page_start(self, page, response)
self.num_results += page.num_items
yield page

def __iter__(self):
"""The :class:`Iterator` is an iterator.
@property
def pages(self):
"""Iterator of pages in the response.

:rtype: :class:`Iterator`
:returns: Current instance.
:rtype: :class:`~types.GeneratorType`

This comment was marked as spam.

This comment was marked as spam.

:returns: A generator of :class:`Page` instances.
:raises ValueError: If the iterator has already been started.
"""
if self._started:
raise ValueError('Iterator has already started', self)
self._started = True
return self

def update_page(self, require_empty=True):
"""Move to the next page in the result set.

If the current page is not empty and ``require_empty`` is :data:`True`
then an exception will be raised. If the current page is not empty
and ``require_empty`` is :data:`False`, then this will return
without updating the current page.

If the current page **is** empty, but there are no more results,
sets the current page to :data:`None`.
return self._pages_iter()

def _items_iter(self):
"""Iterator for each item returned."""
for page in self._pages_iter():
# Decrement the total results since the pages iterator adds
# to it when each page is encountered.
self.num_results -= page.num_items
for item in page:
self.num_results += 1
yield item

If there are no more pages, throws an exception.

:type require_empty: bool
:param require_empty: (Optional) Flag to indicate if the current page
must be empty before updating.
def __iter__(self):
"""Iterator for each item returned.

:raises ValueError: If ``require_empty`` is :data:`True` but the
current page is not empty.
:raises ValueError: If there are no more pages.
:rtype: :class:`~types.GeneratorType`
:returns: A generator of items from the API.
:raises ValueError: If the iterator has already been started.
"""
if self._page is None:
raise ValueError(_NO_MORE_PAGES_ERR)

# NOTE: This assumes Page.remaining can never go below 0.
page_empty = self._page is _UNSET or self._page.remaining == 0
if page_empty:
if self._has_next_page():
response = self._get_next_page_response()
self._page = Page(self, response, self._items_key,
self._item_to_value)
self._page_start(self, self._page, response)
else:
self._page = None
else:
if require_empty:
msg = _PAGE_ERR_TEMPLATE % (self._page, self.page.remaining)
raise ValueError(msg)

def next(self):
"""Get the next item from the request."""
self.update_page(require_empty=False)
if self.page is None:
raise StopIteration
item = six.next(self.page)
self.num_results += 1
return item

# Alias needed for Python 2/3 support.
__next__ = next
if self._started:
raise ValueError('Iterator has already started', self)
self._started = True
return self._items_iter()

def _has_next_page(self):
"""Determines whether or not there are more pages with results.
Expand Down
Loading