From e0b1d6ad68ec02ead0fcd5bb3362667f645b4797 Mon Sep 17 00:00:00 2001 From: alovew Date: Tue, 13 Sep 2022 12:14:23 -0700 Subject: [PATCH] move failure reason deserialization into own method --- .../persistence/DefaultJobPersistence.java | 28 ++++++++----------- .../DefaultJobPersistenceTest.java | 2 +- 2 files changed, 13 insertions(+), 17 deletions(-) diff --git a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java index e69a37acf23c..5f27af00ade1 100644 --- a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java +++ b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java @@ -16,7 +16,6 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterators; -import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.collect.UnmodifiableIterator; import io.airbyte.commons.enums.Enums; @@ -409,22 +408,19 @@ private static RecordMapper getSyncStatsRecordMapper() { } private static RecordMapper getNormalizationSummaryRecordMapper(final Long attemptId) { + return record -> new NormalizationSummary().withStartTime(record.get(NORMALIZATION_SUMMARIES.START_TIME).toInstant().toEpochMilli()) + .withEndTime(record.get(NORMALIZATION_SUMMARIES.END_TIME).toInstant().toEpochMilli()) + .withFailures(record.get(NORMALIZATION_SUMMARIES.FAILURES, String.class) == null ? null : deserializeFailureReasons(record, attemptId)); + } + + private static List deserializeFailureReasons(final Record record, final Long attemptId) { final ObjectMapper mapper = new ObjectMapper(); - return record -> { - try { - return new NormalizationSummary().withStartTime(record.get(NORMALIZATION_SUMMARIES.START_TIME).toInstant().toEpochMilli()) - .withEndTime(record.get(NORMALIZATION_SUMMARIES.END_TIME).toInstant().toEpochMilli()) - .withFailures(record.get(NORMALIZATION_SUMMARIES.FAILURES, String.class) == null ? null - : Lists.newArrayList(mapper.readValue(String.valueOf(record.get(NORMALIZATION_SUMMARIES.FAILURES)), FailureReason[].class))); - } catch (final JsonProcessingException e) { - // There was an error deserializing the FailureReasons into a list of FailureReason Objects, - // so we are returning null Failures - LOGGER.error("There was an error deserializing NormalizationSummary Failures for attempt id " + attemptId); - return new NormalizationSummary().withStartTime(record.get(NORMALIZATION_SUMMARIES.START_TIME).toInstant().toEpochMilli()) - .withEndTime(record.get(NORMALIZATION_SUMMARIES.END_TIME).toInstant().toEpochMilli()) - .withFailures(null); - } - }; + try { + return List.of(mapper.readValue(String.valueOf(record.get(NORMALIZATION_SUMMARIES.FAILURES)), FailureReason[].class)); + } catch (final JsonProcessingException e) { + LOGGER.error("There was an error deserializing NormalizationSummary Failures for attempt id " + attemptId); + return List.of(null); + } } @Override diff --git a/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java b/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java index a881f0cd1b5a..dbf60c5af6cc 100644 --- a/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java +++ b/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java @@ -1050,7 +1050,7 @@ class GetJobCount { @Test @DisplayName("Should return the total job count for the connection") void testGetJobCount() throws IOException { - int numJobsToCreate = 10; + final int numJobsToCreate = 10; for (int i = 0; i < numJobsToCreate; i++) { jobPersistence.enqueueJob(CONNECTION_ID.toString(), SPEC_JOB_CONFIG); }