diff --git a/synapse/lib/lmdblayer.py b/synapse/lib/lmdblayer.py index 9a8ad9ae74..35aac99730 100644 --- a/synapse/lib/lmdblayer.py +++ b/synapse/lib/lmdblayer.py @@ -96,11 +96,11 @@ def _migrate_db_pre010(self, dbname, newslab): logger.info('Pre-010 %s migration starting. Total rows: %d...', dbname, entries) def progfunc(count): - logger.info('Progress %d/%d (%2.2f%)', count, entries, count / entries * 100) + logger.info('Progress %d/%d (%2.2f%%)', count, entries, count / entries * 100) oldslab.copydb(olddb, newslab, destdbname=dbname, progresscb=progfunc) logger.info('Pre-010 %s migration copying done. Deleting from old location...', dbname) - oldslab.dropdb(olddb) + oldslab.dropdb(dbname) logger.info('Pre-010 %s migration completed.', dbname) return True diff --git a/synapse/lib/lmdbslab.py b/synapse/lib/lmdbslab.py index 1b3eab81e0..00e35acb95 100644 --- a/synapse/lib/lmdbslab.py +++ b/synapse/lib/lmdbslab.py @@ -17,7 +17,7 @@ import synapse.lib.msgpack as s_msgpack COPY_CHUNKSIZE = 512 -PROGRESS_PERIOD = COPY_CHUNKSIZE * 128 +PROGRESS_PERIOD = COPY_CHUNKSIZE * 1024 # By default, double the map size each time we run out of space, until this amount, and then we only increase by that MAX_DOUBLE_SIZE = 100 * s_const.gibibyte @@ -363,13 +363,34 @@ def _growMapSize(self, size=None): return self.mapsize def initdb(self, name, dupsort=False): - with self._noCoXact(): - while True: - try: - db = self.lenv.open_db(name.encode('utf8'), dupsort=dupsort) - return LmdbDatabase(db, dupsort) - except lmdb.MapFullError: - self._growMapSize() + while True: + try: + db = self.lenv.open_db(name.encode('utf8'), txn=self.xact, dupsort=dupsort) + self.dirty = True + self.forcecommit() + return LmdbDatabase(db, dupsort) + except lmdb.MapFullError: + self._handle_mapfull() + + def dropdb(self, name): + ''' + Deletes an **entire database** (i.e. a table), losing all data. + ''' + if self.readonly: + raise s_exc.IsReadOnly() + + while True: + try: + if not self.dbexists(name): + return + db = self.initdb(name) + self.dirty = True + self.xact.drop(db.db, delete=True) + self.forcecommit() + return + + except lmdb.MapFullError: + self._handle_mapfull() def dbexists(self, name): ''' @@ -562,12 +583,6 @@ def putmulti(self, kvpairs, dupdata=False, append=False, db=_DefaultDB): except lmdb.MapFullError: return self._handle_mapfull() - def dropdb(self, db): - ''' - Deletes an **entire database** (i.e. a table), losing all data. - ''' - self.xact.drop(db.db, delete=True) - def copydb(self, sourcedb, destslab, destdbname=None, progresscb=None): ''' Copy an entire database in this slab to a new database in potentially another slab. @@ -597,7 +612,7 @@ def copydb(self, sourcedb, destslab, destdbname=None, progresscb=None): for chunk in s_common.chunks(self.scanByFull(db=sourcedb), COPY_CHUNKSIZE): ccount, acount = destslab.putmulti(chunk, dupdata=True, append=True, db=destdb) if ccount != len(chunk) or acount != len(chunk): - raise s_exc.BadCoreStore(mesg='Unexpected number of values written') + raise s_exc.BadCoreStore(mesg='Unexpected number of values written') # pragma: no cover rowcount += len(chunk) if progresscb is not None and 0 == (rowcount % PROGRESS_PERIOD): diff --git a/synapse/tests/test_lib_lmdbslab.py b/synapse/tests/test_lib_lmdbslab.py index 8a4ae0ea2b..c9253a8e63 100644 --- a/synapse/tests/test_lib_lmdbslab.py +++ b/synapse/tests/test_lib_lmdbslab.py @@ -21,6 +21,8 @@ async def test_lmdbslab_base(self): path = os.path.join(dirn, 'test.lmdb') + await self.asyncraises(s_exc.BadArg, s_lmdbslab.Slab.anit(path, map_size=None)) + slab = await s_lmdbslab.Slab.anit(path, map_size=1000000) foo = slab.initdb('foo') @@ -87,14 +89,17 @@ def progfunc(count): path2 = os.path.join(dirn, 'test2.lmdb') async with await s_lmdbslab.Slab.anit(path2, map_size=512 * 1024) as slab2: with patch('synapse.lib.lmdbslab.PROGRESS_PERIOD', 2): + self.eq(4, slab.copydb(foo2, slab2, destdbname='foo2', progresscb=progfunc)) self.gt(vardict.get('prog', 0), 0) # Test slab.drop and slab.dbexists self.true(slab.dbexists('foo2')) - slab.dropdb(foo2) + slab.dropdb('foo2') self.false(slab.dbexists('foo2')) + self.none(slab.dropdb('notadb')) + # start a scan and then fini the whole db... scan = slab.scanByPref(b'\x00', db=foo) self.eq((b'\x00\x01', b'hehe'), next(scan)) @@ -103,14 +108,33 @@ def progfunc(count): self.raises(s_exc.IsFini, next, scan) + async def test_lmdbslab_maxsize(self): + with self.getTestDir() as dirn: + path = os.path.join(dirn, 'test.lmdb') + + my_maxsize = 400000 + async with await s_lmdbslab.Slab.anit(path, map_size=100000, maxsize=my_maxsize) as slab: + foo = slab.initdb('foo', dupsort=True) + byts = b'\x00' * 256 + + # Trigger an out-of-space + with self.raises(s_exc.DbOutOfSpace): + + for i in range(400): + slab.put(b'\xff\xff\xff\xff' + s_common.guid().encode('utf8'), byts, db=foo) + + # lets ensure our maxsize persisted and it caps the mapsize + async with await s_lmdbslab.Slab.anit(path, map_size=100000, readonly=True) as newdb: + self.eq(my_maxsize, newdb.mapsize) + self.eq(my_maxsize, newdb.maxsize) + async def test_lmdbslab_grow(self): with self.getTestDir() as dirn: path = os.path.join(dirn, 'test.lmdb') - my_maxsize = 400000 - async with await s_lmdbslab.Slab.anit(path, map_size=100000, growsize=10000, maxsize=my_maxsize) as slab: + async with await s_lmdbslab.Slab.anit(path, map_size=100000, growsize=10000) as slab: foo = slab.initdb('foo', dupsort=True) foo2 = slab.initdb('foo2', dupsort=False) @@ -158,29 +182,17 @@ async def test_lmdbslab_grow(self): self.eq(count - 1, sum(1 for _ in iter)) self.eq(99, sum(1 for _ in iter2)) - # Trigger an out-of-space - try: - for i in range(400): - slab.put(b'\xff\xff\xff\xff' + s_common.guid().encode('utf8'), byts, db=foo) - - # Should have hit a DbOutOfSpace exception - self.true(0) - - except s_exc.DbOutOfSpace: - pass - # lets ensure our mapsize / growsize persisted, and make sure readonly works async with await s_lmdbslab.Slab.anit(path, map_size=100000, readonly=True) as newdb: - self.eq(my_maxsize, newdb.mapsize) self.eq(10000, newdb.growsize) - self.eq(my_maxsize, newdb.maxsize) foo = newdb.initdb('foo', dupsort=True) for _, _ in newdb.scanByRange(b'', db=foo): count += 1 self.gt(count, 300) # Make sure readonly is really readonly + self.raises(s_exc.IsReadOnly, newdb.dropdb, 'foo') self.raises(s_exc.IsReadOnly, newdb.put, b'1234', b'3456') self.raises(s_exc.IsReadOnly, newdb.replace, b'1234', b'3456') self.raises(s_exc.IsReadOnly, newdb.pop, b'1234') @@ -340,34 +352,42 @@ async def test_slab_mapfull_runsyncloop(self): ''' forcecommit in runSyncLoop can very occasionally trigger a mapfull ''' - fake_confdefs = ( # type: ignore + fake_confdefs = ( ('lmdb:mapsize', {'type': 'int', 'defval': s_const.mebibyte}), ('lmdb:maxsize', {'type': 'int', 'defval': None}), ('lmdb:growsize', {'type': 'int', 'defval': 128 * s_const.kibibyte}), ('lmdb:readahead', {'type': 'bool', 'defval': True}), ) with patch('synapse.lib.lmdblayer.LmdbLayer.confdefs', fake_confdefs): - batchsize = 1000 - numbatches = 4 + batchsize = 4000 + numbatches = 2 async with self.getTestCore() as core: + before_mapsize = core.layer.layrslab.mapsize for i in range(numbatches): async with await core.snap() as snap: ips = ((('test:int', i * 1000000 + x), {'props': {'loc': 'us'}}) for x in range(batchsize)) await alist(snap.addNodes(ips)) + # Wait for the syncloop to run + await asyncio.sleep(1.1) - await alist(core.streamstorm('test:int:loc=us | delnode')) - - async def lotsOfWrites(): - for i in range(numbatches): - async with await core.snap() as snap: - ips = ((('test:int', 20_000_000 + i * 1000000 + x), {'props': {'loc': 'cn'}}) - for x in range(batchsize)) - await alist(snap.addNodes(ips)) - await asyncio.sleep(0.1) + # Verify that it hit + self.gt(core.layer.layrslab.mapsize, before_mapsize) - task = core.schedCoro(lotsOfWrites()) + async def test_slab_mapfull_drop(self): + ''' + Test a mapfull in the middle of a dropdb + ''' + with self.getTestDir() as dirn: - nodes_left = await alist(core.eval('test:int:loc=us')) - self.len(0, nodes_left) + path = os.path.join(dirn, 'test.lmdb') + data = [i.to_bytes(4, 'little') for i in range(400)] - await task + async with await s_lmdbslab.Slab.anit(path, map_size=32000, growsize=5000) as slab: + slab.initdb('foo') + kvpairs = [(x, x) for x in data] + slab.putmulti(kvpairs) + slab.forcecommit() + before_mapsize = slab.mapsize + slab.dropdb('foo') + self.false(slab.dbexists('foo')) + self.gt(slab.mapsize, before_mapsize)