Skip to content

Commit

Permalink
package: Add support for different query job types: (y-scope#452)
Browse files Browse the repository at this point in the history
- Add abstract classes for QueryJobType and QueryJobConfig.
- Submit and track job type in the search jobs table.
- Guard search-job specific handling in the scheduler.
  • Loading branch information
haiqi96 authored and Jack Luo committed Dec 4, 2024
1 parent abdbfcf commit e614a48
Show file tree
Hide file tree
Showing 9 changed files with 253 additions and 172 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
import pymongo
from clp_py_utils.clp_config import Database, QUERY_JOBS_TABLE_NAME, ResultsCache
from clp_py_utils.sql_adapter import SQL_Adapter
from job_orchestration.scheduler.constants import QueryJobStatus
from job_orchestration.scheduler.job_config import AggregationConfig, SearchConfig
from job_orchestration.scheduler.constants import QueryJobStatus, QueryJobType
from job_orchestration.scheduler.job_config import AggregationConfig, SearchJobConfig

from clp_package_utils.general import (
CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH,
Expand Down Expand Up @@ -83,7 +83,7 @@ def create_and_monitor_job_in_db(
do_count_aggregation: bool | None,
count_by_time_bucket_size: int | None,
):
search_config = SearchConfig(
search_config = SearchJobConfig(
query_string=wildcard_query,
begin_timestamp=begin_timestamp,
end_timestamp=end_timestamp,
Expand Down Expand Up @@ -111,8 +111,8 @@ def create_and_monitor_job_in_db(
) as db_cursor:
# Create job
db_cursor.execute(
f"INSERT INTO `{QUERY_JOBS_TABLE_NAME}` (`job_config`) VALUES (%s)",
(msgpack.packb(search_config.dict()),),
f"INSERT INTO `{QUERY_JOBS_TABLE_NAME}` (`job_config`, `type`) VALUES (%s, %s)",
(msgpack.packb(search_config.dict()), QueryJobType.SEARCH_OR_AGGREGATION),
)
db_conn.commit()
job_id = db_cursor.lastrowid
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ def main(argv):
f"""
CREATE TABLE IF NOT EXISTS `{QUERY_JOBS_TABLE_NAME}` (
`id` INT NOT NULL AUTO_INCREMENT,
`type` INT NOT NULL,
`status` INT NOT NULL DEFAULT '{QueryJobStatus.PENDING}',
`creation_time` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
`num_tasks` INT NOT NULL DEFAULT '0',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from clp_py_utils.clp_logging import set_logging_level
from clp_py_utils.sql_adapter import SQL_Adapter
from job_orchestration.executor.query.celery import app
from job_orchestration.scheduler.job_config import SearchConfig
from job_orchestration.scheduler.job_config import SearchJobConfig
from job_orchestration.scheduler.scheduler_data import QueryTaskResult, QueryTaskStatus

# Setup logging
Expand Down Expand Up @@ -41,7 +41,7 @@ def make_command(
clp_home: Path,
archives_dir: Path,
archive_id: str,
search_config: SearchConfig,
search_config: SearchJobConfig,
results_cache_uri: str,
results_collection: str,
):
Expand Down Expand Up @@ -113,7 +113,7 @@ def search(
self: Task,
job_id: str,
task_id: int,
search_config_obj: dict,
job_config_obj: dict,
archive_id: str,
clp_metadata_db_conn_params: dict,
results_cache_uri: str,
Expand All @@ -133,7 +133,7 @@ def search(

logger.info(f"Started task for job {job_id}")

search_config = SearchConfig.parse_obj(search_config_obj)
search_config = SearchJobConfig.parse_obj(job_config_obj)
sql_adapter = SQL_Adapter(Database.parse_obj(clp_metadata_db_conn_params))

start_time = datetime.datetime.now()
Expand Down Expand Up @@ -168,7 +168,7 @@ def search(
task_id=task_id,
status=QueryTaskStatus.FAILED,
duration=0,
error_log_path=clo_log_path,
error_log_path=str(clo_log_path),
).dict()

update_search_task_metadata(
Expand Down Expand Up @@ -231,6 +231,6 @@ def sigterm_handler(_signo, _stack_frame):
)

if QueryTaskStatus.FAILED == search_status:
search_task_result.error_log_path = clo_log_path
search_task_result.error_log_path = str(clo_log_path)

return search_task_result.dict()
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,13 @@ def __str__(self) -> str:

def to_str(self) -> str:
return str(self.name)


class QueryJobType(IntEnum):
SEARCH_OR_AGGREGATION = 0

def __str__(self) -> str:
return str(self.value)

def to_str(self) -> str:
return str(self.name)
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ class AggregationConfig(BaseModel):
count_by_time_bucket_size: typing.Optional[int] = None # Milliseconds


class SearchConfig(BaseModel):
class QueryJobConfig(BaseModel): ...


class SearchJobConfig(QueryJobConfig):
query_string: str
max_num_results: int
tags: typing.Optional[typing.List[str]] = None
Expand Down
Loading

0 comments on commit e614a48

Please sign in to comment.