Skip to content

Commit

Permalink
Merge pull request #2594 from dhermes/separate-pages-iter
Browse files Browse the repository at this point in the history
Implement Iterator.pages and simplify items iteration
  • Loading branch information
dhermes committed Oct 25, 2016
2 parents 43c017e + 9a504c6 commit 090c5d5
Show file tree
Hide file tree
Showing 9 changed files with 184 additions and 235 deletions.
37 changes: 23 additions & 14 deletions bigquery/unit_tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def test_ctor(self):
self.assertIs(client.connection.http, http)

def test_list_projects_defaults(self):
import six
from google.cloud.bigquery.client import Project
PROJECT_1 = 'PROJECT_ONE'
PROJECT_2 = 'PROJECT_TWO'
Expand All @@ -60,8 +61,8 @@ def test_list_projects_defaults(self):
conn = client.connection = _Connection(DATA)

iterator = client.list_projects()
iterator.update_page()
projects = list(iterator.page)
page = six.next(iterator.pages)
projects = list(page)
token = iterator.next_page_token

self.assertEqual(len(projects), len(DATA['projects']))
Expand All @@ -78,6 +79,8 @@ def test_list_projects_defaults(self):
self.assertEqual(req['path'], '/%s' % PATH)

def test_list_projects_explicit_response_missing_projects_key(self):
import six

PROJECT = 'PROJECT'
PATH = 'projects'
TOKEN = 'TOKEN'
Expand All @@ -87,8 +90,8 @@ def test_list_projects_explicit_response_missing_projects_key(self):
conn = client.connection = _Connection(DATA)

iterator = client.list_projects(max_results=3, page_token=TOKEN)
iterator.update_page()
projects = list(iterator.page)
page = six.next(iterator.pages)
projects = list(page)
token = iterator.next_page_token

self.assertEqual(len(projects), 0)
Expand All @@ -102,6 +105,7 @@ def test_list_projects_explicit_response_missing_projects_key(self):
{'maxResults': 3, 'pageToken': TOKEN})

def test_list_datasets_defaults(self):
import six
from google.cloud.bigquery.dataset import Dataset
PROJECT = 'PROJECT'
DATASET_1 = 'dataset_one'
Expand All @@ -128,8 +132,8 @@ def test_list_datasets_defaults(self):
conn = client.connection = _Connection(DATA)

iterator = client.list_datasets()
iterator.update_page()
datasets = list(iterator.page)
page = six.next(iterator.pages)
datasets = list(page)
token = iterator.next_page_token

self.assertEqual(len(datasets), len(DATA['datasets']))
Expand All @@ -145,6 +149,8 @@ def test_list_datasets_defaults(self):
self.assertEqual(req['path'], '/%s' % PATH)

def test_list_datasets_explicit_response_missing_datasets_key(self):
import six

PROJECT = 'PROJECT'
PATH = 'projects/%s/datasets' % PROJECT
TOKEN = 'TOKEN'
Expand All @@ -155,8 +161,8 @@ def test_list_datasets_explicit_response_missing_datasets_key(self):

iterator = client.list_datasets(
include_all=True, max_results=3, page_token=TOKEN)
iterator.update_page()
datasets = list(iterator.page)
page = six.next(iterator.pages)
datasets = list(page)
token = iterator.next_page_token

self.assertEqual(len(datasets), 0)
Expand Down Expand Up @@ -189,6 +195,7 @@ def test_job_from_resource_unknown_type(self):
client.job_from_resource({'configuration': {'nonesuch': {}}})

def test_list_jobs_defaults(self):
import six
from google.cloud.bigquery.job import LoadTableFromStorageJob
from google.cloud.bigquery.job import CopyJob
from google.cloud.bigquery.job import ExtractTableToStorageJob
Expand Down Expand Up @@ -301,8 +308,8 @@ def test_list_jobs_defaults(self):
conn = client.connection = _Connection(DATA)

iterator = client.list_jobs()
iterator.update_page()
jobs = list(iterator.page)
page = six.next(iterator.pages)
jobs = list(page)
token = iterator.next_page_token

