Replace isort, black, pylint with ruff #246

Merged: 3 commits, merged on Nov 5, 2024
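Since the point of the change is to consolidate isort, black, and pylint into ruff, the repository presumably gains a ruff section in pyproject.toml. A minimal sketch of what such a configuration could look like; the rule selection and settings below are illustrative assumptions, not taken from this PR:

```toml
# Hypothetical pyproject.toml excerpt; rule selection is illustrative only.
[tool.ruff]
line-length = 100
target-version = "py311"

[tool.ruff.lint]
# "I" replaces isort (import sorting), "UP" modernizes typing/syntax,
# "PL" covers pylint-equivalent checks, and "S" enables the bandit-style
# rules behind the "# noqa: S608" comments added below.
select = ["E", "F", "I", "UP", "B", "S", "PL"]

[tool.ruff.format]
# "ruff format" serves as the black replacement.
quote-style = "double"
```

With a configuration along these lines, `ruff check --fix .` and `ruff format .` cover in CI what isort, black, and pylint did before.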
19 changes: 6 additions & 13 deletions clinvar_ingest/api/main.py
@@ -1,10 +1,9 @@
+import datetime
 import logging
 from contextlib import asynccontextmanager
-from datetime import datetime
 from pathlib import PurePosixPath
 
 from fastapi import BackgroundTasks, FastAPI, HTTPException, Request, status
-from google.cloud import bigquery
 
 import clinvar_ingest.config
 from clinvar_ingest.api.middleware import LogRequests
@@ -62,12 +61,13 @@ async def health():
     response_model=InitializeWorkflowResponse,
 )
 async def create_workflow_execution_id(initial_id: BigqueryDatasetId):
-    assert initial_id is not None and len(initial_id) > 0
+    if initial_id is None or len(initial_id) == 0:
+        raise ValueError("initial_id must be nonempty")
     # default isoformat has colons, dashes, and periods
     # e.g. 2024-01-31T19:13:03.185320
     # we want to remove these to make a valid BigQuery table name
     timestamp = (
-        datetime.utcnow()
+        datetime.datetime.now(tz=datetime.UTC)
         .isoformat()
         .replace(":", "")
         .replace(".", "")
@@ -250,12 +250,6 @@ async def copy(
 
     def task():
         try:
-            # http_upload(
-            # http_uri=ftp_path,
-            # blob_uri=gcs_path,
-            # file_size=ftp_file_size,
-            # client=_get_gcs_client(),
-            # )
 
             # Download to local file
             http_download_curl(
@@ -398,7 +392,6 @@ def task():
         tables_created = run_create_external_tables(payload)
 
         for table_name, table in tables_created.items():
-            table: bigquery.Table = table
             logger.info(
                 "Created table %s as %s:%s.%s",
                 table_name,
@@ -446,9 +439,9 @@ def task():
 
 @app.post("/create_internal_tables", status_code=status.HTTP_201_CREATED)
 async def create_internal_tables(payload: TodoRequest):
-    return {"todo": "implement me"}
+    return {"todo": "implement me", "payload": payload}
 
 
 @app.post("/create_cleaned_tables", status_code=status.HTTP_201_CREATED)
 async def create_cleaned_tables(payload: TodoRequest):
-    return {"todo": "implement me"}
+    return {"todo": "implement me", "payload": payload}
5 changes: 3 additions & 2 deletions clinvar_ingest/api/middleware.py
@@ -1,7 +1,7 @@
 import logging
 import time
 import uuid
-from typing import Callable
+from collections.abc import Callable
 
 from starlette.middleware.base import BaseHTTPMiddleware, Request, Response
 
@@ -20,6 +20,7 @@ async def dispatch(self, request: Request, call_next: Callable) -> Response:
         response = await call_next(request)
         elapsed_ms = int((time.time() - start_ms) * MS_PER_S)
         logger.info(
-            f"{request.method} {request.url.path} id={request_id} elapsed_ms={elapsed_ms} status_code={response.status_code}"
+            f"{request.method} {request.url.path} id={request_id} "
+            f"elapsed_ms={elapsed_ms} status_code={response.status_code}"
         )
         return response
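The middleware change imports `Callable` from `collections.abc` instead of `typing`; since Python 3.9 the abc classes are subscriptable in annotations (PEP 585), and ruff's pyupgrade-derived rules (UP035) rewrite the deprecated `typing` import automatically under `--fix`. A minimal sketch of the same pattern with invented names:

```python
from collections.abc import Awaitable, Callable

# Hypothetical alias, not from the middleware: an async handler type built
# from collections.abc generics rather than their typing equivalents.
Handler = Callable[[str], Awaitable[str]]


async def echo_handler(path: str) -> str:
    return f"handled {path}"


handler: Handler = echo_handler
```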
17 changes: 8 additions & 9 deletions clinvar_ingest/api/model/requests.py
@@ -1,7 +1,8 @@
 import re
+from collections.abc import Callable
 from datetime import date, datetime
 from pathlib import PurePath
-from typing import Annotated, Any, Callable, Literal, Optional, Union
+from typing import Annotated, Any, Literal
 
 from pydantic import (
     AnyUrl,
@@ -59,7 +60,7 @@ def _dump_fn(val):
 # Request and response models
 
 
-def strict_datetime_field_validator(cls, v, info: ValidationInfo) -> datetime:
+def strict_datetime_field_validator(_cls, v, info: ValidationInfo) -> datetime:
     # print(f"Validating {info.field_name} with value {v}")
     if not v:
         raise ValueError(f"{info.field_name} was empty")
@@ -105,7 +106,7 @@ class ClinvarFTPWatcherRequest(BaseModel):
     release_date: date
 
     file_format: Annotated[
-        Optional[Literal["vcv", "rcv"]],
+        Literal["vcv", "rcv"] | None,
         Field(
             description=(
                 "Type of file this request refers to. "
@@ -167,7 +168,7 @@ class ParseResponse(BaseModel):
     Map of entity type to either GCS path (gs:// URLs) or path to local file
     """
 
-    parsed_files: dict[str, Union[GcsBlobPath, PurePathStr]]
+    parsed_files: dict[str, GcsBlobPath | PurePathStr]
 
     @field_serializer("parsed_files", when_used="always")
     def _serialize(self, v):
@@ -222,8 +223,6 @@ class DropExternalTablesRequest(CreateExternalTablesResponse):
     Defines the arguments to the drop_external_tables endpoint
     """
 
-    pass
-
 
 class InitializeWorkflowResponse(BaseModel):
     """
@@ -250,7 +249,7 @@ class InitializeStepRequest(BaseModel):
 
     workflow_execution_id: str
     step_name: StepName
-    message: Optional[str] = None
+    message: str | None = None
 
 
 class InitializeStepResponse(BaseModel):
@@ -286,7 +285,7 @@ class GetStepStatusResponse(BaseModel):
     step_name: StepName
     step_status: StepStatus
     timestamp: datetime
-    message: Optional[str] = None
+    message: str | None = None
 
     @field_serializer("timestamp", when_used="always")
     def _timestamp_serializer(self, v: datetime):
@@ -303,7 +302,7 @@ class StepStartedResponse(BaseModel):
 
     timestamp: datetime
     step_status: Literal[StepStatus.STARTED] = StepStatus.STARTED
-    message: Optional[str] = None
+    message: str | None = None
 
     @field_serializer("timestamp", when_used="always")
     def _timestamp_serializer(self, v: datetime):
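Throughout this model file, `Optional[X]` and `Union[X, Y]` become `X | None` and `X | Y`, the PEP 604 union syntax available at runtime since Python 3.10 and applied by ruff's pyupgrade rules; Pydantic v2 accepts either spelling. A short sketch with an invented model, not one from the PR:

```python
from pydantic import BaseModel


class ExampleStepRequest(BaseModel):
    # Illustrative model showing the PEP 604 spellings that replace
    # Optional[str] and Union[int, str] in annotations.
    workflow_execution_id: str
    message: str | None = None   # previously Optional[str] = None
    attempt: int | str = 1       # previously Union[int, str] = 1


req = ExampleStepRequest(workflow_execution_id="wf-1")
print(req.model_dump())  # {'workflow_execution_id': 'wf-1', 'message': None, 'attempt': 1}
```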
9 changes: 5 additions & 4 deletions clinvar_ingest/api/status_file.py
@@ -3,9 +3,9 @@
 services can monitor the status of the workflow jobs. The status files are written to a GCS bucket.
 """
 
+import datetime
 import json
 import logging
-from datetime import datetime
 
 from google.cloud.storage import Blob
 from google.cloud.storage import Client as GCSClient
@@ -21,8 +21,8 @@ def write_status_file(
     file_prefix: str,
     step: StepName,
     status: StepStatus,
-    message: str = None,
-    timestamp: str = datetime.utcnow().isoformat(),
+    message: str | None = None,
+    timestamp: str = datetime.datetime.now(datetime.UTC).isoformat(),
 ) -> StatusValue:
     """
     This function writes a status file to a GCS bucket. The status file is a JSON file with the following format:
@@ -71,7 +71,8 @@ def get_status_file(
     blob: Blob = bucket.get_blob(f"{file_prefix}/{step}-{status}.json")
     if blob is None:
         raise ValueError(
-            f"Could not find status file for step {step} with status {status} in bucket {bucket} and file prefix {file_prefix}"
+            f"Could not find status file for step {step} with status {status} "
+            f"in bucket {bucket} and file prefix {file_prefix}"
         )
     content = blob.download_as_string()
     return StatusValue(**json.loads(content))
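One behavior worth noting about the updated signature, though the diff does not change it: a default like `timestamp: str = datetime.datetime.now(datetime.UTC).isoformat()` is evaluated once, when the function is defined, so calls that omit `timestamp` all reuse the import-time value. A small illustrative sketch of that behavior and the common `None`-default workaround (not code from the PR):

```python
import datetime
import time


def stamped_at_definition(ts: str = datetime.datetime.now(datetime.UTC).isoformat()) -> str:
    # The default expression runs once, when the def statement executes.
    return ts


def stamped_at_call(ts: str | None = None) -> str:
    # Common workaround: compute the fallback inside the function body.
    if ts is None:
        ts = datetime.datetime.now(datetime.UTC).isoformat()
    return ts


print(stamped_at_definition())
time.sleep(0.01)
print(stamped_at_definition())  # identical to the first call
print(stamped_at_call())
time.sleep(0.01)
print(stamped_at_call())  # differs between calls
```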
28 changes: 14 additions & 14 deletions clinvar_ingest/cloud/bigquery/create_tables.py
@@ -52,8 +52,7 @@ def schema_file_path_for_table(table_name: str) -> Path:
     Returns the path to the BigQuery schema file for the given table name.
     """
     raw_table_name = table_name.replace("_external", "")
-    schema_path = bq_schemas_dir / f"{raw_table_name}.bq.json"
-    return schema_path
+    return bq_schemas_dir / f"{raw_table_name}.bq.json"
 
 
 def create_table(
@@ -77,8 +76,7 @@ def create_table(
 
     table = bigquery.Table(table_ref, schema=None)
     table.external_data_configuration = external_config
-    table = client.create_table(table, exists_ok=True)
-    return table
+    return client.create_table(table, exists_ok=True)
 
 
 def run_create_external_tables(
@@ -107,7 +105,10 @@
     for table_name, gcs_blob_path in args.source_table_paths.items():
         parsed_blob = parse_blob_uri(gcs_blob_path.root, gcs_client)
         _logger.info(
-            "Parsed blob bucket: %s, path: %s", parsed_blob.bucket, parsed_blob.name
+            "Parsed blob bucket: %s, path: %s for table %s",
+            parsed_blob.bucket,
+            parsed_blob.name,
+            table_name,
         )
         bucket_obj = gcs_client.get_bucket(parsed_blob.bucket.name)
         bucket_location = bucket_obj.location
@@ -125,7 +126,7 @@
         bq_client,
         project=destination_project,
        dataset_id=args.destination_dataset,
-        location=bucket_location,  # type: ignore
+        location=bucket_location,
     )
     if not dataset_obj:
         raise RuntimeError(f"Didn't get a dataset object back. run_create args: {args}")
@@ -160,14 +161,14 @@ def get_query_for_copy(
     dest_table_ref: bigquery.TableReference,
 ) -> tuple[str, bool]:
     dedupe_queries = {
-        "gene": f"CREATE OR REPLACE TABLE `{dest_table_ref}` AS "
+        "gene": f"CREATE OR REPLACE TABLE `{dest_table_ref}` AS "  # noqa: S608
         f"SELECT * EXCEPT (vcv_id, row_num) from "
         f"(SELECT ge.*, ROW_NUMBER() OVER (PARTITION BY ge.id "
         f"ORDER BY vcv.date_last_updated DESC, vcv.id DESC) row_num "
         f"FROM `{source_table_ref}` AS ge "
         f"JOIN `{dest_table_ref.project}.{dest_table_ref.dataset_id}.variation_archive` AS vcv "
         f"ON ge.vcv_id = vcv.id) where row_num = 1",
-        "submission": f"CREATE OR REPLACE TABLE `{dest_table_ref}` AS "
+        "submission": f"CREATE OR REPLACE TABLE `{dest_table_ref}` AS "  # noqa: S608
         f"SELECT * EXCEPT (scv_id, row_num) from "
         f"(SELECT se.*, ROW_NUMBER() OVER (PARTITION BY se.id "
         f"ORDER BY vcv.date_last_updated DESC, vcv.id DESC) row_num "
@@ -177,7 +178,7 @@
         f"JOIN `{dest_table_ref.project}.{dest_table_ref.dataset_id}.variation_archive` AS vcv "
         f"ON scv.variation_archive_id = vcv.id) "
         f"where row_num = 1",
-        "submitter": f"CREATE OR REPLACE TABLE `{dest_table_ref}` AS "
+        "submitter": f"CREATE OR REPLACE TABLE `{dest_table_ref}` AS "  # noqa: S608
         f"SELECT * EXCEPT (scv_id, row_num) from "
         f"(SELECT se.*, ROW_NUMBER() OVER (PARTITION BY se.id "
         f"ORDER BY vcv.date_last_updated DESC, vcv.id DESC) row_num "
@@ -187,7 +188,7 @@
         f"JOIN `{dest_table_ref.project}.{dest_table_ref.dataset_id}.variation_archive` AS vcv "
         f"ON scv.variation_archive_id = vcv.id) "
         f"where row_num = 1",
-        "trait": f"CREATE OR REPLACE TABLE `{dest_table_ref}` AS "
+        "trait": f"CREATE OR REPLACE TABLE `{dest_table_ref}` AS "  # noqa: S608
         f"SELECT * EXCEPT (rcv_id, row_num) from "
         f"(SELECT te.*, ROW_NUMBER() OVER (PARTITION BY te.id "
         f"ORDER BY vcv.date_last_updated DESC, vcv.id DESC) row_num "
@@ -197,7 +198,7 @@
         f"JOIN `{dest_table_ref.project}.{dest_table_ref.dataset_id}.variation_archive` AS vcv "
         f"ON rcv.variation_archive_id = vcv.id) "
         f"where row_num = 1",
-        "trait_set": f"CREATE OR REPLACE TABLE `{dest_table_ref}` AS "
+        "trait_set": f"CREATE OR REPLACE TABLE `{dest_table_ref}` AS "  # noqa: S608
         f"SELECT * EXCEPT (rcv_id, row_num) from "
         f"(SELECT tse.*, ROW_NUMBER() OVER (PARTITION BY tse.id "
         f"ORDER BY vcv.date_last_updated DESC, vcv.id DESC) row_num "
@@ -208,7 +209,7 @@
         f"ON rcv.variation_archive_id = vcv.id) "
         f"where row_num = 1",
     }
-    default_query = f"CREATE OR REPLACE TABLE `{dest_table_ref}` AS SELECT * from `{source_table_ref}`"
+    default_query = f"CREATE OR REPLACE TABLE `{dest_table_ref}` AS SELECT * from `{source_table_ref}`"  # noqa: S608
     query = dedupe_queries.get(dest_table_ref.table_id, default_query)
     return query, query == default_query
 
@@ -244,8 +245,7 @@ def ctas_copy(
         query, _ = get_query_for_copy(source_table_ref, dest_table_ref)
         _logger.info(f"Creating table {dest_table_ref} from {source_table_ref}")
         _logger.info(f"Query:\n{query}")
-        query_job = bq_client.query(query)
-        return query_job
+        return bq_client.query(query)
 
     bq_client = bigquery.Client()
     # Copy each
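The added `# noqa: S608` comments suppress ruff's S608 rule (from the flake8-bandit set), which flags SQL assembled by string interpolation as a possible injection vector; here the interpolated pieces are BigQuery `TableReference` identifiers rather than user-supplied values, so the warning is acknowledged line by line. A hedged sketch of the same pattern with made-up table ids:

```python
from google.cloud import bigquery


def copy_table_sql(source: bigquery.TableReference, dest: bigquery.TableReference) -> str:
    # Interpolating table identifiers (not user input) into SQL trips S608;
    # the noqa records that the construction was reviewed and accepted.
    return f"CREATE OR REPLACE TABLE `{dest}` AS SELECT * FROM `{source}`"  # noqa: S608


src = bigquery.TableReference.from_string("my-project.my_dataset.gene_external")
dst = bigquery.TableReference.from_string("my-project.my_dataset.gene")
print(copy_table_sql(src, dst))
```

When the interpolated pieces are values rather than identifiers, BigQuery query parameters (`QueryJobConfig(query_parameters=[...])` with `ScalarQueryParameter`) are the alternative that avoids both the warning and the risk.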