Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

package: Add support for different query job types: #452

Merged
merged 12 commits into from
Jun 24, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import pymongo
from clp_py_utils.clp_config import Database, QUERY_JOBS_TABLE_NAME, ResultsCache
from clp_py_utils.sql_adapter import SQL_Adapter
from job_orchestration.scheduler.constants import QueryJobStatus
from job_orchestration.scheduler.constants import QueryJobStatus, QueryJobType
from job_orchestration.scheduler.job_config import AggregationConfig, SearchConfig

from clp_package_utils.general import (
Expand Down Expand Up @@ -111,8 +111,8 @@ def create_and_monitor_job_in_db(
) as db_cursor:
# Create job
db_cursor.execute(
f"INSERT INTO `{QUERY_JOBS_TABLE_NAME}` (`job_config`) VALUES (%s)",
(msgpack.packb(search_config.dict()),),
f"INSERT INTO `{QUERY_JOBS_TABLE_NAME}` (`job_config`, `type`) VALUES (%s, %s)",
(msgpack.packb(search_config.dict()), QueryJobType.SEARCH),
)
db_conn.commit()
job_id = db_cursor.lastrowid
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ def main(argv):
f"""
CREATE TABLE IF NOT EXISTS `{QUERY_JOBS_TABLE_NAME}` (
`id` INT NOT NULL AUTO_INCREMENT,
`type` INT NOT NULL,
`status` INT NOT NULL DEFAULT '{QueryJobStatus.PENDING}',
`creation_time` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
`num_tasks` INT NOT NULL DEFAULT '0',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def search(
self: Task,
job_id: str,
task_id: int,
search_config_obj: dict,
job_config_obj: dict,
archive_id: str,
clp_metadata_db_conn_params: dict,
results_cache_uri: str,
Expand All @@ -133,7 +133,7 @@ def search(

logger.info(f"Started task for job {job_id}")

search_config = SearchConfig.parse_obj(search_config_obj)
search_config = SearchConfig.parse_obj(job_config_obj)
sql_adapter = SQL_Adapter(Database.parse_obj(clp_metadata_db_conn_params))

start_time = datetime.datetime.now()
Expand Down Expand Up @@ -168,7 +168,7 @@ def search(
task_id=task_id,
status=QueryTaskStatus.FAILED,
duration=0,
error_log_path=clo_log_path,
error_log_path=str(clo_log_path),
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
).dict()

update_search_task_metadata(
Expand Down Expand Up @@ -231,6 +231,6 @@ def sigterm_handler(_signo, _stack_frame):
)

if QueryTaskStatus.FAILED == search_status:
search_task_result.error_log_path = clo_log_path
search_task_result.error_log_path = str(clo_log_path)

return search_task_result.dict()
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,13 @@ def __str__(self) -> str:

def to_str(self) -> str:
return str(self.name)


class QueryJobType(IntEnum):
SEARCH = 0

def __str__(self) -> str:
return str(self.value)

def to_str(self) -> str:
return str(self.name)
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import typing
from abc import ABC

from pydantic import BaseModel, validator

Expand Down Expand Up @@ -39,7 +40,10 @@ class AggregationConfig(BaseModel):
count_by_time_bucket_size: typing.Optional[int] = None # Milliseconds


class SearchConfig(BaseModel):
class QueryConfig(BaseModel, ABC): ...
kirkrodrigues marked this conversation as resolved.
Show resolved Hide resolved


class SearchConfig(QueryConfig):
query_string: str
max_num_results: int
tags: typing.Optional[typing.List[str]] = None
Expand Down
Loading