Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: configure ruff and mypy #128

Merged
merged 1 commit into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 4 additions & 11 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,22 +1,15 @@
repos:
- repo: https://github.com/charliermarsh/ruff-pre-commit
# Ruff version.
rev: "v0.2.0"
hooks:
- id: ruff
args: [--fix, --exit-non-zero-on-fix]
- repo: local
hooks:
- id: isort
name: isort
language: system
entry: isort
types: [python]
- id: black
name: black
language: system
entry: black
types: [python]
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: 'v0.3.2'
hooks:
- id: ruff
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.5.0
hooks:
Expand Down
56 changes: 37 additions & 19 deletions acringest.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,26 @@
"""Extract data from loaded ACRCloud records."""

from __future__ import annotations

import json
import logging
import signal
import sys
from io import BytesIO
from typing import TYPE_CHECKING, Any, NoReturn

import urllib3
from cloudevents.kafka import from_structured
from cloudevents.kafka.conversion import KafkaMessage
from configargparse import ArgumentParser
from jsondiff import diff
from kafka import KafkaConsumer
from configargparse import ArgumentParser # type: ignore[import-untyped]
from jsondiff import diff # type: ignore[import-untyped]
from kafka import KafkaConsumer # type: ignore[import-untyped]
from minio import Minio
from minio.error import S3Error

if TYPE_CHECKING: # pragma: no cover
from cloudevents.http import CloudEvent

logger = logging.getLogger(__name__)

_ACR_INVALID_KEYS = [
Expand Down Expand Up @@ -71,7 +79,8 @@
"rights_claim",
"release_by_territories",
"langs",
] + _ACR_INVALID_KEYS
*_ACR_INVALID_KEYS,
]
"""
These keys are part of a music entry but not relevant for our purposes.
"""
Expand All @@ -93,13 +102,15 @@
"bpm",
"exids",
"works",
] + _ACR_IGNORED_KEYS
*_ACR_IGNORED_KEYS,
]
"""
All keys not in this list are a surprise and warrant further investigation.
"""


def put_data(mc: Minio, bucket: str, acrid: str, data: str):
def put_data(mc: Minio, bucket: str, acrid: str, data: str) -> None:
"""Put data to MinIO."""
_as_bytes = data.encode("utf-8")
mc.put_object(
bucket,
Expand All @@ -110,7 +121,7 @@ def put_data(mc: Minio, bucket: str, acrid: str, data: str):
)


def app( # noqa: PLR0912,PLR0913
def app( # noqa: PLR0912,PLR0913,C901
bootstrap_servers: list[str],
security_protocol: str,
tls_cafile: str,
Expand All @@ -124,10 +135,11 @@ def app( # noqa: PLR0912,PLR0913
minio_secret_key: str,
minio_bucket_raw: str,
minio_bucket_music: str,
minio_secure: bool,
minio_secure: bool, # noqa: FBT001
minio_cert_reqs: str,
minio_ca_certs: str,
):
) -> None:
"""Consume MinIO CloudEvents and create deduped and historized objects."""
consumer = KafkaConsumer(
consumer_topic,
bootstrap_servers=bootstrap_servers,
Expand All @@ -139,7 +151,7 @@ def app( # noqa: PLR0912,PLR0913
ssl_keyfile=tls_keyfile,
)

def on_sigint(*_): # pragma: no cover
def on_sigint(*_: Any) -> NoReturn: # noqa: ANN401 # pragma: no cover
consumer.close()
sys.exit(0)

Expand All @@ -151,20 +163,21 @@ def on_sigint(*_): # pragma: no cover
minio_secret_key,
secure=minio_secure,
http_client=urllib3.PoolManager(
cert_reqs=minio_cert_reqs, ca_certs=minio_ca_certs
cert_reqs=minio_cert_reqs,
ca_certs=minio_ca_certs,
),
)
for bucket in [minio_bucket_raw, minio_bucket_music]:
if not mc.bucket_exists(bucket): # pragma: no cover
mc.make_bucket(bucket)

for msg in consumer:
ce = from_structured(
ce: CloudEvent = from_structured(
message=KafkaMessage(
key=msg.key,
value=msg.value,
headers=msg.headers if msg.headers else {},
)
),
)
if (
ce["source"] == "minio:s3..acrcloud.raw"
Expand All @@ -178,9 +191,9 @@ def on_sigint(*_): # pragma: no cover
acrid = music.get("acrid")
for key in list(music.keys()):
if key not in _ACR_EXPECTED_KEYS: # pragma: no cover
logger.error(f"Unexpected key {key} in acr results")
logger.error("Unexpected key %s in acr results", key)
for to_del in _ACR_IGNORED_KEYS:
if to_del in music.keys():
if to_del in music:
del music[to_del]
minio_data = None
try:
Expand All @@ -192,10 +205,11 @@ def on_sigint(*_): # pragma: no cover
changes = diff(minio_data, music)
if changes:
put_data(mc, minio_bucket_music, acrid, json.dumps(music))
logger.info(f"Applied changes to {acrid=}: {changes}")
logger.info("Applied changes to acrid=%s: %s", acrid, changes)


def main(): # pragma: no cover
def main() -> None: # pragma: no cover
"""Run main."""
parser = ArgumentParser(__name__)
parser.add(
"--kafka-bootstrap-servers",
Expand Down Expand Up @@ -286,14 +300,18 @@ def main(): # pragma: no cover
help="MinIO Secret Key",
)
parser.add(
"--quiet", "-q", default=False, action="store_true", env_var="ACRINGEST_QUIET"
"--quiet",
"-q",
default=False,
action="store_true",
env_var="ACRINGEST_QUIET",
)

options = parser.parse_args()

if not options.quiet:
logging.basicConfig(level=logging.INFO)
logger.info(f"Starting {__name__}...")
logger.info("Starting %s", __name__)

app(
bootstrap_servers=options.kafka_bootstrap_servers,
Expand Down
6 changes: 4 additions & 2 deletions docs/gen_ref_pages.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
with mkdocs_gen_files.open("reference/SUMMARY.md", "w") as nav_file:
nav_file.writelines(nav.build_literate_nav())

readme = Path("README.md").open("r")
with mkdocs_gen_files.open("index.md", "w") as index_file:
with Path("README.md").open("r") as readme, mkdocs_gen_files.open(
"index.md",
"w",
) as index_file:
index_file.writelines(readme.read())
Loading
Loading