Skip to content

Commit

Permalink
Log errors if batch raises exception, refs #7
Browse files Browse the repository at this point in the history
  • Loading branch information
simonw committed Nov 29, 2023
1 parent 8add778 commit 494c08d
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 17 deletions.
42 changes: 25 additions & 17 deletions datasette_enrichments/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
import secrets
from datasette.plugins import pm
from .views import enrichment_picker, enrichment_view
from .utils import get_with_auth, mark_job_complete
from .utils import get_with_auth, mark_job_complete, pks_for_rows
from . import hookspecs

from datasette.utils import await_me_maybe

from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, Tuple, Union, List

if TYPE_CHECKING:
from datasette.app import Datasette
Expand All @@ -20,6 +20,9 @@
pm.add_hookspecs(hookspecs)


IdType = Union[int, str, Tuple[Union[int, str], ...]]


async def get_enrichments(datasette):
enrichments = []
for result in pm.hook.register_enrichments(datasette=datasette):
Expand Down Expand Up @@ -54,7 +57,7 @@ async def get_enrichments(datasette):
id integer primary key,
job_id integer references _enrichment_jobs(id),
created_at text,
row_pk text, -- JSON encoded, can be integer or string or ["compound", "pk"]
row_pks text, -- JSON list of row primary keys
error text
)
""".strip()
Expand Down Expand Up @@ -82,22 +85,24 @@ def name(self):
def __repr__(self):
return "<Enrichment: {}>".format(self.slug)

async def log_error(self, db: "Database", job_id: int, ids: Any, error: str):
async def log_error(
self, db: "Database", job_id: int, ids: List[IdType], error: str
):
# Record error and increment error_count
await db.execute_write(
"""
insert into _enrichment_errors (job_id, row_pk, error)
insert into _enrichment_errors (job_id, row_pks, error)
values (?, ?, ?)
""",
(job_id, json.dumps(ids), error),
)
await db.execute_write(
"""
update _enrichment_jobs
set error_count = error_count + 1
set error_count = error_count + ?
where id = ?
""",
(job_id,),
(len(ids), job_id),
)

async def get_config_form(self, datasette: "Datasette", db: "Database", table: str):
Expand Down Expand Up @@ -230,16 +235,19 @@ async def run_enrichment():
break
# Enrich batch
pks = await db.primary_keys(job["table_name"])
await async_call_with_supported_arguments(
self.enrich_batch,
datasette=datasette,
db=db,
table=job["table_name"],
rows=rows,
pks=pks or ["rowid"],
config=json.loads(job["config"]),
job_id=job_id,
)
try:
await async_call_with_supported_arguments(
self.enrich_batch,
datasette=datasette,
db=db,
table=job["table_name"],
rows=rows,
pks=pks or ["rowid"],
config=json.loads(job["config"]),
job_id=job_id,
)
except Exception as ex:
await self.log_error(db, job_id, pks_for_rows(rows, pks), str(ex))
# Update next_cursor
next_cursor = response.json()["next"]
if next_cursor:
Expand Down
11 changes: 11 additions & 0 deletions datasette_enrichments/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,14 @@ def _ensure_enrichment_properties(datasette: "Datasette"):
datasette._enrichment_completed_jobs = set()
if not hasattr(datasette, "_enrichment_completed_events"):
datasette._enrichment_completed_events = {}


def pks_for_rows(rows, pks):
    """Extract the primary key value(s) from each row in ``rows``.

    ``pks`` is the sequence of primary key column names; when it is
    empty or otherwise falsy, the ``"rowid"`` column is used instead.

    Returns a list with one entry per row: the bare column value when
    there is exactly one key column, or a tuple of values (in ``pks``
    order) when the key is compound.
    """
    columns = pks or ["rowid"]
    if len(columns) != 1:
        # Compound primary key: one tuple of values per row.
        return [tuple(row[column] for column in columns) for row in rows]
    # Single primary key: return the plain values, not 1-tuples.
    only_column = columns[0]
    return [row[only_column] for row in rows]

0 comments on commit 494c08d

Please sign in to comment.