Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(clp-package): Add support for extracting JSON streams from archives. #569

Merged
merged 40 commits into main from json_extraction_temp
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from 38 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
97bcbd2
Make it work with a lot of duplications
haiqi96 Oct 30, 2024
db1bf0a
Deduplicate the code in task-launching scripts
haiqi96 Oct 30, 2024
aea5630
Remove redundant task for JSON; combine the tasks into one
haiqi96 Oct 30, 2024
d0e3b12
Linter
haiqi96 Oct 30, 2024
6af5d5a
refactor
haiqi96 Oct 30, 2024
496e8d8
Linter
haiqi96 Oct 31, 2024
c4e118d
refactor
haiqi96 Nov 1, 2024
a75c42d
proposal
haiqi96 Nov 2, 2024
66fb365
Add actual refactor
haiqi96 Nov 4, 2024
310f0d5
Linter
haiqi96 Nov 4, 2024
069f568
deduplication
haiqi96 Nov 4, 2024
30c03e3
Decompressor end renaming
haiqi96 Nov 4, 2024
acb0237
Renaming for task and scheduler
haiqi96 Nov 4, 2024
5f12b38
Mass renaming
haiqi96 Nov 4, 2024
3c23fd2
Renaming for webui
haiqi96 Nov 5, 2024
be75c9c
missing change
haiqi96 Nov 5, 2024
383781a
polishing
haiqi96 Nov 5, 2024
9518074
Update comments
haiqi96 Nov 5, 2024
ebe6395
fixes
haiqi96 Nov 5, 2024
7b79dff
fixes
haiqi96 Nov 7, 2024
7498c69
fixes2
haiqi96 Nov 7, 2024
cc9b5c2
update webui part
haiqi96 Nov 8, 2024
cda0fd2
Merge branch 'main' into json_extraction_temp
haiqi96 Nov 12, 2024
3c30054
Apply suggestions from code review
haiqi96 Nov 15, 2024
90e55d5
Address code review suggestions
haiqi96 Nov 15, 2024
63a18d7
Address more code review suggestions
haiqi96 Nov 15, 2024
b900553
fix for previous commit
haiqi96 Nov 15, 2024
6b5e60f
Apply suggestions from code review
haiqi96 Nov 15, 2024
ff15840
fix for previous commit (again)
haiqi96 Nov 15, 2024
549d3c4
First refactor
haiqi96 Nov 15, 2024
972cb2b
Further refactor
haiqi96 Nov 15, 2024
5ec988f
Update comments
haiqi96 Nov 15, 2024
e65799e
small polishing
haiqi96 Nov 15, 2024
fb8e2a6
Further polishing
haiqi96 Nov 15, 2024
314e008
Use proper encapsulation for class data member variables
haiqi96 Nov 16, 2024
3d86291
Linter
haiqi96 Nov 16, 2024
4b84812
Apply suggestions from code review
haiqi96 Nov 17, 2024
7a3398f
Address code review
haiqi96 Nov 17, 2024
1d1599e
Fix silly mistakes
haiqi96 Nov 18, 2024
a5eb217
Update comment.
kirkrodrigues Nov 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions components/clp-package-utils/clp_package_utils/general.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
# CONSTANTS
EXTRACT_FILE_CMD = "x"
EXTRACT_IR_CMD = "i"
EXTRACT_JSON_CMD = "j"

# Paths
CONTAINER_CLP_HOME = pathlib.Path("/") / "opt" / "clp"
Expand Down Expand Up @@ -84,7 +85,7 @@ def __init__(self, clp_home: pathlib.Path, docker_clp_home: pathlib.Path):
self.data_dir: typing.Optional[DockerMount] = None
self.logs_dir: typing.Optional[DockerMount] = None
self.archives_output_dir: typing.Optional[DockerMount] = None
self.ir_output_dir: typing.Optional[DockerMount] = None
self.stream_output_dir: typing.Optional[DockerMount] = None


def get_clp_home():
Expand Down Expand Up @@ -251,17 +252,17 @@ def generate_container_config(
container_clp_config.archive_output.directory,
)