self.assertEqual(len(jobs), len(DATA['jobs']))
Expand All @@ -319,6 +326,7 @@ def test_list_jobs_defaults(self):
self.assertEqual(req['query_params'], {'projection': 'full'})

def test_list_jobs_load_job_wo_sourceUris(self):
import six
from google.cloud.bigquery.job import LoadTableFromStorageJob
PROJECT = 'PROJECT'
DATASET = 'test_dataset'
Expand Down Expand Up @@ -356,8 +364,8 @@ def test_list_jobs_load_job_wo_sourceUris(self):
conn = client.connection = _Connection(DATA)

iterator = client.list_jobs()
iterator.update_page()
jobs = list(iterator.page)
page = six.next(iterator.pages)
jobs = list(page)
token = iterator.next_page_token

self.assertEqual(len(jobs), len(DATA['jobs']))
Expand All @@ -374,6 +382,7 @@ def test_list_jobs_load_job_wo_sourceUris(self):
self.assertEqual(req['query_params'], {'projection': 'full'})

def test_list_jobs_explicit_missing(self):
import six
PROJECT = 'PROJECT'
PATH = 'projects/%s/jobs' % PROJECT
DATA = {}
Expand All @@ -384,8 +393,8 @@ def test_list_jobs_explicit_missing(self):

iterator = client.list_jobs(max_results=1000, page_token=TOKEN,
all_users=True, state_filter='done')
iterator.update_page()
jobs = list(iterator.page)
page = six.next(iterator.pages)
jobs = list(page)
token = iterator.next_page_token

self.assertEqual(len(jobs), 0)
Expand Down
184 changes: 60 additions & 124 deletions core/google/cloud/iterator.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,78 +43,44 @@
... break
When iterating, not every new item will send a request to the server.
To monitor these requests, track the current page of the iterator::
To iterate based on each page of items (where a page corresponds to
a request)::
>>> iterator = Iterator(...)
>>> iterator.page_number
0
>>> next(iterator)
<MyItemClass at 0x7f1d3cccf690>
>>> iterator.page_number
1
>>> iterator.page.remaining
1
>>> next(iterator)
<MyItemClass at 0x7f1d3cccfe90>
>>> iterator.page_number
1
>>> iterator.page.remaining
0
>>> next(iterator)
<MyItemClass at 0x7f1d3cccffd0>
>>> iterator.page_number
2
>>> iterator.page.remaining
19
It's also possible to consume an entire page and handle the paging process
manually::
>>> iterator = Iterator(...)
>>> # Manually pull down the first page.
>>> iterator.update_page()
>>> items = list(iterator.page)
>>> items
>>> for page in iterator.pages:
... print('=' * 20)
... print(' Page number: %d' % (iterator.page_number,))
... print(' Items in page: %d' % (page.num_items,))
... print(' First item: %r' % (next(page),))
... print('Items remaining: %d' % (page.remaining,))
... print('Next page token: %s' % (iterator.next_page_token,))
====================
Page number: 1
Items in page: 1
First item: <MyItemClass at 0x7f1d3cccf690>
Items remaining: 0
Next page token: eav1OzQB0OM8rLdGXOEsyQWSG
====================
Page number: 2
Items in page: 19
First item: <MyItemClass at 0x7f1d3cccffd0>
Items remaining: 18
Next page token: None
To consume an entire page::
>>> list(page)
[
<MyItemClass at 0x7fd64a098ad0>,
<MyItemClass at 0x7fd64a098ed0>,
<MyItemClass at 0x7fd64a098e90>,
]
>>> iterator.page.remaining
0
>>> iterator.page.num_items
3
>>> iterator.next_page_token
'eav1OzQB0OM8rLdGXOEsyQWSG'
>>>
>>> # Ask for the next page to be grabbed.
>>> iterator.update_page()
>>> list(iterator.page)
[
<MyItemClass at 0x7fea740abdd0>,
<MyItemClass at 0x7fea740abe50>,
]
>>>
>>> # When there are no more results
>>> iterator.next_page_token is None
True
>>> iterator.update_page()
>>> iterator.page is None
True
"""


import six


_UNSET = object()
_NO_MORE_PAGES_ERR = 'Iterator has no more pages.'
_UNSTARTED_ERR = (
'Iterator has not been started. Either begin iterating, '
'call next(my_iter) or call my_iter.update_page().')
_PAGE_ERR_TEMPLATE = (
'Tried to update the page while current page (%r) still has %d '
'items remaining.')
DEFAULT_ITEMS_KEY = 'items'
"""The dictionary key used to retrieve items from each response."""

Expand Down Expand Up @@ -261,7 +227,6 @@ def __init__(self, client, path, item_to_value,
self.page_number = 0
self.next_page_token = page_token
self.num_results = 0
self._page = _UNSET

def _verify_params(self):
"""Verifies the parameters don't use any reserved parameter.
Expand All @@ -274,82 +239,53 @@ def _verify_params(self):
raise ValueError('Using a reserved parameter',
reserved_in_use)

