From 6ce742a04ec9f17b194b41085d432f7c488702cf Mon Sep 17 00:00:00 2001 From: alovew Date: Tue, 13 Sep 2022 11:08:03 -0700 Subject: [PATCH] comment to explain error handling --- .../scheduler/persistence/DefaultJobPersistence.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java index cc1428455451..e69a37acf23c 100644 --- a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java +++ b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java @@ -391,8 +391,8 @@ public List getSyncStats(final Long attemptId) throws IOException { @Override public List getNormalizationSummary(final Long attemptId) throws IOException { return jobDatabase - .query(ctx -> ctx.select(DSL.asterisk()).from(DSL.table("normalization_summaries")).where(NORMALIZATION_SUMMARIES.ATTEMPT_ID.eq(attemptId)) - .fetch(getNormalizationSummaryRecordMapper()) + .query(ctx -> ctx.select(DSL.asterisk()).from(NORMALIZATION_SUMMARIES).where(NORMALIZATION_SUMMARIES.ATTEMPT_ID.eq(attemptId)) + .fetch(getNormalizationSummaryRecordMapper(attemptId)) .stream() .toList()); } @@ -408,7 +408,7 @@ private static RecordMapper getSyncStatsRecordMapper() { .withMaxSecondsBetweenStateMessageEmittedandCommitted(record.get(SYNC_STATS.MAX_SECONDS_BETWEEN_STATE_MESSAGE_EMITTED_AND_COMMITTED)); } - private static RecordMapper getNormalizationSummaryRecordMapper() { + private static RecordMapper getNormalizationSummaryRecordMapper(final Long attemptId) { final ObjectMapper mapper = new ObjectMapper(); return record -> { try { @@ -417,6 +417,9 @@ private static RecordMapper getNormalizationSummar .withFailures(record.get(NORMALIZATION_SUMMARIES.FAILURES, String.class) == null ? null : Lists.newArrayList(mapper.readValue(String.valueOf(record.get(NORMALIZATION_SUMMARIES.FAILURES)), FailureReason[].class))); } catch (final JsonProcessingException e) { + // There was an error deserializing the FailureReasons into a list of FailureReason Objects, + // so we are returning null Failures + LOGGER.error("There was an error deserializing NormalizationSummary Failures for attempt id " + attemptId); return new NormalizationSummary().withStartTime(record.get(NORMALIZATION_SUMMARIES.START_TIME).toInstant().toEpochMilli()) .withEndTime(record.get(NORMALIZATION_SUMMARIES.END_TIME).toInstant().toEpochMilli()) .withFailures(null);