diff --git a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java index 5f27af00ade1..4983d003cb16 100644 --- a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java +++ b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java @@ -388,10 +388,10 @@ public List getSyncStats(final Long attemptId) throws IOException { } @Override - public List getNormalizationSummary(final Long attemptId) throws IOException { + public List getNormalizationSummary(final Long attemptId) throws IOException, JsonProcessingException { return jobDatabase .query(ctx -> ctx.select(DSL.asterisk()).from(NORMALIZATION_SUMMARIES).where(NORMALIZATION_SUMMARIES.ATTEMPT_ID.eq(attemptId)) - .fetch(getNormalizationSummaryRecordMapper(attemptId)) + .fetch(getNormalizationSummaryRecordMapper()) .stream() .toList()); } @@ -407,20 +407,22 @@ private static RecordMapper getSyncStatsRecordMapper() { .withMaxSecondsBetweenStateMessageEmittedandCommitted(record.get(SYNC_STATS.MAX_SECONDS_BETWEEN_STATE_MESSAGE_EMITTED_AND_COMMITTED)); } - private static RecordMapper getNormalizationSummaryRecordMapper(final Long attemptId) { - return record -> new NormalizationSummary().withStartTime(record.get(NORMALIZATION_SUMMARIES.START_TIME).toInstant().toEpochMilli()) - .withEndTime(record.get(NORMALIZATION_SUMMARIES.END_TIME).toInstant().toEpochMilli()) - .withFailures(record.get(NORMALIZATION_SUMMARIES.FAILURES, String.class) == null ? null : deserializeFailureReasons(record, attemptId)); + private static RecordMapper getNormalizationSummaryRecordMapper() { + return record -> { + try { + new NormalizationSummary().withStartTime(record.get(NORMALIZATION_SUMMARIES.START_TIME).toInstant().toEpochMilli()) + .withEndTime(record.get(NORMALIZATION_SUMMARIES.END_TIME).toInstant().toEpochMilli()) + .withFailures(record.get(NORMALIZATION_SUMMARIES.FAILURES, String.class) == null ? null : deserializeFailureReasons(record)); + } catch (final JsonProcessingException e) { + throw new RuntimeException(e); + } + return null; + }; } - private static List deserializeFailureReasons(final Record record, final Long attemptId) { + private static List deserializeFailureReasons(final Record record) throws JsonProcessingException { final ObjectMapper mapper = new ObjectMapper(); - try { - return List.of(mapper.readValue(String.valueOf(record.get(NORMALIZATION_SUMMARIES.FAILURES)), FailureReason[].class)); - } catch (final JsonProcessingException e) { - LOGGER.error("There was an error deserializing NormalizationSummary Failures for attempt id " + attemptId); - return List.of(null); - } + return List.of(mapper.readValue(String.valueOf(record.get(NORMALIZATION_SUMMARIES.FAILURES)), FailureReason[].class)); } @Override