@property
def page(self):
"""The current page of results that has been retrieved.
If there are no more results, will return :data:`None`.
def _pages_iter(self):
"""Generator of pages of API responses.
:rtype: :class:`Page`
:returns: The page of items that has been retrieved.
:raises AttributeError: If the page has not been set.
Yields :class:`Page` instances.
"""
if self._page is _UNSET:
raise AttributeError(_UNSTARTED_ERR)
return self._page
while self._has_next_page():
response = self._get_next_page_response()
page = Page(self, response, self._items_key,
self._item_to_value)
self._page_start(self, page, response)
self.num_results += page.num_items
yield page

def __iter__(self):
"""The :class:`Iterator` is an iterator.
@property
def pages(self):
"""Iterator of pages in the response.
:rtype: :class:`Iterator`
:returns: Current instance.
:rtype: :class:`~types.GeneratorType`
:returns: A generator of :class:`Page` instances.
:raises ValueError: If the iterator has already been started.
"""
if self._started:
raise ValueError('Iterator has already started', self)
self._started = True
return self

def update_page(self, require_empty=True):
"""Move to the next page in the result set.
If the current page is not empty and ``require_empty`` is :data:`True`
then an exception will be raised. If the current page is not empty
and ``require_empty`` is :data:`False`, then this will return
without updating the current page.
If the current page **is** empty, but there are no more results,
sets the current page to :data:`None`.
return self._pages_iter()

def _items_iter(self):
"""Iterator for each item returned."""
for page in self._pages_iter():
# Decrement the total results since the pages iterator adds
# to it when each page is encountered.
self.num_results -= page.num_items
for item in page:
self.num_results += 1
yield item

If there are no more pages, throws an exception.
:type require_empty: bool
:param require_empty: (Optional) Flag to indicate if the current page
must be empty before updating.
def __iter__(self):
"""Iterator for each item returned.
:raises ValueError: If ``require_empty`` is :data:`True` but the
current page is not empty.
:raises ValueError: If there are no more pages.
:rtype: :class:`~types.GeneratorType`
:returns: A generator of items from the API.
:raises ValueError: If the iterator has already been started.
"""
if self._page is None:
raise ValueError(_NO_MORE_PAGES_ERR)

# NOTE: This assumes Page.remaining can never go below 0.
page_empty = self._page is _UNSET or self._page.remaining == 0
if page_empty:
if self._has_next_page():
response = self._get_next_page_response()
self._page = Page(self, response, self._items_key,
self._item_to_value)
self._page_start(self, self._page, response)
else:
self._page = None
else:
if require_empty:
msg = _PAGE_ERR_TEMPLATE % (self._page, self.page.remaining)
raise ValueError(msg)

def next(self):
"""Get the next item from the request."""
self.update_page(require_empty=False)
if self.page is None:
raise StopIteration
item = six.next(self.page)
self.num_results += 1
return item

# Alias needed for Python 2/3 support.
__next__ = next
if self._started:
raise ValueError('Iterator has already started', self)
self._started = True
return self._items_iter()

def _has_next_page(self):
"""Determines whether or not there are more pages with results.
Expand Down
Loading

0 comments on commit 090c5d5

Please sign in to comment.