Skip to content

Commit

Permalink
Merge pull request python-happybase#1 from TAKEALOT/reverse_table_scans
Browse files Browse the repository at this point in the history
Reverse table scans
  • Loading branch information
cscheffler committed May 13, 2015
2 parents 6205177 + 2802e28 commit 898b55b
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 2 deletions.
1 change: 1 addition & 0 deletions happybase/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from .connection import DEFAULT_HOST, DEFAULT_PORT, Connection
from .table import Table
from .batch import Batch
from .counter_batch import CounterBatch
from .pool import ConnectionPool, NoConnectionsAvailable

# TODO: properly handle errors defined in Thrift specification
2 changes: 1 addition & 1 deletion happybase/_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@
setup.py.
"""

__version__ = '0.9'
__version__ = '0.9-takealot'
51 changes: 51 additions & 0 deletions happybase/counter_batch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
from happybase.hbase.ttypes import TIncrement


class CounterBatch:
def __init__(self, table, batch_size=None):
self.table = table
self.batch_size = batch_size
self.batch = []

def counter_inc(self, row, column, value=1):
self.batch.append({'row': row, 'column': column, 'value': value})
self._check_send()

def counter_dec(self, row, column, value=1):
self.batch.append({'row': row, 'column': column, 'value': -value})
self._check_send()

def send(self):
increment_rows = []
for increment in self.batch:
increment_rows.append(
TIncrement(
table=self.table.name,
row=increment['row'],
column=increment['column'],
ammount=increment.get('value', 1),
)
)
self.table.connection.client.incrementRows(increment_rows)

def _check_send(self):
if self.batch_size and (len(self.batch) >= self.batch_size):
self.send()
self.batch = []

#
# Context manager methods
#

def __enter__(self):
"""Called upon entering a ``with`` block"""
return self

def __exit__(self, exc_type, exc_value, traceback):
"""Called upon exiting a ``with`` block"""
# TODO: Examine the exception and decide whether or not to send
# For now we always send
if exc_type is not None:
pass

self.send()
25 changes: 24 additions & 1 deletion happybase/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from .hbase.ttypes import TScan
from .util import thrift_type_to_dict, str_increment, OrderedDict
from .batch import Batch
from .counter_batch import CounterBatch

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -215,7 +216,7 @@ def cells(self, row, column, versions=None, timestamp=None,
def scan(self, row_start=None, row_stop=None, row_prefix=None,
columns=None, filter=None, timestamp=None,
include_timestamp=False, batch_size=1000, scan_batching=None,
limit=None, sorted_columns=False):
limit=None, sorted_columns=False, reversed=False):
"""Create a scanner for data in the table.
This method returns an iterable that can be used for looping over the
Expand Down Expand Up @@ -270,6 +271,9 @@ def scan(self, row_start=None, row_stop=None, row_prefix=None,
* The `sorted_columns` argument is only available when using
HBase 0.96 (or up).
* The `reversed` option is only available when using HBase 0.98
(or up).
.. versionadded:: 0.8
`sorted_columns` argument
Expand All @@ -287,6 +291,7 @@ def scan(self, row_start=None, row_stop=None, row_prefix=None,
:param bool scan_batching: server-side scan batching (optional)
:param int limit: max number of rows to return
:param bool sorted_columns: whether to return sorted columns
:param bool reversed: whether or not to reverse the row ordering
:return: generator yielding the rows matching the scan
:rtype: iterable of `(row_key, row_data)` tuples
Expand Down Expand Up @@ -369,6 +374,7 @@ def scan(self, row_start=None, row_stop=None, row_prefix=None,
filterString=filter,
batchSize=scan_batching,
sortColumns=sorted_columns,
reversed=reversed,
)
scan_id = self.connection.client.scannerOpenWithScan(
self.name, scan, {})
Expand Down Expand Up @@ -497,6 +503,23 @@ def batch(self, timestamp=None, batch_size=None, transaction=False,
del kwargs['self']
return Batch(table=self, **kwargs)

def counter_batch(self, batch_size=None):
"""Create a new batch of counter operation for this table.
This method returns a new :py:class:`CounterBatch` instance that can be used
for mass counter manipulation.
If given, the `batch_size` argument specifies the maximum batch size
after which the batch should send the mutations to the server. By
default this is unbounded.
:param int batch_size: batch size (optional)
:return: CounterBatch instance
:rtype: :py:class:`CounterBatch`
"""
return CounterBatch(table=self, batch_size=batch_size)

#
# Atomic counters
#
Expand Down

0 comments on commit 898b55b

Please sign in to comment.