Skip to content

Commit

Permalink
chore: refactor db debug logging (#798)
Browse files Browse the repository at this point in the history
  • Loading branch information
BobTheBuidler authored Nov 26, 2024
1 parent 535832b commit 7f1e8a2
Showing 1 changed file with 53 additions and 27 deletions.
80 changes: 53 additions & 27 deletions y/_db/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,8 @@ async def _extend(self, objs) -> None:
Args:
objs: The objects to extend the list with.
"""
return self._objects.extend(objs)
if objs:
return self._objects.extend(objs)

def _remove(self, obj: T) -> None:
    """Remove *obj* from the in-memory object collection.

    Raises:
        ValueError: If *obj* is not present — assumes ``self._objects``
            is a list (``list.remove`` semantics); TODO confirm.
    """
    self._objects.remove(obj)
Expand All @@ -267,13 +268,13 @@ async def _load_cache(self, from_block: int) -> int:
if cached_thru := await self.executor.run(
self.cache.is_cached_thru, from_block
):
logger.warning(
logger.info(
"%s is cached thru block %s, loading from db", self, cached_thru
)
await self._extend(
await self.executor.run(self.cache.select, from_block, cached_thru)
)
logger.warning(
logger.info(
"%s loaded %s objects thru block %s from disk",
self,
len(self._objects),
Expand Down Expand Up @@ -368,6 +369,7 @@ def _get_block_for_obj(self, obj: T) -> int:
@a_sync.ASyncIterator.wrap
async def _objects_thru(self, block: Optional[int]) -> AsyncIterator[T]:
self._ensure_task()
debug_logs = logger.isEnabledFor(logging.DEBUG)
yielded = 0
done_thru = 0
while True:
Expand All @@ -392,7 +394,8 @@ async def _objects_thru(self, block: Optional[int]) -> AsyncIterator[T]:
elif block and done_thru >= block:
return
done_thru = self._lock.value
logger.debug("%s lock value %s to_block %s", self, done_thru, block)
if debug_logs:
logger._log(logging.DEBUG, "%s lock value %s to_block %s", (self, done_thru, block))
if block is None:
await asyncio.sleep(self._sleep_time)

Expand Down Expand Up @@ -427,11 +430,15 @@ async def _fetch(self) -> NoReturn:

@stuck_coro_debugger
async def _fetch_range_wrapped(
    self, i: int, range_start: int, range_end: int, debug_logs: bool
) -> List[T]:
    """Fetch one block range while holding the chunk semaphore.

    Args:
        i: Ordinal index of this chunk, used by the caller to reassemble
            chunks in order.
        range_start: First block of the range.
        range_end: Last block of the range.
        debug_logs: Pre-computed ``logger.isEnabledFor(logging.DEBUG)``,
            hoisted by the caller so the level check is not repeated
            per chunk.

    Returns:
        A ``(i, range_end, objects)`` tuple.
    """
    # Single semaphore/fetch path; only the log line is conditional.
    # (Previously the whole `async with`/return was duplicated per branch.)
    async with self.semaphore[range_end]:
        if debug_logs:
            # NOTE(review): logger._log is a private stdlib API, used here to
            # skip the redundant isEnabledFor check; logger.debug also works.
            logger._log(
                logging.DEBUG,
                "fetching %s block %s to %s",
                (self, range_start, range_end),
            )
        return i, range_end, await self._fetch_range(range_start, range_end)

async def _loop(self, from_block: int) -> NoReturn:
logger.debug("starting work loop for %s", self)
Expand All @@ -446,7 +453,11 @@ async def _loop(self, from_block: int) -> NoReturn:
async def _load_new_objects(
self, to_block: Optional[int] = None, start_from_block: Optional[int] = None
) -> None:
logger.debug("loading new objects for %s", self)
SLEEP_TIME = 1

if debug_logs := logger.isEnabledFor(logging.DEBUG):
logger._log(logging.DEBUG, "loading new objects for %s", (self, ))

start = (
v + 1 if (v := self._lock.value) else start_from_block or self.from_block
)
Expand All @@ -456,21 +467,28 @@ async def _load_new_objects(
raise ValueError(
f"start {start} is bigger than end {end}, can't do that"
)
else:

elif debug_logs:
while start > (end := await dank_mids.eth.block_number):
logger.debug(
"%s start %s is greater than end %s, sleeping...", self, start, end
logger._log(
logging.DEBUG, "%s start %s is greater than end %s, sleeping...", (self, start, end)
)
await asyncio.sleep(1)
await asyncio.sleep(SLEEP_TIME)

else:
while start > (end := await dank_mids.eth.block_number):
await asyncio.sleep(SLEEP_TIME)

await self._load_range(start, end)

@stuck_coro_debugger
async def _load_range(self, from_block: int, to_block: int) -> None:
logger.debug("loading block range %s to %s", from_block, to_block)
if debug_logs := logger.isEnabledFor(logging.DEBUG):
logger._log(logging.DEBUG, "loading block range %s to %s", (from_block, to_block))
chunks_yielded = 0
done = {}
coros = [
self._fetch_range_wrapped(i, start, end)
self._fetch_range_wrapped(i, start, end, debug_logs)
for i, (start, end) in enumerate(
block_ranges(from_block, to_block, self._chunk_size)
)
Expand All @@ -485,13 +503,14 @@ async def _load_range(self, from_block: int, to_block: int) -> None:
if i not in done:
break
end, objs = done.pop(i)
self._insert_chunk(objs, from_block, end)
self._insert_chunk(objs, from_block, end, debug_logs)
await self._extend(objs)
next_chunk_loaded = True
chunks_yielded += 1
if next_chunk_loaded:
await self._set_lock(end)
logger.debug("%s loaded thru block %s", self, end)
if debug_logs:
logger._log(logging.DEBUG, "%s loaded thru block %s", (self, end))

@stuck_coro_debugger
async def _set_lock(self, block: int) -> None:
Expand All @@ -503,21 +522,26 @@ async def _set_lock(self, block: int) -> None:
"""
self._lock.set(block)

def _insert_chunk(self, objs: List[T], from_block: int, done_thru: int, debug_logs: bool) -> None:
    """Queue *objs* for asynchronous insertion into the db.

    Each insert task is chained behind the previous one so chunks are
    written in order; a failure in the previous task is re-raised here
    rather than silently queuing more work behind it.

    Args:
        objs: The objects fetched for this chunk.
        from_block: First block of the chunk.
        done_thru: Last block of the chunk.
        debug_logs: Pre-computed ``logger.isEnabledFor(logging.DEBUG)``.

    Raises:
        Exception: Whatever exception the previous insert task raised.
    """
    if (
        (prev_task := self._db_task)
        and prev_task.done()
        and (e := prev_task.exception())
    ):
        raise e
    depth = prev_task._depth + 1 if prev_task else 0
    if debug_logs:
        # NOTE(review): private logger._log skips the redundant level check.
        logger._log(
            logging.DEBUG,
            "%s queuing next db insert chunk %s thru block %s",
            (self, depth, done_thru),
        )
    # Always name the task: the name is a debugging aid and must not
    # depend on the logging level (the previous version dropped it
    # whenever DEBUG logging was disabled).
    task = asyncio.create_task(
        coro=self.__insert_chunk(objs, from_block, done_thru, prev_task, depth, debug_logs),
        name=f"_insert_chunk from {from_block} to {done_thru}",
    )
    task._depth = depth
    task._prev_task = prev_task
    self._db_task = task
Expand All @@ -542,13 +566,15 @@ async def __insert_chunk(
done_thru: int,
prev_chunk_task: Optional[asyncio.Task],
depth: int,
debug_logs: bool,
) -> None:
if prev_chunk_task:
await prev_chunk_task
if objs:
await self.bulk_insert(objs, executor=self.executor)
await self.bulk_insert(objs)
await self.executor.run(self.cache.set_metadata, from_block, done_thru)
logger.debug("%s chunk %s thru block %s is now in db", self, depth, done_thru)
if debug_logs:
logger._log(logging.DEBUG, "%s chunk %s thru block %s is now in db", (self, depth, done_thru))


def _clean_addresses(addresses) -> Union[str, List[str]]:
Expand Down

0 comments on commit 7f1e8a2

Please sign in to comment.