Skip to content

Commit

Permalink
comment to explain error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
alovew committed Sep 13, 2022
1 parent 0ac7975 commit 6ce742a
Showing 1 changed file with 6 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -391,8 +391,8 @@ public List<SyncStats> getSyncStats(final Long attemptId) throws IOException {
@Override
public List<NormalizationSummary> getNormalizationSummary(final Long attemptId) throws IOException {
return jobDatabase
.query(ctx -> ctx.select(DSL.asterisk()).from(DSL.table("normalization_summaries")).where(NORMALIZATION_SUMMARIES.ATTEMPT_ID.eq(attemptId))
.fetch(getNormalizationSummaryRecordMapper())
.query(ctx -> ctx.select(DSL.asterisk()).from(NORMALIZATION_SUMMARIES).where(NORMALIZATION_SUMMARIES.ATTEMPT_ID.eq(attemptId))
.fetch(getNormalizationSummaryRecordMapper(attemptId))
.stream()
.toList());
}
Expand All @@ -408,7 +408,7 @@ private static RecordMapper<Record, SyncStats> getSyncStatsRecordMapper() {
.withMaxSecondsBetweenStateMessageEmittedandCommitted(record.get(SYNC_STATS.MAX_SECONDS_BETWEEN_STATE_MESSAGE_EMITTED_AND_COMMITTED));
}

private static RecordMapper<Record, NormalizationSummary> getNormalizationSummaryRecordMapper() {
private static RecordMapper<Record, NormalizationSummary> getNormalizationSummaryRecordMapper(final Long attemptId) {
final ObjectMapper mapper = new ObjectMapper();
return record -> {
try {
Expand All @@ -417,6 +417,9 @@ private static RecordMapper<Record, NormalizationSummary> getNormalizationSummar
.withFailures(record.get(NORMALIZATION_SUMMARIES.FAILURES, String.class) == null ? null
: Lists.newArrayList(mapper.readValue(String.valueOf(record.get(NORMALIZATION_SUMMARIES.FAILURES)), FailureReason[].class)));
} catch (final JsonProcessingException e) {
// There was an error deserializing the FailureReasons into a list of FailureReason Objects,
// so we are returning null Failures
LOGGER.error("There was an error deserializing NormalizationSummary Failures for attempt id " + attemptId);
return new NormalizationSummary().withStartTime(record.get(NORMALIZATION_SUMMARIES.START_TIME).toInstant().toEpochMilli())
.withEndTime(record.get(NORMALIZATION_SUMMARIES.END_TIME).toInstant().toEpochMilli())
.withFailures(null);
Expand Down

0 comments on commit 6ce742a

Please sign in to comment.