Skip to content

Commit

Permalink
User-defined cron jobs (#62)
Browse files Browse the repository at this point in the history
  • Loading branch information
droserasprout authored Jun 30, 2021
1 parent 144db32 commit 1442b02
Show file tree
Hide file tree
Showing 16 changed files with 257 additions and 44 deletions.
119 changes: 77 additions & 42 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ aiosignalrcore = "^0.9.2"
fcache = "^0.4.7"
click = "^8.0.1"
pyee = "^8.1.0"
APScheduler = "^3.7.0"
sentry-sdk = "^1.1.0"

[tool.poetry.dev-dependencies]
Expand Down
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file added src/demo_tzbtc/jobs/__init__.py
Empty file.
Empty file.
24 changes: 24 additions & 0 deletions src/dipdup/codegen.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,13 @@ async def create_package(self) -> None:
with open(join(handlers_path, '__init__.py'), 'w'):
pass

self._logger.info('Creating `%s.jobs` package', self._config.package)
jobs_path = join(self._config.package_path, 'jobs')
with suppress(FileExistsError):
mkdir(jobs_path)
with open(join(jobs_path, '__init__.py'), 'w'):
pass

self._logger.info('Creating `%s/sql` directory', self._config.package)
sql_path = join(self._config.package_path, 'sql')
with suppress(FileExistsError):
Expand Down Expand Up @@ -315,6 +322,23 @@ async def generate_user_handlers(self) -> None:
else:
raise NotImplementedError(f'Index kind `{index_config.kind}` is not supported')

async def generate_jobs(self) -> None:
if not self._config.jobs:
return

jobs_path = join(self._config.package_path, 'jobs')
with open(join(dirname(__file__), 'templates', 'job.py.j2')) as file:
job_template = Template(file.read())

job_callbacks = set(job_config.callback for job_config in self._config.jobs.values())
for job_callback in job_callbacks:
self._logger.info('Generating job `%s`', job_callback)
job_code = job_template.render(job=job_callback)
job_path = join(jobs_path, f'{job_callback}.py')
if not exists(job_path):
with open(job_path, 'w') as file:
file.write(job_code)

async def cleanup(self) -> None:
"""Remove fetched JSONSchemas"""
self._logger.info('Cleaning up')
Expand Down
26 changes: 26 additions & 0 deletions src/dipdup/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,10 @@ class HandlerConfig:

def __post_init_post_parse__(self):
self._callback_fn = None
if self.callback in (ROLLBACK_HANDLER, CONFIGURE_HANDLER):
raise ConfigurationError(f'`{self.callback}` callback name is reserved')
if self.callback and self.callback != pascal_to_snake(self.callback):
raise ConfigurationError('`callback` field must conform to snake_case naming style')

@property
def callback_fn(self) -> Callable:
Expand Down Expand Up @@ -610,6 +614,13 @@ def valid_url(cls, v):
return v


@dataclass
class JobConfig(HandlerConfig):
crontab: str
args: Optional[Dict[str, Any]] = None
atomic: bool = False


@dataclass
class SentryConfig:
dsn: str
Expand All @@ -627,6 +638,7 @@ class DipDupConfig:
:param templates: Mapping of template aliases and index templates
:param database: Database config
:param hasura: Hasura config
:param jobs: Mapping of job aliases and job configs
:param sentry: Sentry integration config
"""

Expand All @@ -638,6 +650,7 @@ class DipDupConfig:
templates: Optional[Dict[str, IndexConfigTemplateT]] = None
database: Union[SqliteDatabaseConfig, PostgresDatabaseConfig] = SqliteDatabaseConfig(kind='sqlite')
hasura: Optional[HasuraConfig] = None
jobs: Optional[Dict[str, JobConfig]] = None
sentry: Optional[SentryConfig] = None

def __post_init_post_parse__(self):
Expand Down Expand Up @@ -833,6 +846,12 @@ def _initialize_handler_callback(self, handler_config: HandlerConfig) -> None:
callback_fn = getattr(handler_module, handler_config.callback)
handler_config.callback_fn = callback_fn

def _initialize_job_callback(self, job_config: JobConfig) -> None:
_logger.info('Registering job callback `%s`', job_config.callback)
job_module = importlib.import_module(f'{self.package}.jobs.{job_config.callback}')
callback_fn = getattr(job_module, job_config.callback)
job_config.callback_fn = callback_fn

def _initialize_index(self, index_name: str, index_config: IndexConfigT) -> None:
if index_name in self._initialized:
return
Expand Down Expand Up @@ -890,12 +909,19 @@ def _initialize_index(self, index_name: str, index_config: IndexConfigT) -> None

self._initialized.append(index_name)

def _initialize_jobs(self) -> None:
if not self.jobs:
return
for job_config in self.jobs.values():
self._initialize_job_callback(job_config)

def initialize(self) -> None:
_logger.info('Setting up handlers and types for package `%s`', self.package)

self.pre_initialize()
for index_name, index_config in self.indexes.items():
self._initialize_index(index_name, index_config)
self._initialize_jobs()


@dataclass
Expand Down
Loading

0 comments on commit 1442b02

Please sign in to comment.