Skip to content

Commit

Permalink
[NOD-918] feat: add retry handling on error
Browse files Browse the repository at this point in the history
  • Loading branch information
andrea-deri committed May 17, 2024
1 parent 08e9d24 commit 0388e2d
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,12 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.microsoft.azure.functions.ExecutionContext;
import com.microsoft.azure.functions.annotation.BindingName;
import com.microsoft.azure.functions.annotation.Cardinality;
import com.microsoft.azure.functions.annotation.EventHubTrigger;
import com.microsoft.azure.functions.annotation.FunctionName;
import com.microsoft.azure.functions.annotation.*;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import it.gov.pagopa.fdrretodatastore.exception.AppException;
import it.gov.pagopa.fdrretodatastore.util.ObjectMapperUtils;
import org.bson.Document;

Expand All @@ -24,9 +22,11 @@
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand All @@ -39,6 +39,8 @@ public class FdrReEventToDataStore {
* This function will be invoked when an Event Hub trigger occurs
*/

private static final Integer MAX_RETRY_COUNT = 5;

private Pattern replaceDashPattern = Pattern.compile("-([a-zA-Z])");
private static String idField = "uniqueId";
private static String tableName = System.getenv("TABLE_STORAGE_TABLE_NAME");
Expand All @@ -51,8 +53,6 @@ public class FdrReEventToDataStore {

private static TableServiceClient tableServiceClient = null;

private ObjectMapper om = new ObjectMapper();

private static MongoClient getMongoClient(){
if(mongoClient==null){
mongoClient = new MongoClient(new MongoClientURI(System.getenv("COSMOS_CONN_STRING")));
Expand Down Expand Up @@ -97,7 +97,8 @@ private String replaceDashWithUppercase(String input) {
}

@FunctionName("EventHubFdrReEventProcessor")
public void processNodoReEvent (
@ExponentialBackoffRetry(maxRetryCount = 5, maximumInterval = "00:15:00", minimumInterval = "00:00:10")
public void processNodoReEvent (
@EventHubTrigger(
name = "FdrReEvent",
eventHubName = "", // blank because the value is included in the connection string
Expand All @@ -107,28 +108,31 @@ public void processNodoReEvent (
@BindingName(value = "PropertiesArray") Map<String, Object>[] properties,
final ExecutionContext context) {

String errorCause = null;
boolean isPersistenceOk = true;
int retryIndex = context.getRetryContext() == null ? -1 : context.getRetryContext().getRetrycount();

Logger logger = context.getLogger();
logger.log(Level.FINE, () -> String.format("Persisting [%d] events...", reEvents.size()));
if (retryIndex == MAX_RETRY_COUNT) {
logger.log(Level.WARNING, () -> String.format("[ALERT][LAST RETRY][FdrREToDS] Performing last retry for event ingestion: InvocationId [%s], Events: %s", context.getInvocationId(), reEvents));
}

MongoDatabase database = getMongoClient().getDatabase(System.getenv("COSMOS_DB_NAME"));
MongoCollection<Document> collection = database.getCollection(System.getenv("COSMOS_DB_COLLECTION_NAME"));

TableClient tableClient = getTableServiceClient().getTableClient(tableName);

String msg = String.format("Persisting %d events",reEvents.size());
logger.info(msg);

try {
if (reEvents.size() == properties.length) {
for(int index=0;index< properties.length;index++){
logger.info("processing "+(index+1)+" of "+properties.length);
for (int index=0; index< properties.length; index++){
final Map<String,Object> reEvent = ObjectMapperUtils.readValue(reEvents.get(index), Map.class);
Object servId = reEvent.get(serviceIdentifier);
String partitionKey = null;
if(servId.equals(serviceIDFdr001)){
if (servId.equals(serviceIDFdr001)){
String utcCreated = LocalDateTime.parse(reEvent.get(columnCreated).toString()).atZone(ZoneId.of("Europe/Rome")).toInstant().toString();
partitionKey = utcCreated.substring(0,10);
reEvent.put(columnCreated,utcCreated);
}else{
} else {
partitionKey = reEvent.get(columnCreated).toString().substring(0,10);
}
reEvent.put(partitionKeyColumnCreated,partitionKey);
Expand All @@ -137,19 +141,30 @@ public void processNodoReEvent (
reEvent.put(s,v);
});
reEvent.put("timestamp",ZonedDateTime.now().toInstant().toEpochMilli());

logger.log(Level.INFO, () -> String.format("Performing event ingestion: InvocationId [%s], Retry Attempt [%d], Events: %s", context.getInvocationId(), retryIndex, reEvents));

toTableStorage(logger,tableClient,new LinkedHashMap<>(reEvent));
collection.insertOne(new Document(reEvent));

}
logger.info("Done processing events");
} else {
logger.severe("Error processing events, lengths do not match ["+reEvents.size()+","+properties.length+"]");
isPersistenceOk = false;
errorCause = String.format("[ALERT][FdrREToDS] AppException - Error processing events, lengths do not match: [events: %d - properties: %d]", reEvents.size(), properties.length);
}
} catch (NullPointerException e) {
logger.severe("NullPointerException exception on cosmos fdr-re-events msg ingestion at "+ LocalDateTime.now()+ " : " + e.getMessage());
isPersistenceOk = false;
errorCause = "[ALERT][FdrREToDS] AppException - Null pointer exception on fdr-re-events msg ingestion at " + LocalDateTime.now() + " : " + e;
} catch (Exception e) {
logger.severe("Generic exception on cosmos fdr-re-events msg ingestion at "+ LocalDateTime.now()+ " : " + e.getMessage());
isPersistenceOk = false;
errorCause = "[ALERT][FdrREToDS] AppException - Generic exception on fdr-re-events msg ingestion at " + LocalDateTime.now() + " : " + e.getMessage();
}

if (!isPersistenceOk) {
String finalErrorCause = errorCause;
logger.log(Level.SEVERE, () -> finalErrorCause);
throw new AppException(errorCause);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package it.gov.pagopa.fdrretodatastore.exception;

public class AppException extends RuntimeException {

public AppException(String message) {
super(message);
}
}

0 comments on commit 0388e2d

Please sign in to comment.