Skip to content

Commit

Permalink
move failure reason deserialization into own method
Browse files Browse the repository at this point in the history
  • Loading branch information
alovew committed Sep 14, 2022
1 parent 77c558a commit e0b1d6a
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.collect.UnmodifiableIterator;
import io.airbyte.commons.enums.Enums;
Expand Down Expand Up @@ -409,22 +408,19 @@ private static RecordMapper<Record, SyncStats> getSyncStatsRecordMapper() {
}

private static RecordMapper<Record, NormalizationSummary> getNormalizationSummaryRecordMapper(final Long attemptId) {
return record -> new NormalizationSummary().withStartTime(record.get(NORMALIZATION_SUMMARIES.START_TIME).toInstant().toEpochMilli())
.withEndTime(record.get(NORMALIZATION_SUMMARIES.END_TIME).toInstant().toEpochMilli())
.withFailures(record.get(NORMALIZATION_SUMMARIES.FAILURES, String.class) == null ? null : deserializeFailureReasons(record, attemptId));
}

private static List<FailureReason> deserializeFailureReasons(final Record record, final Long attemptId) {
final ObjectMapper mapper = new ObjectMapper();
return record -> {
try {
return new NormalizationSummary().withStartTime(record.get(NORMALIZATION_SUMMARIES.START_TIME).toInstant().toEpochMilli())
.withEndTime(record.get(NORMALIZATION_SUMMARIES.END_TIME).toInstant().toEpochMilli())
.withFailures(record.get(NORMALIZATION_SUMMARIES.FAILURES, String.class) == null ? null
: Lists.newArrayList(mapper.readValue(String.valueOf(record.get(NORMALIZATION_SUMMARIES.FAILURES)), FailureReason[].class)));
} catch (final JsonProcessingException e) {
// There was an error deserializing the FailureReasons into a list of FailureReason Objects,
// so we are returning null Failures
LOGGER.error("There was an error deserializing NormalizationSummary Failures for attempt id " + attemptId);
return new NormalizationSummary().withStartTime(record.get(NORMALIZATION_SUMMARIES.START_TIME).toInstant().toEpochMilli())
.withEndTime(record.get(NORMALIZATION_SUMMARIES.END_TIME).toInstant().toEpochMilli())
.withFailures(null);
}
};
try {
return List.of(mapper.readValue(String.valueOf(record.get(NORMALIZATION_SUMMARIES.FAILURES)), FailureReason[].class));
} catch (final JsonProcessingException e) {
LOGGER.error("There was an error deserializing NormalizationSummary Failures for attempt id " + attemptId);
return List.of(null);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1050,7 +1050,7 @@ class GetJobCount {
@Test
@DisplayName("Should return the total job count for the connection")
void testGetJobCount() throws IOException {
int numJobsToCreate = 10;
final int numJobsToCreate = 10;
for (int i = 0; i < numJobsToCreate; i++) {
jobPersistence.enqueueJob(CONNECTION_ID.toString(), SPEC_JOB_CONFIG);
}
Expand Down

0 comments on commit e0b1d6a

Please sign in to comment.