From 0388e2daaf67161dbd016c818053857408b18442 Mon Sep 17 00:00:00 2001 From: Andrea De Rinaldis Date: Fri, 17 May 2024 10:25:46 +0200 Subject: [PATCH] [NOD-918] feat: add retry handling on error --- .../FdrReEventToDataStore.java | 51 ++++++++++++------- .../exception/AppException.java | 8 +++ 2 files changed, 41 insertions(+), 18 deletions(-) create mode 100644 src/main/java/it/gov/pagopa/fdrretodatastore/exception/AppException.java diff --git a/src/main/java/it/gov/pagopa/fdrretodatastore/FdrReEventToDataStore.java b/src/main/java/it/gov/pagopa/fdrretodatastore/FdrReEventToDataStore.java index b86842c..5ffca17 100644 --- a/src/main/java/it/gov/pagopa/fdrretodatastore/FdrReEventToDataStore.java +++ b/src/main/java/it/gov/pagopa/fdrretodatastore/FdrReEventToDataStore.java @@ -8,14 +8,12 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.microsoft.azure.functions.ExecutionContext; -import com.microsoft.azure.functions.annotation.BindingName; -import com.microsoft.azure.functions.annotation.Cardinality; -import com.microsoft.azure.functions.annotation.EventHubTrigger; -import com.microsoft.azure.functions.annotation.FunctionName; +import com.microsoft.azure.functions.annotation.*; import com.mongodb.MongoClient; import com.mongodb.MongoClientURI; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; +import it.gov.pagopa.fdrretodatastore.exception.AppException; import it.gov.pagopa.fdrretodatastore.util.ObjectMapperUtils; import org.bson.Document; @@ -24,9 +22,11 @@ import java.time.ZoneOffset; import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; +import java.util.Arrays; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.logging.Level; import java.util.logging.Logger; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -39,6 +39,8 @@ public class FdrReEventToDataStore { * This function will be invoked when an Event Hub trigger occurs */ + private static final Integer MAX_RETRY_COUNT = 5; + private Pattern replaceDashPattern = Pattern.compile("-([a-zA-Z])"); private static String idField = "uniqueId"; private static String tableName = System.getenv("TABLE_STORAGE_TABLE_NAME"); @@ -51,8 +53,6 @@ public class FdrReEventToDataStore { private static TableServiceClient tableServiceClient = null; - private ObjectMapper om = new ObjectMapper(); - private static MongoClient getMongoClient(){ if(mongoClient==null){ mongoClient = new MongoClient(new MongoClientURI(System.getenv("COSMOS_CONN_STRING"))); @@ -97,7 +97,8 @@ private String replaceDashWithUppercase(String input) { } @FunctionName("EventHubFdrReEventProcessor") - public void processNodoReEvent ( + @ExponentialBackoffRetry(maxRetryCount = 5, maximumInterval = "00:15:00", minimumInterval = "00:00:10") + public void processNodoReEvent ( @EventHubTrigger( name = "FdrReEvent", eventHubName = "", // blank because the value is included in the connection string @@ -107,28 +108,31 @@ public void processNodoReEvent ( @BindingName(value = "PropertiesArray") Map[] properties, final ExecutionContext context) { + String errorCause = null; + boolean isPersistenceOk = true; + int retryIndex = context.getRetryContext() == null ? -1 : context.getRetryContext().getRetrycount(); + Logger logger = context.getLogger(); + logger.log(Level.FINE, () -> String.format("Persisting [%d] events...", reEvents.size())); + if (retryIndex == MAX_RETRY_COUNT) { + logger.log(Level.WARNING, () -> String.format("[ALERT][LAST RETRY][FdrREToDS] Performing last retry for event ingestion: InvocationId [%s], Events: %s", context.getInvocationId(), reEvents)); + } MongoDatabase database = getMongoClient().getDatabase(System.getenv("COSMOS_DB_NAME")); MongoCollection collection = database.getCollection(System.getenv("COSMOS_DB_COLLECTION_NAME")); - TableClient tableClient = getTableServiceClient().getTableClient(tableName); - String msg = String.format("Persisting %d events",reEvents.size()); - logger.info(msg); - try { if (reEvents.size() == properties.length) { - for(int index=0;index< properties.length;index++){ - logger.info("processing "+(index+1)+" of "+properties.length); + for (int index=0; index< properties.length; index++){ final Map reEvent = ObjectMapperUtils.readValue(reEvents.get(index), Map.class); Object servId = reEvent.get(serviceIdentifier); String partitionKey = null; - if(servId.equals(serviceIDFdr001)){ + if (servId.equals(serviceIDFdr001)){ String utcCreated = LocalDateTime.parse(reEvent.get(columnCreated).toString()).atZone(ZoneId.of("Europe/Rome")).toInstant().toString(); partitionKey = utcCreated.substring(0,10); reEvent.put(columnCreated,utcCreated); - }else{ + } else { partitionKey = reEvent.get(columnCreated).toString().substring(0,10); } reEvent.put(partitionKeyColumnCreated,partitionKey); @@ -137,19 +141,30 @@ public void processNodoReEvent ( reEvent.put(s,v); }); reEvent.put("timestamp",ZonedDateTime.now().toInstant().toEpochMilli()); + + logger.log(Level.INFO, () -> String.format("Performing event ingestion: InvocationId [%s], Retry Attempt [%d], Events: %s", context.getInvocationId(), retryIndex, reEvents)); + toTableStorage(logger,tableClient,new LinkedHashMap<>(reEvent)); collection.insertOne(new Document(reEvent)); } logger.info("Done processing events"); } else { - logger.severe("Error processing events, lengths do not match ["+reEvents.size()+","+properties.length+"]"); + isPersistenceOk = false; + errorCause = String.format("[ALERT][FdrREToDS] AppException - Error processing events, lengths do not match: [events: %d - properties: %d]", reEvents.size(), properties.length); } } catch (NullPointerException e) { - logger.severe("NullPointerException exception on cosmos fdr-re-events msg ingestion at "+ LocalDateTime.now()+ " : " + e.getMessage()); + isPersistenceOk = false; + errorCause = "[ALERT][FdrREToDS] AppException - Null pointer exception on fdr-re-events msg ingestion at " + LocalDateTime.now() + " : " + e; } catch (Exception e) { - logger.severe("Generic exception on cosmos fdr-re-events msg ingestion at "+ LocalDateTime.now()+ " : " + e.getMessage()); + isPersistenceOk = false; + errorCause = "[ALERT][FdrREToDS] AppException - Generic exception on fdr-re-events msg ingestion at " + LocalDateTime.now() + " : " + e.getMessage(); } + if (!isPersistenceOk) { + String finalErrorCause = errorCause; + logger.log(Level.SEVERE, () -> finalErrorCause); + throw new AppException(errorCause); + } } } diff --git a/src/main/java/it/gov/pagopa/fdrretodatastore/exception/AppException.java b/src/main/java/it/gov/pagopa/fdrretodatastore/exception/AppException.java new file mode 100644 index 0000000..ff3f33b --- /dev/null +++ b/src/main/java/it/gov/pagopa/fdrretodatastore/exception/AppException.java @@ -0,0 +1,8 @@ +package it.gov.pagopa.fdrretodatastore.exception; + +public class AppException extends RuntimeException { + + public AppException(String message) { + super(message); + } +}