Skip to content

Commit

Permalink
fix: record exceptions in embedding.setup()
Browse files Browse the repository at this point in the history
Previously exceptions in self.vectorizer.config.embedding.setup()
were not recorded in the db.

This PR catches exceptions one stack higher to catch more of them.
  • Loading branch information
cevian committed Feb 12, 2025
1 parent 03a6a9a commit adc8d66
Showing 1 changed file with 67 additions and 69 deletions.
136 changes: 67 additions & 69 deletions projects/pgai/pgai/vectorizer/vectorizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -485,16 +485,49 @@ async def run(self) -> int:
async with await psycopg.AsyncConnection.connect(
self.db_url, autocommit=True
) as conn:
await register_vector_async(conn)
await self.vectorizer.config.embedding.setup()
while True:
if not await self._should_continue_processing(conn, loops, res):
return res
items_processed = await self._do_batch(conn)
if items_processed == 0:
return res
res += items_processed
loops += 1
try:
await register_vector_async(conn)
await self.vectorizer.config.embedding.setup()
while True:
if not await self._should_continue_processing(conn, loops, res):
return res
items_processed = await self._do_batch(conn)
if items_processed == 0:
return res
res += items_processed
loops += 1
except EmbeddingProviderError as e:
async with conn.transaction():
await self._insert_vectorizer_error(
conn,
(
self.vectorizer.id,
e.msg,
Jsonb(
{
"provider": self.vectorizer.config.embedding.implementation, # noqa
"error_reason": str(e.__cause__),
}
),
),
)
# This is to make the traceback not as verbose by removing
# the lines about our wrapper exception being casused by
# the actual exception.
if e.__cause__ is not None:
raise e.__cause__ # noqa
raise e
except Exception as e:
async with conn.transaction():
await self._insert_vectorizer_error(
conn,
(
self.vectorizer.id,
VECTORIZER_FAILED,
Jsonb({"error_reason": str(e)}),
),
)
raise e

async def _should_continue_processing(
self, conn: AsyncConnection, loops: int, res: int
Expand Down Expand Up @@ -540,68 +573,33 @@ async def _do_batch(self, conn: AsyncConnection) -> int:
int: The number of items processed in the batch.
"""
processing_stats = ProcessingStats()
try:
start_time = time.perf_counter()
async with conn.transaction():
items = await self._fetch_work(conn)

current_span = tracer.current_span()
if current_span:
current_span.set_tag("items_from_queue.pulled", len(items))
await logger.adebug(f"Items pulled from queue: {len(items)}")

# Filter out items that were deleted from the source table.
# We use the first primary key column, since they can only
# be null if the LEFT JOIN didn't find a match.
items = [
i
for i in items
if i[self.vectorizer.source_pk[0].attname] is not None
]
start_time = time.perf_counter()
async with conn.transaction():
items = await self._fetch_work(conn)

if len(items) == 0:
return 0
current_span = tracer.current_span()
if current_span:
current_span.set_tag("items_from_queue.pulled", len(items))
await logger.adebug(f"Items pulled from queue: {len(items)}")

num_chunks = await self._embed_and_write(conn, items)
# Filter out items that were deleted from the source table.
# We use the first primary key column, since they can only
# be null if the LEFT JOIN didn't find a match.
items = [
i for i in items if i[self.vectorizer.source_pk[0].attname] is not None
]

processing_stats.add_request_time(
time.perf_counter() - start_time, num_chunks
)
await processing_stats.print_stats()

return len(items)
except EmbeddingProviderError as e:
async with conn.transaction():
await self._insert_vectorizer_error(
conn,
(
self.vectorizer.id,
e.msg,
Jsonb(
{
"provider": self.vectorizer.config.embedding.implementation, # noqa
"error_reason": str(e.__cause__),
}
),
),
)
# This is to make the traceback not as verbose by removing
# the lines about our wrapper exception being casused by
# the actual exception.
if e.__cause__ is not None:
raise e.__cause__ # noqa
raise e
except Exception as e:
async with conn.transaction():
await self._insert_vectorizer_error(
conn,
(
self.vectorizer.id,
VECTORIZER_FAILED,
Jsonb({"error_reason": str(e)}),
),
)
raise e
if len(items) == 0:
return 0

num_chunks = await self._embed_and_write(conn, items)

processing_stats.add_request_time(
time.perf_counter() - start_time, num_chunks
)
await processing_stats.print_stats()

return len(items)

async def _fetch_work(self, conn: AsyncConnection) -> list[SourceRow]:
"""
Expand Down

0 comments on commit adc8d66

Please sign in to comment.