Skip to content

Commit

Permalink
Add Python 3 support to happybase. Mostly, this meant tedious encoding
Browse files Browse the repository at this point in the history
and decoding of strings and bytes respectively, but also meant correcting imports and, in a couple of cases, explicitly doing something different for Python 2 vs. Python 3.
  • Loading branch information
Devin Anderson committed Mar 9, 2016
1 parent a1a9ac1 commit cc6790b
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 64 deletions.
18 changes: 10 additions & 8 deletions happybase/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

logger = logging.getLogger(__name__)


class Batch(object):
"""Batch mutation class.
Expand Down Expand Up @@ -45,17 +44,19 @@ def _reset_mutations(self):

def send(self):
"""Send the batch to the server."""
bms = [BatchMutation(row, m) for row, m in self._mutations.iteritems()]
bms = [BatchMutation(row.encode("utf-8"), m)
for row, m in self._mutations.items()]
if not bms:
return

logger.debug("Sending batch for '%s' (%d mutations on %d rows)",
self._table.name, self._mutation_count, len(bms))
name = self._table.name.encode("utf-8")
if self._timestamp is None:
self._table.connection.client.mutateRows(self._table.name, bms, {})
self._table.connection.client.mutateRows(name, bms, {})
else:
self._table.connection.client.mutateRowsTs(
self._table.name, bms, self._timestamp, {})
name, bms, self._timestamp, {})

self._reset_mutations()

Expand All @@ -77,10 +78,10 @@ def put(self, row, data, wal=None):
self._mutations[row].extend(
Mutation(
isDelete=False,
column=column,
value=value,
column=column.encode("utf-8"),
value=value.encode("utf-8"),
writeToWAL=wal)
for column, value in data.iteritems())
for column, value in data.items())

self._mutation_count += len(data)
if self._batch_size and self._mutation_count >= self._batch_size:
Expand All @@ -107,7 +108,8 @@ def delete(self, row, columns=None, wal=None):
wal = self._wal

self._mutations[row].extend(
Mutation(isDelete=True, column=column, writeToWAL=wal)
Mutation(
isDelete=True, column=column.encode("utf-8"), writeToWAL=wal)
for column in columns)

self._mutation_count += len(columns)
Expand Down
25 changes: 15 additions & 10 deletions happybase/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"""

import logging
import sys

from thrift.transport.TSocket import TSocket
from thrift.transport.TTransport import TBufferedTransport, TFramedTransport
Expand Down Expand Up @@ -33,6 +34,8 @@
DEFAULT_COMPAT = '0.96'
DEFAULT_PROTOCOL = 'binary'

if sys.version_info >= (3,):
basestring = str

class Connection(object):
"""Connection to an HBase Thrift server.
Expand Down Expand Up @@ -235,7 +238,7 @@ def tables(self):
:return: The table names
:rtype: List of strings
"""
names = self.client.getTableNames()
names = (n.decode("utf-8") for n in self.client.getTableNames())

# Filter using prefix, and strip prefix from names
if self.table_prefix is not None:
Expand Down Expand Up @@ -288,21 +291,23 @@ def create_table(self, name, families):
% name)

column_descriptors = []
for cf_name, options in families.iteritems():
for cf_name, options in families.items():
if options is None:
options = dict()

kwargs = dict()
for option_name, value in options.iteritems():
for option_name, value in options.items():
if isinstance(value, basestring):
value = value.encode("utf-8")
kwargs[pep8_to_camel_case(option_name)] = value

if not cf_name.endswith(':'):
cf_name += ':'
kwargs['name'] = cf_name
kwargs['name'] = cf_name.encode("utf-8")

column_descriptors.append(ColumnDescriptor(**kwargs))

self.client.createTable(name, column_descriptors)
self.client.createTable(name.encode("utf-8"), column_descriptors)

def delete_table(self, name, disable=False):
"""Delete the specified table.
Expand All @@ -321,23 +326,23 @@ def delete_table(self, name, disable=False):
self.disable_table(name)

name = self._table_name(name)
self.client.deleteTable(name)
self.client.deleteTable(name.encode("utf-8"))

def enable_table(self, name):
"""Enable the specified table.
:param str name: The table name
"""
name = self._table_name(name)
self.client.enableTable(name)
self.client.enableTable(name.encode("utf-8"))

def disable_table(self, name):
"""Disable the specified table.
:param str name: The table name
"""
name = self._table_name(name)
self.client.disableTable(name)
self.client.disableTable(name.encode("utf-8"))

def is_table_enabled(self, name):
"""Return whether the specified table is enabled.
Expand All @@ -348,15 +353,15 @@ def is_table_enabled(self, name):
:rtype: bool
"""
name = self._table_name(name)
return self.client.isTableEnabled(name)
return self.client.isTableEnabled(name.encode("utf-8"))

def compact_table(self, name, major=False):
"""Compact the specified table.
:param str name: The table name
:param bool major: Whether to perform a major compaction.
"""
name = self._table_name(name)
name = self._table_name(name).encode("utf-8")
if major:
self.client.majorCompact(name)
else:
Expand Down
14 changes: 11 additions & 3 deletions happybase/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,24 @@

import contextlib
import logging
import Queue
import socket
import sys
import threading

if sys.version_info >= (3,):
from queue import Empty, LifoQueue
else:
from Queue import Empty, LifoQueue

from thrift.Thrift import TException

from .connection import Connection

logger = logging.getLogger(__name__)

if sys.version_info >= (3,):
xrange = range

#
# TODO: maybe support multiple Thrift servers. What would a reasonable
# distribution look like? Round-robin? Randomize the list upon
Expand Down Expand Up @@ -61,7 +69,7 @@ def __init__(self, size, **kwargs):
"Initializing connection pool with %d connections", size)

self._lock = threading.Lock()
self._queue = Queue.LifoQueue(maxsize=size)
self._queue = LifoQueue(maxsize=size)
self._thread_connections = threading.local()

connection_kwargs = kwargs
Expand All @@ -81,7 +89,7 @@ def _acquire_connection(self, timeout=None):
"""Acquire a connection from the pool."""
try:
return self._queue.get(True, timeout)
except Queue.Empty:
except Empty:
raise NoConnectionsAvailable(
"No connection available from pool within specified "
"timeout")
Expand Down
Loading

0 comments on commit cc6790b

Please sign in to comment.