Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: massive update [PAGOPA-1588] #15

Merged
merged 8 commits into from
Mar 25, 2024
59 changes: 43 additions & 16 deletions src/main/java/it/gov/pagopa/gpd/upload/ServiceFunction.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@
import com.microsoft.azure.functions.annotation.FunctionName;
import com.microsoft.azure.functions.annotation.QueueTrigger;
import it.gov.pagopa.gpd.upload.client.GPDClient;
import it.gov.pagopa.gpd.upload.entity.PaymentPositionsMessage;
import it.gov.pagopa.gpd.upload.entity.UploadMessage;
import it.gov.pagopa.gpd.upload.entity.Status;
import it.gov.pagopa.gpd.upload.exception.AppException;
import it.gov.pagopa.gpd.upload.model.RequestGPD;
import it.gov.pagopa.gpd.upload.model.ResponseGPD;
import it.gov.pagopa.gpd.upload.model.RetryStep;
import it.gov.pagopa.gpd.upload.model.pd.PaymentPosition;
Expand All @@ -24,6 +25,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
Expand All @@ -47,8 +49,20 @@ public void run(
objectMapper.registerModule(new JavaTimeModule());

try {
PaymentPositionsMessage msg = objectMapper.readValue(message, PaymentPositionsMessage.class);
this.create(context, msg);
UploadMessage msg = objectMapper.readValue(message, UploadMessage.class);
GPDClient gpdClient = getGPDClient();
Function<RequestGPD, ResponseGPD> method = gpdClient::createDebtPosition;

switch(msg.operation) {
case CREATE:
method = gpdClient::createDebtPosition;
break;
case UPDATE:
method = gpdClient::updateDebtPosition;
}

this.operation(context, msg, method);

// check if upload is completed
Status status = getStatusService(context).getStatus(invocationId, msg.organizationFiscalCode, msg.uploadKey);
if(status.upload.getCurrent() == status.upload.getTotal()) {
Expand All @@ -62,31 +76,44 @@ public void run(
}
}

private void create(ExecutionContext ctx, PaymentPositionsMessage msg) throws AppException {
// constraint: paymentPositions size less than max bulk item per call -> respected by design (max queue message = 64KB)
GPDClient gpdClient = getGPDClient();
private void operation(ExecutionContext ctx, UploadMessage msg, Function<RequestGPD, ResponseGPD> method) throws AppException {
// constraint: paymentPositions size less than max bulk item per call -> compliant by design(max queue message = 64KB = ~30 PaymentPosition)
StatusService statusService = getStatusService(ctx);
ResponseGPD response = gpdClient.createBulkDebtPositions(msg.organizationFiscalCode, msg.paymentPositions, ctx.getLogger(), ctx.getInvocationId());

RequestGPD requestGPD = RequestGPD.builder()
.mode(RequestGPD.Mode.BULK)
.orgFiscalCode(msg.organizationFiscalCode)
.body(msg.paymentPositions)
.logger(ctx.getLogger())
.invocationId(ctx.getInvocationId())
.build();

ResponseGPD response = method.apply(requestGPD);
ctx.getLogger().log(Level.INFO, () -> String.format("[id=%s][ServiceFunction] Create %s payment positions calling GPD-Core", ctx.getInvocationId(), msg.paymentPositions.getPaymentPositions().size()));

if(response.getStatus() != HttpStatus.CREATED.value()) {
// if BULK creation wasn't successful, switch to single debt position creation
ctx.getLogger().log(Level.INFO, () -> String.format("[id=%s][ServiceFunction] Call GPD-Core one-by-one", ctx.getInvocationId()));

Map<String, ResponseGPD> responseByIUPD = new HashMap<>();
// if BULK creation wasn't successful, switch to single debt position creation
for(PaymentPosition paymentPosition : msg.paymentPositions.getPaymentPositions()) {
response = gpdClient.createDebtPosition(ctx.getInvocationId(), ctx.getLogger(), msg.organizationFiscalCode, paymentPosition);

for(PaymentPosition paymentPosition: msg.paymentPositions.getPaymentPositions()) {
requestGPD = RequestGPD.builder()
.mode(RequestGPD.Mode.SINGLE)
.orgFiscalCode(msg.organizationFiscalCode)
.body(paymentPosition)
.logger(ctx.getLogger())
.invocationId(ctx.getInvocationId())
.build();
response = method.apply(requestGPD);
responseByIUPD.put(paymentPosition.getIupd(), response);
}

// Selecting responses where retry == true
Map<String, ResponseGPD> retryResponses = responseByIUPD.entrySet().stream()
.filter(entry -> entry.getValue().getRetryStep().equals(RetryStep.RETRY))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
.filter(entry -> entry.getValue().getRetryStep().equals(RetryStep.RETRY))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

if (!retryResponses.isEmpty() && msg.retryCounter < MAX_RETRY) {
// Remove retryResponses from responseMap and enqueue retry responses
// Remove retry-responses from response-map and enqueue retry-responses
responseByIUPD.entrySet().removeAll(retryResponses.entrySet());
this.retry(ctx, msg, retryResponses);
}
Expand Down Expand Up @@ -117,7 +144,7 @@ public GPDClient getGPDClient() {
}


public void retry(ExecutionContext ctx, PaymentPositionsMessage msg, Map<String, ResponseGPD> retryResponses) {
public void retry(ExecutionContext ctx, UploadMessage msg, Map<String, ResponseGPD> retryResponses) {
List<PaymentPosition> retryPositions = msg.paymentPositions.getPaymentPositions().stream()
.filter(paymentPosition -> retryResponses.containsKey(paymentPosition.getIupd()))
.collect(Collectors.toList());
Expand Down
32 changes: 18 additions & 14 deletions src/main/java/it/gov/pagopa/gpd/upload/ValidationFunction.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@
import com.microsoft.azure.functions.ExecutionContext;
import com.microsoft.azure.functions.annotation.FunctionName;
import com.microsoft.azure.functions.annotation.QueueTrigger;
import it.gov.pagopa.gpd.upload.entity.PaymentPositionsMessage;
import it.gov.pagopa.gpd.upload.entity.UploadMessage;
import it.gov.pagopa.gpd.upload.entity.Status;
import it.gov.pagopa.gpd.upload.exception.AppException;
import it.gov.pagopa.gpd.upload.model.UploadInput;
import it.gov.pagopa.gpd.upload.model.Operation;
import it.gov.pagopa.gpd.upload.model.pd.PaymentPosition;
import it.gov.pagopa.gpd.upload.model.pd.PaymentPositions;
import it.gov.pagopa.gpd.upload.repository.BlobRepository;
Expand Down Expand Up @@ -68,7 +70,7 @@ public void run(
logger.log(Level.INFO, () -> String.format("[id=%s][ValidationFunction] Blob event subject: %s", context.getInvocationId(), event.getSubject()));


Pattern pattern = Pattern.compile("/containers/(\\w+)/blobs/(\\w+)/input/(\\w+\\.json)");
Pattern pattern = Pattern.compile("/containers/(\\w+)/blobs/(\\w+)/input/([\\w\\-]+\\.json)");
Matcher matcher = pattern.matcher(event.getSubject());

// Check if the pattern is found
Expand Down Expand Up @@ -97,16 +99,17 @@ public boolean validateBlob(ExecutionContext ctx, String broker, String fiscalCo

try {
// deserialize payment positions from JSON to Object
PaymentPositions paymentPositions = objectMapper.readValue(content.toString(), PaymentPositions.class);
Status status = this.createStatus(ctx, broker, fiscalCode, uploadKey, paymentPositions.getPaymentPositions().size());
UploadInput uploadInput = objectMapper.readValue(content.toString(), UploadInput.class);
PaymentPositions pps = uploadInput.getPaymentPositions();
Status status = this.createStatus(ctx, broker, fiscalCode, uploadKey, pps.getPaymentPositions().size());
if (status.getUpload().getEnd() != null) { // already exist and upload is completed, so no-retry
return false;
}
// call payment position object validation logic
PaymentPositionValidator.validate(ctx, StatusService.getInstance(ctx.getLogger()), paymentPositions, fiscalCode, uploadKey);
PaymentPositionValidator.validate(ctx, StatusService.getInstance(ctx.getLogger()), pps, fiscalCode, uploadKey);

// enqueue payment positions message by chunk size
this.enqueue(ctx, paymentPositions.getPaymentPositions(), uploadKey, fiscalCode, broker);
this.enqueue(ctx, uploadInput.getOperation(), pps.getPaymentPositions(), uploadKey, fiscalCode, broker);

return true;
} catch (JsonMappingException e) {
Expand All @@ -129,20 +132,21 @@ public Status createStatus(ExecutionContext ctx, String broker, String fiscalCod
return status;
}

public boolean enqueue(ExecutionContext ctx, List<PaymentPosition> paymentPositions, String uploadKey, String fiscalCode, String broker) {
public boolean enqueue(ExecutionContext ctx, Operation operation, List<PaymentPosition> paymentPositions, String uploadKey, String fiscalCode, String broker) {
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.registerModule(new JavaTimeModule());
for (int i = 0; i < paymentPositions.size(); i += CHUNK_SIZE) {
int endIndex = Math.min(i + CHUNK_SIZE, paymentPositions.size());
List<PaymentPosition> subList = paymentPositions.subList(i, endIndex);

PaymentPositionsMessage message = PaymentPositionsMessage.builder()
.uploadKey(uploadKey)
.organizationFiscalCode(fiscalCode)
.brokerCode(broker)
.retryCounter(0)
.paymentPositions(PaymentPositions.builder().paymentPositions(subList).build())
.build();
UploadMessage message = UploadMessage.builder()
.operation(operation)
.uploadKey(uploadKey)
.organizationFiscalCode(fiscalCode)
.brokerCode(broker)
.retryCounter(0)
.paymentPositions(PaymentPositions.builder().paymentPositions(subList).build())
.build();
objectMapper.disable(SerializationFeature.INDENT_OUTPUT); // remove useless whitespaces from message
try {
QueueService.enqueue(ctx.getInvocationId(), ctx.getLogger(), objectMapper.writeValueAsString(message), 0);
Expand Down
Loading
Loading