Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CBOR serializer (using cbor2 package) #191

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ Features
- Multiple platforms support: Linux, macOS, Windows
- Pure python
- Both filed based queues and sqlite3 based queues are supported
- Filed based queue: multiple serialization protocol support: pickle(default), msgpack, json
- Filed based queue: multiple serialization protocol support: pickle(default), msgpack, cbor, json

Deprecation
-----------
Expand All @@ -69,7 +69,7 @@ from pypi
.. code-block:: console

pip install persist-queue
# for msgpack and mysql support, use following command
# for msgpack, cbor and mysql support, use following command
pip install persist-queue[extra]


Expand All @@ -80,7 +80,7 @@ from source code

git clone https://github.com/peter-wangxu/persist-queue
cd persist-queue
# for msgpack support, run 'pip install -r extra-requirements.txt' first
# for msgpack and cbor support, run 'pip install -r extra-requirements.txt' first
python setup.py install


Expand Down Expand Up @@ -489,15 +489,17 @@ Due to the limitation of file queue described in issue `#89 <https://github.com/
`task_done` in one thread may acknowledge items in other threads which should not be. Considering the `SQLiteAckQueue` if you have such requirement.


Serialization via msgpack/json
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Serialization via msgpack/cbor/json
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
- v0.4.1: Currently only available for file based Queue
- v0.4.2: Also available for SQLite3 based Queues

.. code-block:: python

>>> from persistqueue
>>> q = persistqueue.Queue('mypath', serializer=persistqueue.serializers.msgpack)
>>> # via cbor2
>>> # q = persistqueue.Queue('mypath', serializer=persistqueue.serializers.cbor2)
>>> # via json
>>> # q = Queue('mypath', serializer=persistqueue.serializers.json)
>>> q.get()
Expand Down Expand Up @@ -611,4 +613,3 @@ when creating the queue if above error occurs.

The sqlite3 queues are heavily tested under multi-threading environment, if you find it's not thread-safe, please
make sure you set the ``multithreading=True`` when initializing the queue before submitting new issue:).

3 changes: 2 additions & 1 deletion extra-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
msgpack>=0.5.6
cbor2>=5.2.0
PyMySQL
DBUtils<3.0.0 # since 3.0.0 no longer supports Python2.x
DBUtils<3.0.0 # since 3.0.0 no longer supports Python2.x
8 changes: 8 additions & 0 deletions persist-queue.code-workspace
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"folders": [
{
"path": "."
}
],
"settings": {}
}
45 changes: 45 additions & 0 deletions persistqueue/serializers/cbor2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# coding=utf-8

"""
A serializer that extends cbor2 to specify recommended parameters and adds a
4 byte length prefix to store multiple objects per file.
"""

from __future__ import absolute_import
try:
import cbor2
except ImportError:
pass

from struct import Struct


length_struct = Struct("<L")


def dump(value, fp, sort_keys=False):
"Serialize value as cbor2 to a byte-mode file object"
if sort_keys and isinstance(value, dict):
value = {key: value[key] for key in sorted(value)}
packed = cbor2.dumps(value)
length = length_struct.pack(len(packed))
fp.write(length)
fp.write(packed)


def dumps(value, sort_keys=False):
"Serialize value as cbor2 to bytes"
if sort_keys and isinstance(value, dict):
value = {key: value[key] for key in sorted(value)}
return cbor2.dumps(value)


def load(fp):
"Deserialize one cbor2 value from a byte-mode file object"
length = length_struct.unpack(fp.read(4))[0]
return cbor2.loads(fp.read(length))


def loads(bytes_value):
"Deserialize one cbor2 value from bytes"
return cbor2.loads(bytes_value)
2 changes: 2 additions & 0 deletions persistqueue/tests/test_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from persistqueue.serializers import json as serializers_json
from persistqueue.serializers import pickle as serializers_pickle
from persistqueue.serializers import msgpack as serializers_msgpack
from persistqueue.serializers import cbor2 as serializers_cbor2

from persistqueue import Queue, Empty, Full

Expand All @@ -22,6 +23,7 @@
"serializer=default": {},
"serializer=json": {"serializer": serializers_json},
"serializer=msgpack": {"serializer": serializers_msgpack},
"serializer=cbor2": {"serializer": serializers_cbor2},
"serializer=pickle": {"serializer": serializers_pickle},
}

Expand Down
13 changes: 13 additions & 0 deletions persistqueue/tests/test_sqlqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from persistqueue.serializers import json as serializers_json
from persistqueue.serializers import pickle as serializers_pickle
from persistqueue.serializers import msgpack as serializers_msgpack
from persistqueue.serializers import cbor2 as serializers_cbor2


class SQLite3QueueTest(unittest.TestCase):
Expand Down Expand Up @@ -454,6 +455,18 @@ def test_unique_dictionary_serialization_msgpack(self):
queue.put({"bar": 2, "foo": 1})
self.assertEqual(queue.total, 1)

def test_unique_dictionary_serialization_cbor2(self):
queue = UniqueQ(
path=self.path,
multithreading=True,
auto_commit=self.auto_commit,
serializer=serializers_cbor2
)
queue.put({"foo": 1, "bar": 2})
self.assertEqual(queue.total, 1)
queue.put({"bar": 2, "foo": 1})
self.assertEqual(queue.total, 1)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to verify the content in the queue as well

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

None of the other tests (pickle, msgpack, json) verify the content in the queue. I presume this is because this particular test is testing that duplicate items are only queued once for UniqueQ. I also presume that verifiying the content is handled in test_queue.py

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, we have more test to cover serialization scenario for file based queues, maybe we can enhance the coverage next time for sqlite base queues


def test_unique_dictionary_serialization_json(self):
queue = UniqueQ(
path=self.path,
Expand Down
3 changes: 2 additions & 1 deletion test-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ mock>=2.0.0
flake8>=3.2.1
eventlet>=0.19.0
msgpack>=0.5.6
cbor2>=5.2.0
nose2>=0.6.5
coverage!=4.5
cov_core>=1.15.0
virtualenv>=15.1.0
cryptography;sys_platform!="win32" # package only required for tests under mysql8.0&linux
cryptography;sys_platform!="win32" # package only required for tests under mysql8.0&linux