Skip to content

Commit

Permalink
Fix scheduler shutdown (#102)
Browse files Browse the repository at this point in the history
  • Loading branch information
droserasprout authored Jul 19, 2021
1 parent 9356ef8 commit e14589a
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 5 deletions.
12 changes: 10 additions & 2 deletions src/dipdup/dipdup.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio
import hashlib
import logging
from contextlib import AsyncExitStack
from contextlib import AsyncExitStack, asynccontextmanager
from os import listdir
from os.path import join
from typing import Dict, List, Optional, cast
Expand Down Expand Up @@ -216,7 +216,7 @@ async def run(self, reindex: bool, oneshot: bool) -> None:
datasource_tasks = [] if oneshot else [asyncio.create_task(d.run()) for d in self._datasources.values()]

if self._config.jobs and not oneshot:
stack.enter_context(self._scheduler)
await stack.enter_async_context(self._scheduler_context())
for job_name, job_config in self._config.jobs.items():
add_job(self._ctx, self._scheduler, job_name, job_config)

Expand Down Expand Up @@ -351,3 +351,11 @@ def _finish_migration(self, version: str) -> None:
self._logger.warning('Your project has been migrated to spec version %s.', version)
self._logger.warning('Review and commit changes before proceeding.')
self._logger.warning('==================== WARNING =====================')

@asynccontextmanager
async def _scheduler_context(self):
self._scheduler.start()
try:
yield
finally:
self._scheduler.shutdown()
1 change: 1 addition & 0 deletions src/dipdup/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ async def __aenter__(self) -> None:
)

async def __aexit__(self, exc_type, exc, tb):
self._logger.info('Closing gateway session (%s)', self._url)
await self.__session.close()

@property
Expand Down
11 changes: 8 additions & 3 deletions src/dipdup/scheduler.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from contextlib import AsyncExitStack

from apscheduler.executors.asyncio import AsyncIOExecutor # type: ignore
from apscheduler.jobstores.memory import MemoryJobStore # type: ignore
from apscheduler.schedulers.asyncio import AsyncIOScheduler # type: ignore
Expand Down Expand Up @@ -31,16 +33,19 @@ def create_scheduler() -> AsyncIOScheduler:


def add_job(ctx: DipDupContext, scheduler: AsyncIOScheduler, job_name: str, job_config: JobConfig) -> None:
async def _atomic_wrapper(ctx, args):
async with in_global_transaction():
async def _wrapper(ctx, args) -> None:
nonlocal job_config
async with AsyncExitStack() as stack:
if job_config.atomic:
await stack.enter_async_context(in_global_transaction())
await job_config.callback_fn(ctx, args)

if job_config.crontab:
trigger = CronTrigger.from_crontab(job_config.crontab)
elif job_config.interval:
trigger = IntervalTrigger(seconds=job_config.interval)
scheduler.add_job(
func=_atomic_wrapper if job_config.atomic else job_config.callback_fn,
func=_wrapper,
id=job_name,
name=job_name,
trigger=trigger,
Expand Down

0 comments on commit e14589a

Please sign in to comment.