Skip to content

Commit

Permalink
lmdbslab.dropdb, initdb fixes (#1174)
Browse files Browse the repository at this point in the history
* Fix logging typo on moving DB progress

* lmdbslab.dropdb, initdb fixes

Fix failure in dropdb; make a similar change to initdb to match
Fix logger formatting failure in lmdblayer
Separate maxsize test to fix occasional test failure
Optimize mapfull_runsyncloop test

dropdb could raise lmdb.MapFullError.  Handling this requires an API change:
dropdb now takes the DB name instead of the DB object.
  • Loading branch information
jnwatson authored Mar 20, 2019
1 parent e84e2e6 commit d6a7b0d
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 49 deletions.
4 changes: 2 additions & 2 deletions synapse/lib/lmdblayer.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,11 @@ def _migrate_db_pre010(self, dbname, newslab):
logger.info('Pre-010 %s migration starting. Total rows: %d...', dbname, entries)

def progfunc(count):
logger.info('Progress %d/%d (%2.2f%)', count, entries, count / entries * 100)
logger.info('Progress %d/%d (%2.2f%%)', count, entries, count / entries * 100)

oldslab.copydb(olddb, newslab, destdbname=dbname, progresscb=progfunc)
logger.info('Pre-010 %s migration copying done. Deleting from old location...', dbname)
oldslab.dropdb(olddb)
oldslab.dropdb(dbname)
logger.info('Pre-010 %s migration completed.', dbname)

return True
Expand Down
45 changes: 30 additions & 15 deletions synapse/lib/lmdbslab.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import synapse.lib.msgpack as s_msgpack

COPY_CHUNKSIZE = 512
PROGRESS_PERIOD = COPY_CHUNKSIZE * 128
PROGRESS_PERIOD = COPY_CHUNKSIZE * 1024

# By default, double the map size each time we run out of space, until this amount, and then we only increase by that
MAX_DOUBLE_SIZE = 100 * s_const.gibibyte
Expand Down Expand Up @@ -363,13 +363,34 @@ def _growMapSize(self, size=None):
return self.mapsize

def initdb(self, name, dupsort=False):
with self._noCoXact():
while True:
try:
db = self.lenv.open_db(name.encode('utf8'), dupsort=dupsort)
return LmdbDatabase(db, dupsort)
except lmdb.MapFullError:
self._growMapSize()
while True:
try:
db = self.lenv.open_db(name.encode('utf8'), txn=self.xact, dupsort=dupsort)
self.dirty = True
self.forcecommit()
return LmdbDatabase(db, dupsort)
except lmdb.MapFullError:
self._handle_mapfull()

def dropdb(self, name):
    '''
    Deletes an **entire database** (i.e. a table), losing all data.

    Args:
        name (str): the name of the database to drop.

    Raises:
        s_exc.IsReadOnly: if the slab was opened read-only.

    Note:
        Takes the database *name* rather than an LmdbDatabase object so the
        whole drop sequence can be restarted from scratch after a map grow.
    '''
    if self.readonly:
        raise s_exc.IsReadOnly()

    # The drop can raise lmdb.MapFullError mid-transaction; _handle_mapfull
    # presumably aborts the txn and grows the map (TODO confirm), so we
    # retry the entire sequence until it commits cleanly.
    while True:
        try:
            if not self.dbexists(name):
                # Nothing to drop.  This check also makes a retry after a
                # successful commit idempotent.
                return
            db = self.initdb(name)
            self.dirty = True
            self.xact.drop(db.db, delete=True)
            self.forcecommit()
            return

        except lmdb.MapFullError:
            self._handle_mapfull()

def dbexists(self, name):
'''
Expand Down Expand Up @@ -562,12 +583,6 @@ def putmulti(self, kvpairs, dupdata=False, append=False, db=_DefaultDB):
except lmdb.MapFullError:
return self._handle_mapfull()

def dropdb(self, db):
'''
Deletes an **entire database** (i.e. a table), losing all data.
'''
self.xact.drop(db.db, delete=True)

def copydb(self, sourcedb, destslab, destdbname=None, progresscb=None):
'''
Copy an entire database in this slab to a new database in potentially another slab.
Expand Down Expand Up @@ -597,7 +612,7 @@ def copydb(self, sourcedb, destslab, destdbname=None, progresscb=None):
for chunk in s_common.chunks(self.scanByFull(db=sourcedb), COPY_CHUNKSIZE):
ccount, acount = destslab.putmulti(chunk, dupdata=True, append=True, db=destdb)
if ccount != len(chunk) or acount != len(chunk):
raise s_exc.BadCoreStore(mesg='Unexpected number of values written')
raise s_exc.BadCoreStore(mesg='Unexpected number of values written') # pragma: no cover

rowcount += len(chunk)
if progresscb is not None and 0 == (rowcount % PROGRESS_PERIOD):
Expand Down
84 changes: 52 additions & 32 deletions synapse/tests/test_lib_lmdbslab.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ async def test_lmdbslab_base(self):

path = os.path.join(dirn, 'test.lmdb')

await self.asyncraises(s_exc.BadArg, s_lmdbslab.Slab.anit(path, map_size=None))

slab = await s_lmdbslab.Slab.anit(path, map_size=1000000)

foo = slab.initdb('foo')
Expand Down Expand Up @@ -87,14 +89,17 @@ def progfunc(count):
path2 = os.path.join(dirn, 'test2.lmdb')
async with await s_lmdbslab.Slab.anit(path2, map_size=512 * 1024) as slab2:
with patch('synapse.lib.lmdbslab.PROGRESS_PERIOD', 2):

self.eq(4, slab.copydb(foo2, slab2, destdbname='foo2', progresscb=progfunc))
self.gt(vardict.get('prog', 0), 0)

# Test slab.drop and slab.dbexists
self.true(slab.dbexists('foo2'))
slab.dropdb(foo2)
slab.dropdb('foo2')
self.false(slab.dbexists('foo2'))

self.none(slab.dropdb('notadb'))

# start a scan and then fini the whole db...
scan = slab.scanByPref(b'\x00', db=foo)
self.eq((b'\x00\x01', b'hehe'), next(scan))
Expand All @@ -103,14 +108,33 @@ def progfunc(count):

self.raises(s_exc.IsFini, next, scan)

async def test_lmdbslab_maxsize(self):
    '''
    Verify that the maxsize option caps map growth (raising DbOutOfSpace)
    and that maxsize persists across a reopen of the slab.
    '''
    with self.getTestDir() as dirn:
        path = os.path.join(dirn, 'test.lmdb')

        my_maxsize = 400000
        async with await s_lmdbslab.Slab.anit(path, map_size=100000, maxsize=my_maxsize) as slab:
            foo = slab.initdb('foo', dupsort=True)
            byts = b'\x00' * 256

            # Write until the map would have to grow past maxsize;
            # trigger an out-of-space
            with self.raises(s_exc.DbOutOfSpace):

                for i in range(400):
                    slab.put(b'\xff\xff\xff\xff' + s_common.guid().encode('utf8'), byts, db=foo)

        # lets ensure our maxsize persisted and it caps the mapsize
        # (reopened read-only; mapsize was clamped to maxsize, not doubled past it)
        async with await s_lmdbslab.Slab.anit(path, map_size=100000, readonly=True) as newdb:
            self.eq(my_maxsize, newdb.mapsize)
            self.eq(my_maxsize, newdb.maxsize)

async def test_lmdbslab_grow(self):

with self.getTestDir() as dirn:

path = os.path.join(dirn, 'test.lmdb')
my_maxsize = 400000

async with await s_lmdbslab.Slab.anit(path, map_size=100000, growsize=10000, maxsize=my_maxsize) as slab:
async with await s_lmdbslab.Slab.anit(path, map_size=100000, growsize=10000) as slab:

foo = slab.initdb('foo', dupsort=True)
foo2 = slab.initdb('foo2', dupsort=False)
Expand Down Expand Up @@ -158,29 +182,17 @@ async def test_lmdbslab_grow(self):
self.eq(count - 1, sum(1 for _ in iter))
self.eq(99, sum(1 for _ in iter2))

# Trigger an out-of-space
try:
for i in range(400):
slab.put(b'\xff\xff\xff\xff' + s_common.guid().encode('utf8'), byts, db=foo)

# Should have hit a DbOutOfSpace exception
self.true(0)

except s_exc.DbOutOfSpace:
pass

# lets ensure our mapsize / growsize persisted, and make sure readonly works
async with await s_lmdbslab.Slab.anit(path, map_size=100000, readonly=True) as newdb:
self.eq(my_maxsize, newdb.mapsize)

self.eq(10000, newdb.growsize)
self.eq(my_maxsize, newdb.maxsize)
foo = newdb.initdb('foo', dupsort=True)
for _, _ in newdb.scanByRange(b'', db=foo):
count += 1
self.gt(count, 300)

# Make sure readonly is really readonly
self.raises(s_exc.IsReadOnly, newdb.dropdb, 'foo')
self.raises(s_exc.IsReadOnly, newdb.put, b'1234', b'3456')
self.raises(s_exc.IsReadOnly, newdb.replace, b'1234', b'3456')
self.raises(s_exc.IsReadOnly, newdb.pop, b'1234')
Expand Down Expand Up @@ -340,34 +352,42 @@ async def test_slab_mapfull_runsyncloop(self):
'''
forcecommit in runSyncLoop can very occasionally trigger a mapfull
'''
fake_confdefs = ( # type: ignore
fake_confdefs = (
('lmdb:mapsize', {'type': 'int', 'defval': s_const.mebibyte}),
('lmdb:maxsize', {'type': 'int', 'defval': None}),
('lmdb:growsize', {'type': 'int', 'defval': 128 * s_const.kibibyte}),
('lmdb:readahead', {'type': 'bool', 'defval': True}),
)
with patch('synapse.lib.lmdblayer.LmdbLayer.confdefs', fake_confdefs):
batchsize = 1000
numbatches = 4
batchsize = 4000
numbatches = 2
async with self.getTestCore() as core:
before_mapsize = core.layer.layrslab.mapsize
for i in range(numbatches):
async with await core.snap() as snap:
ips = ((('test:int', i * 1000000 + x), {'props': {'loc': 'us'}}) for x in range(batchsize))
await alist(snap.addNodes(ips))
# Wait for the syncloop to run
await asyncio.sleep(1.1)

await alist(core.streamstorm('test:int:loc=us | delnode'))

async def lotsOfWrites():
for i in range(numbatches):
async with await core.snap() as snap:
ips = ((('test:int', 20_000_000 + i * 1000000 + x), {'props': {'loc': 'cn'}})
for x in range(batchsize))
await alist(snap.addNodes(ips))
await asyncio.sleep(0.1)
# Verify that it hit
self.gt(core.layer.layrslab.mapsize, before_mapsize)

task = core.schedCoro(lotsOfWrites())
async def test_slab_mapfull_drop(self):
'''
Test a mapfull in the middle of a dropdb
'''
with self.getTestDir() as dirn:

nodes_left = await alist(core.eval('test:int:loc=us'))
self.len(0, nodes_left)
path = os.path.join(dirn, 'test.lmdb')
data = [i.to_bytes(4, 'little') for i in range(400)]

await task
async with await s_lmdbslab.Slab.anit(path, map_size=32000, growsize=5000) as slab:
slab.initdb('foo')
kvpairs = [(x, x) for x in data]
slab.putmulti(kvpairs)
slab.forcecommit()
before_mapsize = slab.mapsize
slab.dropdb('foo')
self.false(slab.dbexists('foo'))
self.gt(slab.mapsize, before_mapsize)

0 comments on commit d6a7b0d

Please sign in to comment.