Skip to content

Commit

Permalink
API redesign: enable custom serializers & deserializers (relates to #35)
Browse files Browse the repository at this point in the history
  • Loading branch information
maciejlach committed Jan 13, 2016
1 parent b32911b commit 980089a
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 35 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
------------------------------------------------------------------------------
qPython 1.2.0 [2016.01.*]
------------------------------------------------------------------------------

- API redesign: enable custom serializers & deserializers

------------------------------------------------------------------------------
qPython 1.1.0 [2015.11.25]
------------------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion doc/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def __getattr__(cls, name):

# General information about the project.
project = u'qPython'
copyright = u'2014-2015, DEVnet'
copyright = u'2014-2016, DEVnet'

# The version info for the project you're documenting, acts as replacement for
# |version| and |release|, also used in various other places throughout the
Expand Down
44 changes: 42 additions & 2 deletions doc/source/connection.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ qPython wraps connection to a q process in instances of the
q.close()

.. note:: the connection is not established when the connector instance is
created. The connection is initialized explicitly by calling the
created. The connection is initialised explicitly by calling the
:meth:`~qpython.qconnection.QConnection.open` method.


Expand Down Expand Up @@ -48,4 +48,44 @@ to numpy `datetime64`/`timedelta64` representation.
Conversion options can be also overwritten while executing
synchronous/asynchronous queries (:meth:`~qpython.qconnection.QConnection.sync`,
:meth:`~qpython.qconnection.QConnection.async`) or retrieving data from q
(:meth:`~qpython.qconnection.QConnection.receive`).
(:meth:`~qpython.qconnection.QConnection.receive`).


Custom IPC protocol serializers/deserializers
*********************************************

Default IPC serializers (`.QWriter` and `.PandasQWriter`) and deserializers
(`.QReader` and `.PandasQReader`) can be replaced with custom implementations.
This allow users to override the default mapping between the q types and Python
representation.
::

q = qconnection.QConnection(host = 'localhost', port = 5000, writer_class = MyQWriter, reader_class = MyQReader)


::

class MyQReader(QReader):
# QReader and QWriter use decorators to map data types and corresponding function handlers
parse = Mapper(QReader._reader_map)
def _read_list(self, qtype):
if qtype == QSYMBOL_LIST:
self._buffer.skip()
length = self._buffer.get_int()
symbols = self._buffer.get_symbols(length)
return [s.decode(self._encoding) for s in symbols]
else:
return QReader._read_list(self, qtype = qtype)
@parse(QSYMBOL)
def _read_symbol(self, qtype = QSYMBOL):
return numpy.string_(self._buffer.get_symbol()).decode(self._encoding)
with qconnection.QConnection(host='localhost', port=5000, reader_class = MyQReader) as q:
symbols = q.sync('`foo`bar')
print(symbols, type(symbols), type(symbols[0]))
symbol = q.sync('`foo')
print(symbol, type(symbol))
2 changes: 1 addition & 1 deletion qpython/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
__all__ = ['qconnection', 'qtype', 'qtemporal', 'qcollection']


__version__ = '1.1.0'
__version__ = '1.2.0b1'



Expand Down
24 changes: 21 additions & 3 deletions qpython/qconnection.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ class QConnection(object):
- `username` (`string` or `None`) - username for q authentication/authorization
- `password` (`string` or `None`) - password for q authentication/authorization
- `timeout` (`nonnegative float` or `None`) - set a timeout on blocking socket operations
- `encoding` (`string`) - string encoding for data deserialization
- `reader_class` (subclass of `QReader`) - data deserializer
- `writer_class` (subclass of `QWriter`) - data serializer
:Options:
- `raw` (`boolean`) - if ``True`` returns raw data chunk instead of parsed
data, **Default**: ``False``
Expand All @@ -74,7 +77,8 @@ class QConnection(object):
strings are encoded as q strings instead of chars, **Default**: ``False``
'''

def __init__(self, host, port, username = None, password = None, timeout = None, encoding = 'latin-1', **options):

def __init__(self, host, port, username = None, password = None, timeout = None, encoding = 'latin-1', reader_class = None, writer_class = None, **options):
self.host = host
self.port = port
self.username = username
Expand All @@ -89,6 +93,20 @@ def __init__(self, host, port, username = None, password = None, timeout = None,

self._options = MetaData(**CONVERSION_OPTIONS.union_dict(**options))

try:
from qpython._pandas import PandasQReader, PandasQWriter
self._reader_class = PandasQReader
self._writer_class = PandasQWriter
except ImportError:
self._reader_class = QReader
self._writer_class = QWriter

if reader_class:
self._reader_class = reader_class

if writer_class:
self._writer_class = writer_class


def __enter__(self):
self.open()
Expand Down Expand Up @@ -124,8 +142,8 @@ def open(self):
self._init_socket()
self._initialize()

self._writer = QWriter(self._connection, protocol_version = self._protocol_version)
self._reader = QReader(self._connection.makefile('b'))
self._writer = self._writer_class(self._connection, protocol_version = self._protocol_version)
self._reader = self._reader_class(self._connection.makefile('b'))


def _init_socket(self):
Expand Down
15 changes: 2 additions & 13 deletions qpython/qreader.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,24 +99,13 @@ class QReader(object):
:Parameters:
- `stream` (`file object` or `None`) - data input stream
- `encoding` (`string`) - encoding for characters parsing
'''

_reader_map = {}
parse = Mapper(_reader_map)


def __new__(cls, *args, **kwargs):
if cls is QReader:
# try to load optional pandas binding
try:
from qpython._pandas import PandasQReader
return super(QReader, cls).__new__(PandasQReader)
except ImportError:
return super(QReader, cls).__new__(QReader)
else:
return super(QReader, cls).__new__(cls)


def __init__(self, stream, encoding = 'latin-1'):
self._stream = stream
self._buffer = QReader.BytesBuffer()
Expand Down Expand Up @@ -257,7 +246,7 @@ def _read_symbol(self, qtype = QSYMBOL):

@parse(QCHAR)
def _read_char(self, qtype = QCHAR):
return chr(self._read_atom(QCHAR)).encode(self._encoding)
return chr(self._read_atom(QCHAR)).encode(self._encoding)


@parse(QGUID)
Expand Down
12 changes: 0 additions & 12 deletions qpython/qwriter.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,6 @@ class QWriter(object):
serialize = Mapper(_writer_map)


def __new__(cls, *args, **kwargs):
if cls is QWriter:
# try to load optional pandas binding
try:
from qpython._pandas import PandasQWriter
return super(QWriter, cls).__new__(PandasQWriter)
except ImportError:
return super(QWriter, cls).__new__(QWriter)
else:
return super(QWriter, cls).__new__(cls)


def __init__(self, stream, protocol_version):
self._stream = stream
self._protocol_version = protocol_version
Expand Down
7 changes: 4 additions & 3 deletions tests/pandas_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
from io import BytesIO

from collections import OrderedDict
from qpython import qreader, MetaData, qwriter
from qpython import MetaData
from qpython._pandas import PandasQReader, PandasQWriter
from qpython.qtype import * # @UnusedWildImport
from qpython.qcollection import qlist, QList, QTemporalList, QDictionary
from qpython.qtemporal import QTemporal
Expand Down Expand Up @@ -250,7 +251,7 @@ def test_reading_pandas():
sys.stdout.write(' %-75s' % query)
try:
buffer_.seek(0)
stream_reader = qreader.QReader(buffer_)
stream_reader = PandasQReader(buffer_)
result = stream_reader.read(pandas = True).data
if isinstance(value, dict):
if 'index' in value:
Expand All @@ -271,7 +272,7 @@ def test_reading_pandas():


def test_writing_pandas():
w = qwriter.QWriter(None, 3)
w = PandasQWriter(None, 3)

for query, value in iter(PANDAS_EXPRESSIONS.items()):
sys.stdout.write( '%-75s' % query )
Expand Down

0 comments on commit 980089a

Please sign in to comment.