Skip to content

Commit

Permalink
raise error
Browse files Browse the repository at this point in the history
  • Loading branch information
alovew committed Sep 14, 2022
1 parent e0b1d6a commit bd7fab1
Showing 1 changed file with 15 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -388,10 +388,10 @@ public List<SyncStats> getSyncStats(final Long attemptId) throws IOException {
}

@Override
public List<NormalizationSummary> getNormalizationSummary(final Long attemptId) throws IOException {
public List<NormalizationSummary> getNormalizationSummary(final Long attemptId) throws IOException, JsonProcessingException {
return jobDatabase
.query(ctx -> ctx.select(DSL.asterisk()).from(NORMALIZATION_SUMMARIES).where(NORMALIZATION_SUMMARIES.ATTEMPT_ID.eq(attemptId))
.fetch(getNormalizationSummaryRecordMapper(attemptId))
.fetch(getNormalizationSummaryRecordMapper())
.stream()
.toList());
}
Expand All @@ -407,20 +407,22 @@ private static RecordMapper<Record, SyncStats> getSyncStatsRecordMapper() {
.withMaxSecondsBetweenStateMessageEmittedandCommitted(record.get(SYNC_STATS.MAX_SECONDS_BETWEEN_STATE_MESSAGE_EMITTED_AND_COMMITTED));
}

private static RecordMapper<Record, NormalizationSummary> getNormalizationSummaryRecordMapper(final Long attemptId) {
return record -> new NormalizationSummary().withStartTime(record.get(NORMALIZATION_SUMMARIES.START_TIME).toInstant().toEpochMilli())
.withEndTime(record.get(NORMALIZATION_SUMMARIES.END_TIME).toInstant().toEpochMilli())
.withFailures(record.get(NORMALIZATION_SUMMARIES.FAILURES, String.class) == null ? null : deserializeFailureReasons(record, attemptId));
private static RecordMapper<Record, NormalizationSummary> getNormalizationSummaryRecordMapper() {
return record -> {
try {
new NormalizationSummary().withStartTime(record.get(NORMALIZATION_SUMMARIES.START_TIME).toInstant().toEpochMilli())
.withEndTime(record.get(NORMALIZATION_SUMMARIES.END_TIME).toInstant().toEpochMilli())
.withFailures(record.get(NORMALIZATION_SUMMARIES.FAILURES, String.class) == null ? null : deserializeFailureReasons(record));
} catch (final JsonProcessingException e) {
throw new RuntimeException(e);
}
return null;
};
}

private static List<FailureReason> deserializeFailureReasons(final Record record, final Long attemptId) {
private static List<FailureReason> deserializeFailureReasons(final Record record) throws JsonProcessingException {
final ObjectMapper mapper = new ObjectMapper();
try {
return List.of(mapper.readValue(String.valueOf(record.get(NORMALIZATION_SUMMARIES.FAILURES)), FailureReason[].class));
} catch (final JsonProcessingException e) {
LOGGER.error("There was an error deserializing NormalizationSummary Failures for attempt id " + attemptId);
return List.of(null);
}
return List.of(mapper.readValue(String.valueOf(record.get(NORMALIZATION_SUMMARIES.FAILURES)), FailureReason[].class));
}

@Override
Expand Down

0 comments on commit bd7fab1

Please sign in to comment.