Skip to content

Commit

Permalink
Merge pull request #100 from danni-m/master
Browse files Browse the repository at this point in the history
Module's data type support
  • Loading branch information
oranagra authored Jul 26, 2017
2 parents fe322ed + 6e4dba6 commit bd08e80
Show file tree
Hide file tree
Showing 5 changed files with 174 additions and 29 deletions.
39 changes: 39 additions & 0 deletions rdbtools/iowrapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@

class IOWrapper(object):
def __init__(self, io_object):
self.io_object = io_object
self.record_buffer = False
self.record_buffer_size = False
self.bytes = bytes()
self.buffer_size = 0

def start_recording(self):
self.record_buffer = True

def start_recording_size(self):
self.record_buffer_size = True

def get_recorded_buffer(self):
return self.bytes

def get_recorded_size(self):
return self.buffer_size

def stop_recording(self):
self.record_buffer = False
self.bytes = bytes()

def stop_recording_size(self):
self.record_buffer_size = True
self.buffer_size = 0

def read(self, n_bytes):
current_bytes = self.io_object.read(n_bytes)

if self.record_buffer:
self.bytes += current_bytes

if self.record_buffer_size:
self.buffer_size += len(current_bytes)

return current_bytes
17 changes: 15 additions & 2 deletions rdbtools/memprofiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ def __init__(self, stream, architecture, redis_version='3.2', string_escape=None
self._pointer_size = 4
self._long_size = 4
self._architecture = 32

def emit_record(self, record_type, key, byte_count, encoding, size, largest_el):
if key is not None:
key = bytes_to_unicode(key, self._escape, skip_printable=True)
Expand Down Expand Up @@ -298,7 +298,19 @@ def end_list(self, key, info):
self.emit_record("list", key, self._current_size, self._current_encoding, self._current_length,
self._len_largest_element)
self.end_key()


def start_module(self, key, module_id, expiry):
self._current_encoding = module_id
self._current_size = self.top_level_object_overhead(key, expiry)
self._current_size += 8 + 1 # add the module id length and EOF byte

return False # don't build the full key buffer

def end_module(self, key, buffer_size, buffer=None):
size = self._current_size + buffer_size
self.emit_record("module", key, size, self._current_encoding, 1, size)
self.end_key()

def start_sorted_set(self, key, length, expiry, info):
self._current_length = length
self._current_encoding = info['encoding']
Expand Down Expand Up @@ -479,6 +491,7 @@ def zset_random_level(self):

MAXINT = 2**63 - 1


def element_length(element):
if isinstance(element, int):
if element < - MAXINT - 1 or element > MAXINT:
Expand Down
128 changes: 103 additions & 25 deletions rdbtools/parser.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import struct
import io
import sys
import datetime
import re

from rdbtools.encodehelpers import STRING_ESCAPE_RAW, apply_escape_bytes, bval
from .compat import range, str2regexp
from .iowrapper import IOWrapper

try:
try:
Expand Down Expand Up @@ -41,6 +41,7 @@
REDIS_RDB_TYPE_HASH = 4
REDIS_RDB_TYPE_ZSET_2 = 5 # ZSET version 2 with doubles stored in binary.
REDIS_RDB_TYPE_MODULE = 6
REDIS_RDB_TYPE_MODULE_2 = 7
REDIS_RDB_TYPE_HASH_ZIPMAP = 9
REDIS_RDB_TYPE_LIST_ZIPLIST = 10
REDIS_RDB_TYPE_SET_INTSET = 11
Expand All @@ -53,8 +54,15 @@
REDIS_RDB_ENC_INT32 = 2
REDIS_RDB_ENC_LZF = 3

REDIS_RDB_MODULE_OPCODE_EOF = 0 # End of module value.
REDIS_RDB_MODULE_OPCODE_SINT = 1
REDIS_RDB_MODULE_OPCODE_UINT = 2
REDIS_RDB_MODULE_OPCODE_FLOAT = 3
REDIS_RDB_MODULE_OPCODE_DOUBLE = 4
REDIS_RDB_MODULE_OPCODE_STRING = 5

DATA_TYPE_MAPPING = {
0 : "string", 1 : "list", 2 : "set", 3 : "sortedset", 4 : "hash", 5 : "sortedset", 6 : "module",
0 : "string", 1 : "list", 2 : "set", 3 : "sortedset", 4 : "hash", 5 : "sortedset", 6 : "module", 7: "module",
9 : "hash", 10 : "list", 11 : "set", 12 : "sortedset", 13 : "hash", 14 : "list"}

class RdbCallback(object):
Expand Down Expand Up @@ -106,7 +114,23 @@ def start_database(self, db_number):
Typically, callbacks store the current database number in a class variable
"""
"""
pass

def start_module(self, key, module_name, expiry):
"""
Called to indicate start of a module key
:param key: string
:param module_name: string
:param expiry:
:return: boolean to indicate whatever to record the full buffer or not
"""
return False

def handle_module_data(self, key, opcode, data):
pass

def end_module(self, key, buffer_size, buffer=None):
pass

def db_size(self, db_size, expires_size):
Expand Down Expand Up @@ -366,14 +390,14 @@ def parse_fd(self, fd):
self._callback.db_size(db_size, expire_size)
continue

if data_type == REDIS_RDB_OPCODE_EOF :
if data_type == REDIS_RDB_OPCODE_EOF:
self._callback.end_database(db_number)
self._callback.end_rdb()
if self._rdb_version >= 5:
f.read(8)
break

if self.matches_filter(db_number) :
if self.matches_filter(db_number):
self._key = self.read_string(f)
if self.matches_filter(db_number, self._key, data_type):
self.read_object(f, data_type)
Expand All @@ -382,20 +406,20 @@ def parse_fd(self, fd):
else :
self.skip_key_and_object(f, data_type)

def read_length_with_encoding(self, f) :
def read_length_with_encoding(self, f):
length = 0
is_encoded = False
bytes = []
bytes.append(read_unsigned_char(f))
enc_type = (bytes[0] & 0xC0) >> 6
if enc_type == REDIS_RDB_ENCVAL :
if enc_type == REDIS_RDB_ENCVAL:
is_encoded = True
length = bytes[0] & 0x3F
elif enc_type == REDIS_RDB_6BITLEN :
elif enc_type == REDIS_RDB_6BITLEN:
length = bytes[0] & 0x3F
elif enc_type == REDIS_RDB_14BITLEN :
elif enc_type == REDIS_RDB_14BITLEN:
bytes.append(read_unsigned_char(f))
length = ((bytes[0]&0x3F)<<8)|bytes[1]
length = ((bytes[0] & 0x3F) << 8) | bytes[1]
elif bytes[0] == REDIS_RDB_32BITLEN:
length = read_unsigned_int_be(f)
elif bytes[0] == REDIS_RDB_64BITLEN:
Expand Down Expand Up @@ -460,47 +484,49 @@ def read_object(self, f, enc_type) :
val = self.read_string(f)
self._callback.rpush(self._key, val)
self._callback.end_list(self._key, info={'encoding':'linkedlist' })
elif enc_type == REDIS_RDB_TYPE_SET :
elif enc_type == REDIS_RDB_TYPE_SET:
# A redis list is just a sequence of strings
# We successively read strings from the stream and create a set from it
# Note that the order of strings is non-deterministic
length = self.read_length(f)
self._callback.start_set(self._key, length, self._expiry, info={'encoding':'hashtable'})
for count in range(0, length) :
for count in range(0, length):
val = self.read_string(f)
self._callback.sadd(self._key, val)
self._callback.end_set(self._key)
elif enc_type == REDIS_RDB_TYPE_ZSET or enc_type == REDIS_RDB_TYPE_ZSET_2 :
length = self.read_length(f)
self._callback.start_sorted_set(self._key, length, self._expiry, info={'encoding':'skiplist'})
for count in range(0, length) :
for count in range(0, length):
val = self.read_string(f)
score = read_double(f) if enc_type == REDIS_RDB_TYPE_ZSET_2 else self.read_float(f)
self._callback.zadd(self._key, score, val)
self._callback.end_sorted_set(self._key)
elif enc_type == REDIS_RDB_TYPE_HASH :
elif enc_type == REDIS_RDB_TYPE_HASH:
length = self.read_length(f)
self._callback.start_hash(self._key, length, self._expiry, info={'encoding':'hashtable'})
for count in range(0, length) :
for count in range(0, length):
field = self.read_string(f)
value = self.read_string(f)
self._callback.hset(self._key, field, value)
self._callback.end_hash(self._key)
elif enc_type == REDIS_RDB_TYPE_HASH_ZIPMAP :
elif enc_type == REDIS_RDB_TYPE_HASH_ZIPMAP:
self.read_zipmap(f)
elif enc_type == REDIS_RDB_TYPE_LIST_ZIPLIST :
elif enc_type == REDIS_RDB_TYPE_LIST_ZIPLIST:
self.read_ziplist(f)
elif enc_type == REDIS_RDB_TYPE_SET_INTSET :
elif enc_type == REDIS_RDB_TYPE_SET_INTSET:
self.read_intset(f)
elif enc_type == REDIS_RDB_TYPE_ZSET_ZIPLIST :
elif enc_type == REDIS_RDB_TYPE_ZSET_ZIPLIST:
self.read_zset_from_ziplist(f)
elif enc_type == REDIS_RDB_TYPE_HASH_ZIPLIST :
elif enc_type == REDIS_RDB_TYPE_HASH_ZIPLIST:
self.read_hash_from_ziplist(f)
elif enc_type == REDIS_RDB_TYPE_LIST_QUICKLIST:
self.read_list_from_quicklist(f)
elif enc_type == REDIS_RDB_TYPE_MODULE :
raise Exception('read_object', 'Unable to read Redis Modules RDB objects (key %s)' % (enc_type, self._key))
else :
elif enc_type == REDIS_RDB_TYPE_MODULE:
raise Exception('read_object', 'Unable to read Redis Modules RDB objects (key %s)' % self._key)
elif enc_type == REDIS_RDB_TYPE_MODULE_2:
self.read_module(f)
else:
raise Exception('read_object', 'Invalid object type %d for key %s' % (enc_type, self._key))

def skip_key_and_object(self, f, data_type):
Expand Down Expand Up @@ -564,8 +590,10 @@ def skip_object(self, f, enc_type):
elif enc_type == REDIS_RDB_TYPE_LIST_QUICKLIST:
skip_strings = self.read_length(f)
elif enc_type == REDIS_RDB_TYPE_MODULE:
raise Exception('skip_object', 'Unable to skip Redis Modules RDB objects (key %s)' % (enc_type, self._key))
else :
raise Exception('skip_object', 'Unable to skip Redis Modules RDB objects (key %s)' % self._key)
elif enc_type == REDIS_RDB_TYPE_MODULE_2:
self.read_module(f)
else:
raise Exception('skip_object', 'Invalid object type %d for key %s' % (enc_type, self._key))
for x in range(0, skip_strings):
self.skip_string(f)
Expand Down Expand Up @@ -728,6 +756,56 @@ def read_zipmap_next_length(self, f) :
else:
return None

def read_module(self, f):
# this method is based on the actual implementation in redis (src/rdb.c:rdbLoadObject)
iowrapper = IOWrapper(f)
iowrapper.start_recording_size()
iowrapper.start_recording()
length, encoding = self.read_length_with_encoding(iowrapper)
record_buffer = self._callback.start_module(self._key, self._decode_module_id(length), self._expiry)

if not record_buffer:
iowrapper.stop_recording()

opcode = self.read_length(iowrapper)
while opcode != REDIS_RDB_MODULE_OPCODE_EOF:
if opcode == REDIS_RDB_MODULE_OPCODE_SINT or opcode == REDIS_RDB_MODULE_OPCODE_UINT:
data = self.read_length(iowrapper)
elif opcode == REDIS_RDB_MODULE_OPCODE_FLOAT:
data = self.read_float(iowrapper)
elif opcode == REDIS_RDB_MODULE_OPCODE_DOUBLE:
data = read_double(iowrapper)
elif opcode == REDIS_RDB_MODULE_OPCODE_STRING:
data = self.read_string(iowrapper)
else:
raise Exception("Unknown module opcode %s" % opcode)
self._callback.handle_module_data(self._key, opcode, data)
# read the next item in the module data type
opcode = self.read_length(iowrapper)

buffer = None
if record_buffer:
# prepand the buffer with REDIS_RDB_TYPE_MODULE_2 type
buffer = struct.pack('B', REDIS_RDB_TYPE_MODULE_2) + iowrapper.get_recorded_buffer()
iowrapper.stop_recording()
self._callback.end_module(self._key, buffer_size=iowrapper.get_recorded_size(), buffer=buffer)

charset = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_'

def _decode_module_id(self, module_id):
"""
decode module id to string
based on @antirez moduleTypeNameByID function from redis/src/module.c
:param module_id: 64bit integer
:return: string
"""
name = [''] * 9
module_id >>= 10
for i in reversed(range(9)):
name[i] = self.charset[module_id & 63]
module_id >>= 6
return ''.join(name)

def verify_magic_string(self, magic_string) :
if magic_string != b'REDIS' :
raise Exception('verify_magic_string', 'Invalid File Format')
Expand Down
Binary file added tests/dumps/redis_40_with_module.rdb
Binary file not shown.
19 changes: 17 additions & 2 deletions tests/memprofiler_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,25 @@
from rdbtools import MemoryCallback
import os

from rdbtools.memprofiler import MemoryRecord


class Stats(object):
def __init__(self):
self.records = {}

def next_record(self, record):
self.records[record.key] = record


def get_stats(file_name):
stats = Stats()
callback = MemoryCallback(stats, 64)
parser = RdbParser(callback)
parser.parse(os.path.join(os.path.dirname(__file__), 'dumps', file_name))
return stats.records



class MemoryCallbackTestCase(unittest.TestCase):
def setUp(self):
pass
Expand All @@ -26,3 +31,13 @@ def test_len_largest_element(self):
stats = get_stats('ziplist_that_compresses_easily.rdb')

self.assertEqual(stats['ziplist_compresses_easily'].len_largest_element, 36, "Length of largest element does not match")

def test_rdb_with_module(self):
stats = get_stats('redis_40_with_module.rdb')

self.assertTrue('simplekey' in stats)
self.assertTrue('foo' in stats)
expected_record = MemoryRecord(database=0, type='module', key='foo',
bytes=101, encoding='ReJSON-RL', size=1,
len_largest_element=101)
self.assertEquals(stats['foo'], expected_record)

0 comments on commit bd08e80

Please sign in to comment.