container_clp_config.ir_output.directory = pathlib.Path("/") / "mnt" / "ir-output"
container_clp_config.stream_output.directory = pathlib.Path("/") / "mnt" / "stream-output"
if not is_path_already_mounted(
clp_home,
CONTAINER_CLP_HOME,
clp_config.ir_output.directory,
container_clp_config.ir_output.directory,
clp_config.stream_output.directory,
container_clp_config.stream_output.directory,
):
docker_mounts.ir_output_dir = DockerMount(
docker_mounts.stream_output_dir = DockerMount(
DockerMountType.BIND,
clp_config.ir_output.directory,
container_clp_config.ir_output.directory,
clp_config.stream_output.directory,
container_clp_config.stream_output.directory,
)

return container_clp_config, docker_mounts
Expand Down Expand Up @@ -482,7 +483,7 @@ def validate_results_cache_config(
def validate_worker_config(clp_config: CLPConfig):
clp_config.validate_input_logs_dir()
clp_config.validate_archive_output_dir()
clp_config.validate_ir_output_dir()
clp_config.validate_stream_output_dir()


def validate_webui_config(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
dump_container_config,
EXTRACT_FILE_CMD,
EXTRACT_IR_CMD,
EXTRACT_JSON_CMD,
generate_container_config,
generate_container_name,
generate_container_start_cmd,
Expand Down Expand Up @@ -146,11 +147,11 @@ def handle_extract_file_cmd(
return 0


def handle_extract_ir_cmd(
def handle_extract_stream_cmd(
parsed_args, clp_home: pathlib.Path, default_config_file_path: pathlib.Path
) -> int:
"""
Handles the IR extraction command.
Handles the stream extraction command.
:param parsed_args:
:param clp_home:
:param default_config_file_path:
Expand All @@ -174,29 +175,41 @@ def handle_extract_ir_cmd(
)

# fmt: off
job_command = parsed_args.command
extract_cmd = [
"python3",
"-m", "clp_package_utils.scripts.native.decompress",
"--config", str(generated_config_path_on_container),
EXTRACT_IR_CMD,
str(parsed_args.msg_ix),
job_command
]
# fmt: on
if parsed_args.orig_file_id:
extract_cmd.append("--orig-file-id")
extract_cmd.append(str(parsed_args.orig_file_id))

if EXTRACT_IR_CMD == job_command:
extract_cmd.append(str(parsed_args.msg_ix))
if parsed_args.orig_file_id:
extract_cmd.append("--orig-file-id")
extract_cmd.append(str(parsed_args.orig_file_id))
else:
extract_cmd.append("--orig-file-path")
extract_cmd.append(str(parsed_args.orig_file_path))
if parsed_args.target_uncompressed_size:
extract_cmd.append("--target-uncompressed-size")
extract_cmd.append(str(parsed_args.target_uncompressed_size))
elif EXTRACT_JSON_CMD == job_command:
extract_cmd.append(str(parsed_args.archive_id))
if parsed_args.target_chunk_size:
extract_cmd.append("--target-chunk-size")
extract_cmd.append(str(parsed_args.target_chunk_size))
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
else:
extract_cmd.append("--orig-file-path")
extract_cmd.append(str(parsed_args.orig_file_path))
if parsed_args.target_uncompressed_size:
extract_cmd.append("--target-uncompressed-size")
extract_cmd.append(str(parsed_args.target_uncompressed_size))
logger.error(f"Unexpected command: {job_command}")
return -1

haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
cmd = container_start_cmd + extract_cmd

try:
subprocess.run(cmd, check=True)
except subprocess.CalledProcessError:
logger.exception("Docker or IR extraction command failed.")
logger.exception("Docker or stream extraction command failed.")
return -1

# Remove generated files
Expand Down Expand Up @@ -241,13 +254,20 @@ def main(argv):
group.add_argument("--orig-file-id", type=str, help="Original file's ID.")
group.add_argument("--orig-file-path", type=str, help="Original file's path.")

# JSON extraction command parser
json_extraction_parser = command_args_parser.add_parser(EXTRACT_JSON_CMD)
json_extraction_parser.add_argument("archive_id", type=str, help="Archive ID")
json_extraction_parser.add_argument(
"--target-chunk-size", type=int, help="Target chunk size", default=100000
)

parsed_args = args_parser.parse_args(argv[1:])

command = parsed_args.command
if EXTRACT_FILE_CMD == command:
return handle_extract_file_cmd(parsed_args, clp_home, default_config_file_path)
elif EXTRACT_IR_CMD == command:
return handle_extract_ir_cmd(parsed_args, clp_home, default_config_file_path)
elif command in (EXTRACT_IR_CMD, EXTRACT_JSON_CMD):
return handle_extract_stream_cmd(parsed_args, clp_home, default_config_file_path)
else:
logger.exception(f"Unexpected command: {command}")
return -1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,17 @@
from clp_py_utils.clp_config import CLP_METADATA_TABLE_PREFIX, CLPConfig, Database
from clp_py_utils.sql_adapter import SQL_Adapter
from job_orchestration.scheduler.constants import QueryJobStatus, QueryJobType
from job_orchestration.scheduler.job_config import ExtractIrJobConfig
from job_orchestration.scheduler.job_config import (
ExtractIrJobConfig,
ExtractJsonJobConfig,
QueryJobConfig,
)

from clp_package_utils.general import (
CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH,
EXTRACT_FILE_CMD,
EXTRACT_IR_CMD,
EXTRACT_JSON_CMD,
get_clp_home,
load_config_file,
)
Expand Down Expand Up @@ -70,45 +75,37 @@ def get_orig_file_id(db_config: Database, path: str) -> Optional[str]:
return results[0]["orig_file_id"]


def submit_and_monitor_ir_extraction_job_in_db(
def submit_and_monitor_extraction_job_in_db(
db_config: Database,
orig_file_id: str,
msg_ix: int,
target_uncompressed_size: Optional[int],
job_type: QueryJobType,
job_config: QueryJobConfig,
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
) -> int:
"""
Submits an IR extraction job to the scheduler and waits until the job finishes.
Submits a stream extraction job to the scheduler and waits until it finishes.
:param db_config:
:param orig_file_id:
:param msg_ix:
:param target_uncompressed_size:
:param job_type:
:param job_config:
:return: 0 on success, -1 otherwise.
"""
extract_ir_config = ExtractIrJobConfig(
orig_file_id=orig_file_id,
msg_ix=msg_ix,
target_uncompressed_size=target_uncompressed_size,
)

sql_adapter = SQL_Adapter(db_config)
job_id = submit_query_job(sql_adapter, extract_ir_config, QueryJobType.EXTRACT_IR)
job_id = submit_query_job(sql_adapter, job_config, job_type)
job_status = wait_for_query_job(sql_adapter, job_id)

if QueryJobStatus.SUCCEEDED == job_status:
logger.info(f"Finished IR extraction job {job_id}.")
logger.info(f"Finished extraction job {job_id}.")
return 0

logger.error(
f"IR extraction job {job_id} finished with unexpected status: {job_status.to_str()}."
)
logger.error(f"Extraction job {job_id} finished with unexpected status: {job_status.to_str()}.")
return -1


def handle_extract_ir_cmd(
parsed_args: argparse.Namespace, clp_home: pathlib.Path, default_config_file_path: pathlib.Path
def handle_extract_stream_cmd(
parsed_args: argparse.Namespace,
clp_home: pathlib.Path,
default_config_file_path: pathlib.Path,
) -> int:
"""
Handles the IR extraction command.
Handles the stream extraction command.
:param parsed_args:
:param clp_home:
:param default_config_file_path:
Expand All @@ -121,26 +118,46 @@ def handle_extract_ir_cmd(
if clp_config is None:
return -1

orig_file_id: str
if parsed_args.orig_file_id:
orig_file_id = parsed_args.orig_file_id
command = parsed_args.command

job_config: QueryJobConfig
job_type: QueryJobType
if EXTRACT_IR_CMD == command:
job_type = QueryJobType.EXTRACT_IR
orig_file_id: str
if parsed_args.orig_file_id:
orig_file_id = parsed_args.orig_file_id
else:
orig_file_path = parsed_args.orig_file_path
orig_file_id = get_orig_file_id(clp_config.database, orig_file_path)
if orig_file_id is None:
logger.error(f"Cannot find orig_file_id corresponding to '{orig_file_path}'.")
return -1
job_config = ExtractIrJobConfig(
orig_file_id=orig_file_id,
msg_ix=parsed_args.msg_ix,
target_uncompressed_size=parsed_args.target_uncompressed_size,
)
elif EXTRACT_JSON_CMD == command:
job_type = QueryJobType.EXTRACT_JSON
job_config = ExtractJsonJobConfig(
archive_id=parsed_args.archive_id, target_chunk_size=parsed_args.target_chunk_size
)
else:
orig_file_id = get_orig_file_id(clp_config.database, parsed_args.orig_file_path)
if orig_file_id is None:
return -1
logger.error(f"Unsupported stream extraction command: {command}")
return -1

try:
return asyncio.run(
run_function_in_process(
submit_and_monitor_ir_extraction_job_in_db,
submit_and_monitor_extraction_job_in_db,
clp_config.database,
orig_file_id,
parsed_args.msg_ix,
parsed_args.target_uncompressed_size,
job_type,
job_config,
)
)
except asyncio.CancelledError:
logger.error("IR extraction cancelled.")
logger.error("Stream extraction cancelled.")
return -1


Expand Down Expand Up @@ -278,13 +295,20 @@ def main(argv):
group.add_argument("--orig-file-id", type=str, help="Original file's ID.")
group.add_argument("--orig-file-path", type=str, help="Original file's path.")

# JSON extraction command parser
json_extraction_parser = command_args_parser.add_parser(EXTRACT_JSON_CMD)
json_extraction_parser.add_argument("archive_id", type=str, help="Archive ID")
json_extraction_parser.add_argument(
"--target-chunk-size", type=int, help="Target chunk size.", required=True
)

parsed_args = args_parser.parse_args(argv[1:])

command = parsed_args.command
if EXTRACT_FILE_CMD == command:
return handle_extract_file_cmd(parsed_args, clp_home, default_config_file_path)
elif EXTRACT_IR_CMD == command:
return handle_extract_ir_cmd(parsed_args, clp_home, default_config_file_path)
elif command in (EXTRACT_IR_CMD, EXTRACT_JSON_CMD):
return handle_extract_stream_cmd(parsed_args, clp_home, default_config_file_path)
else:
logger.exception(f"Unexpected command: {command}")
return -1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ def create_results_cache_indices(
"python3",
str(clp_py_utils_dir / "create-results-cache-indices.py"),
"--uri", container_clp_config.results_cache.get_uri(),
"--ir-collection", container_clp_config.results_cache.ir_collection_name,
"--stream-collection", container_clp_config.results_cache.stream_collection_name,
]
# fmt: on

Expand Down Expand Up @@ -660,10 +660,10 @@ def start_query_worker(
celery_method = "job_orchestration.executor.query"
celery_route = f"{QueueName.QUERY}"

query_worker_mount = [mounts.ir_output_dir]
query_worker_mount = [mounts.stream_output_dir]
query_worker_env = {
"CLP_IR_OUTPUT_DIR": container_clp_config.ir_output.directory,
"CLP_IR_COLLECTION": clp_config.results_cache.ir_collection_name,
"CLP_STREAM_OUTPUT_DIR": container_clp_config.stream_output.directory,
"CLP_STREAM_COLLECTION_NAME": clp_config.results_cache.stream_collection_name,
}

generic_start_worker(
Expand Down Expand Up @@ -710,7 +710,7 @@ def generic_start_worker(

# Create necessary directories
clp_config.archive_output.directory.mkdir(parents=True, exist_ok=True)
clp_config.ir_output.directory.mkdir(parents=True, exist_ok=True)
clp_config.stream_output.directory.mkdir(parents=True, exist_ok=True)

clp_site_packages_dir = CONTAINER_CLP_HOME / "lib" / "python3" / "site-packages"
# fmt: off
Expand Down Expand Up @@ -933,9 +933,9 @@ def start_log_viewer_webui(
"MongoDbHost": clp_config.results_cache.host,
"MongoDbPort": clp_config.results_cache.port,
"MongoDbName": clp_config.results_cache.db_name,
"MongoDbIrFilesCollectionName": clp_config.results_cache.ir_collection_name,
"MongoDbStreamFilesCollectionName": clp_config.results_cache.stream_collection_name,
"ClientDir": str(container_log_viewer_webui_dir / "client"),
"IrFilesDir": str(container_clp_config.ir_output.directory),
"StreamFilesDir": str(container_clp_config.stream_output.directory),
"LogViewerDir": str(container_log_viewer_webui_dir / "yscope-log-viewer"),
}
settings_json = read_and_update_settings_json(settings_json_path, settings_json_updates)
Expand All @@ -961,7 +961,7 @@ def start_log_viewer_webui(
# fmt: on
necessary_mounts = [
mounts.clp_home,
mounts.ir_output_dir,
mounts.stream_output_dir,
]
for mount in necessary_mounts:
if mount:
Expand Down
Loading