Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(repository): Use cosmos patch operations and fix response detail #22

Merged
merged 19 commits into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/code_review.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ jobs:
smoke-test:
name: Smoke Test
runs-on: ubuntu-latest
if: false # todo
cap-ang marked this conversation as resolved.
Show resolved Hide resolved
environment:
name: dev
steps:
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/deploy_with_github_runner.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ jobs:
container_app_environment_name: ${{ vars.CONTAINER_APP_ENVIRONMENT_NAME }}
resource_group_name: ${{ vars.CONTAINER_APP_ENVIRONMENT_RESOURCE_GROUP_NAME }} # RG of the runner
pat_token: ${{ secrets.BOT_TOKEN_GITHUB }}
self_hosted_runner_image_tag: "latest"

deploy:
needs: [ create_runner ]
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/integration_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ jobs:
name: Test ${{(github.event.inputs == null && 'dev') || inputs.environment }}
runs-on: ubuntu-latest
environment: ${{(github.event.inputs == null && 'dev') || inputs.environment }}
if: false # see https://github.com/pagopa/pagopa-gpd-upload/actions/workflows/integration_test.yml
steps:
- name: Checkout
id: checkout
Expand Down
32 changes: 12 additions & 20 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
# pagoPA Functions template
# GPD Upload Functions

Java template to create an Azure Function.
[![Code Review](https://github.com/pagopa/pagopa-gpd-upload-function/actions/workflows/code_review.yml/badge.svg)](https://github.com/pagopa/pagopa-gpd-upload-function/actions/workflows/code_review.yml)
[![Integration Tests](https://github.com/pagopa/pagopa-gpd-upload/actions/workflows/integration_test.yml/badge.svg)](https://github.com/pagopa/pagopa-gpd-upload/actions/workflows/integration_test.yml)

## Function examples
There is an example of a Http Trigger function.
A set of Azure functions related to debt positions massive upload.

## Functions

| Function | Trigger | Description |
|---------------------|--------------|-------------------------------------------|
| ValidationFunction | QueueTrigger | Validate Blob on CreatedEvent |
| ServiceFunction | QueueTrigger | Perform CRUD operations on debt-positions |

---

Expand Down Expand Up @@ -35,19 +42,4 @@ mvn -f pom.xml clean package -Dmaven.test.skip=true && mvn -e azure-functions:ru
### Test
`curl http://localhost:7071/example`

---


## TODO
Once cloned the repo, you should:
- to deploy on standard Azure service:
- rename `deploy-pipelines-standard.yml` to `deploy-pipelines.yml`
- remove `helm` folder
- to deploy on Kubernetes:
- rename `deploy-pipelines-aks.yml` to `deploy-pipelines.yml`
- customize `helm` configuration
- configure the following GitHub action in `.github` folder:
- `deploy.yml`
- `sonar_analysis.yml`

Configure the SonarCloud project :point_right: [guide](https://pagopa.atlassian.net/wiki/spaces/DEVOPS/pages/147193860/SonarCloud+experimental).
---
4 changes: 2 additions & 2 deletions helm/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ apiVersion: v2
name: pagopa-gpd-upload-function
description: Microservice description
type: application
version: 0.92.0
appVersion: 0.0.16
version: 0.93.0
appVersion: 0.0.15-2-PAGOPA-1787-cosmos-operation-refactoring
dependencies:
- name: microservice-chart
version: 2.4.0
Expand Down
2 changes: 1 addition & 1 deletion helm/values-dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ microservice-chart:
fullnameOverride: ""
image:
repository: ghcr.io/pagopa/pagopa-gpd-upload-function
tag: "0.0.16"
tag: "0.0.15-2-PAGOPA-1787-cosmos-operation-refactoring"
pullPolicy: Always
# https://github.com/Azure/azure-functions-host/blob/dev/src/WebJobs.Script.WebHost/Controllers/HostController.cs
livenessProbe:
Expand Down
2 changes: 1 addition & 1 deletion helm/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ microservice-chart:
fullnameOverride: ""
image:
repository: ghcr.io/pagopa/pagopa-gpd-upload-function
tag: "0.0.16"
tag: "0.0.15-2-PAGOPA-1787-cosmos-operation-refactoring"
pullPolicy: Always
# https://github.com/Azure/azure-functions-host/blob/dev/src/WebJobs.Script.WebHost/Controllers/HostController.cs
livenessProbe:
Expand Down
2 changes: 1 addition & 1 deletion helm/values-uat.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ microservice-chart:
fullnameOverride: ""
image:
repository: ghcr.io/pagopa/pagopa-gpd-upload-function
tag: "0.0.16"
tag: "0.0.15-2-PAGOPA-1787-cosmos-operation-refactoring"
pullPolicy: Always
# https://github.com/Azure/azure-functions-host/blob/dev/src/WebJobs.Script.WebHost/Controllers/HostController.cs
livenessProbe:
Expand Down
3 changes: 1 addition & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@

<groupId>it.gov.pagopa.gpd.upload</groupId>
<artifactId>gpd-upload-function</artifactId>
<version>0.0.16</version>
<version>0.0.15-2-PAGOPA-1787-cosmos-operation-refactoring</version>
<packaging>jar</packaging>

<name>GPD-Upload-Function</name>

<properties>
Expand Down
20 changes: 11 additions & 9 deletions src/main/java/it/gov/pagopa/gpd/upload/ServiceFunction.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,28 +42,30 @@ public void run(

try {
QueueMessage msg = objectMapper.readValue(message, QueueMessage.class);
// extract from message
String key = msg.getUploadKey();
String orgFiscalCode = msg.getOrganizationFiscalCode();
// process message request
Function<RequestGPD, ResponseGPD> method = getMethod(msg, getGPDClient(ctx));
getOperationService(ctx, method, getPositionMessage(msg)).processRequestInBulk();

// check if upload is completed
Status status = getStatusService(ctx).getStatus(invocationId, msg.getOrganizationFiscalCode(), msg.getUploadKey());
Status status = getStatusService(ctx).getStatus(invocationId, orgFiscalCode, key);
if(status.upload.getCurrent() == status.upload.getTotal()) {
getStatusService(ctx).updateStatusEndTime(invocationId, status.fiscalCode, status.id, LocalDateTime.now());
report(ctx, logger, msg.getUploadKey(), msg.getBrokerCode(), msg.getOrganizationFiscalCode());
getStatusService(ctx).updateStatusEndTime(orgFiscalCode, key, LocalDateTime.now());
report(logger, key, status);
}

Runtime.getRuntime().gc();
} catch (Exception e) {
logger.log(Level.SEVERE, () -> String.format("[id=%s][ServiceFunction] Processing function exception: %s, caused by: %s", invocationId, e.getMessage(), e.getCause()));
logger.log(Level.SEVERE, () -> String.format("[id=%s][ServiceFunction] Processing function exception: %s, caused by: %s, localized-message: %s",
invocationId, e.getMessage(), e.getCause(), e.getLocalizedMessage()));
}
}

private void report(ExecutionContext ctx, Logger logger, String uploadKey, String broker, String fiscalCode) throws AppException, JsonProcessingException {
public boolean report(Logger logger, String uploadKey, Status status) throws AppException, JsonProcessingException {
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.enable(SerializationFeature.INDENT_OUTPUT);
objectMapper.registerModule(new JavaTimeModule());
Status status = getStatusService(ctx).getStatus(ctx.getInvocationId(), fiscalCode, uploadKey);
BlobRepository.getInstance(logger).uploadReport(objectMapper.writeValueAsString(MapUtils.convert(status)), broker, fiscalCode, uploadKey + ".json");
return BlobRepository.getInstance(logger).uploadReport(objectMapper.writeValueAsString(MapUtils.convert(status)), status.getBrokerID(), status.getFiscalCode(), uploadKey + ".json");
}

private Function<RequestGPD, ResponseGPD> getMethod(QueueMessage msg, GPDClient gpdClient) {
Expand Down
40 changes: 19 additions & 21 deletions src/main/java/it/gov/pagopa/gpd/upload/ValidationFunction.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import com.azure.messaging.eventgrid.EventGridEvent;
import com.azure.messaging.eventgrid.systemevents.StorageBlobCreatedEventData;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
Expand All @@ -23,6 +22,7 @@
import it.gov.pagopa.gpd.upload.service.StatusService;
import it.gov.pagopa.gpd.upload.util.GPDValidator;

import java.time.LocalDateTime;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand All @@ -34,6 +34,7 @@
* Validation step act as a filter and is followed by the queuing step
*/
public class ValidationFunction {
private static final String LOG_PREFIX = "[id=%s][upload=%s][ValidationFunction]:";

@FunctionName("BlobQueueEventFunction")
public void run(
Expand All @@ -44,15 +45,9 @@ public void run(

List<EventGridEvent> eventGridEvents = EventGridEvent.fromString(events);

if (eventGridEvents.isEmpty()) {
logger.log(Level.SEVERE, () -> String.format("[id=%s][ValidationFunction] Empty event list.", context.getInvocationId()));
return; // skip event
}

for (EventGridEvent event : eventGridEvents) {
if (event.getEventType().equals("Microsoft.Storage.BlobCreated")) {
logger.log(Level.INFO, () -> String.format("[id=%s][ValidationFunction] Call event type %s handler.", context.getInvocationId(), event.getEventType()));

logger.log(Level.INFO, () -> String.format(LOG_PREFIX + "Call event type %s handler.", context.getInvocationId(), "-", event.getEventType()));

StorageBlobCreatedEventData blobData = event.getData().toObject(StorageBlobCreatedEventData.class, new DefaultJsonSerializer());
if (blobData.getContentLength() > 1e+8) { // if file greater than 100 MB
Expand All @@ -64,8 +59,7 @@ public void run(
return; // skip event
}

logger.log(Level.INFO, () -> String.format("[id=%s][ValidationFunction] Blob event subject: %s", context.getInvocationId(), event.getSubject()));

logger.log(Level.INFO, () -> String.format(LOG_PREFIX + "Blob event subject: %s", context.getInvocationId(), "-", event.getSubject()));

Pattern pattern = Pattern.compile("/containers/(\\w+)/blobs/(\\w+)/input/([\\w\\-\\h]+\\.[Jj][Ss][Oo][Nn])");
Matcher matcher = pattern.matcher(event.getSubject());
Expand All @@ -76,11 +70,17 @@ public void run(
String fiscalCode = matcher.group(2); // creditor institution directory
String filename = matcher.group(3); // e.g. 77777777777c8a1.json

logger.log(Level.INFO, () -> String.format("[id=%s][ValidationFunction] broker: %s, fiscalCode: %s, filename: %s", context.getInvocationId(), broker, fiscalCode, filename));

BinaryData content = this.downloadBlob(context, broker, fiscalCode, filename);
String key = filename.substring(0, filename.indexOf("."));
this.validateBlob(context, broker, fiscalCode, key, content);

logger.log(Level.INFO, () -> String.format(LOG_PREFIX + "broker: %s, fiscalCode: %s, filename: %s",
context.getInvocationId(), key, broker, fiscalCode, filename));
try {
if(!this.validateBlob(context, broker, fiscalCode, key, content))
throw new AppException("Invalid blob");
} catch (AppException e) {
logger.log(Level.SEVERE, () -> String.format("[id=%s][ValidationFunction] Exception %s", context.getInvocationId(), e.getMessage()));
}

Runtime.getRuntime().gc();
} else {
Expand All @@ -90,7 +90,7 @@ public void run(
}
}

public boolean validateBlob(ExecutionContext ctx, String broker, String fiscalCode, String uploadKey, BinaryData content) {
public boolean validateBlob(ExecutionContext ctx, String broker, String fiscalCode, String uploadKey, BinaryData content) throws AppException {
int size = 0;
ObjectMapper om = new ObjectMapper();
om.registerModule(new JavaTimeModule());
Expand Down Expand Up @@ -121,12 +121,10 @@ else if(iupds != null)

// enqueue chunk and other input to form message
return enqueue(ctx, om, input.getOperation(), pps, iupds, uploadKey, fiscalCode, broker);
} catch (JsonMappingException e) {
// todo: in this case is a BAD_REQUEST -> update status
ctx.getLogger().log(Level.SEVERE, () -> String.format("[id=%s][ValidationFunction] Processing function exception: %s, caused by: %s", ctx.getInvocationId(), e.getMessage(), e.getCause()));
return false;
} catch (AppException | JsonProcessingException e) {
ctx.getLogger().log(Level.SEVERE, () -> String.format("[id=%s][ValidationFunction] Processing function exception: %s, caused by: %s", ctx.getInvocationId(), e.getMessage(), e.getCause()));
} catch (JsonProcessingException e) {
ctx.getLogger().log(Level.SEVERE, () -> String.format(LOG_PREFIX + "Processing function JsonMappingException: %s, caused by: %s",
ctx.getInvocationId(), uploadKey, e.getMessage(), e.getCause()));
StatusService.getInstance(ctx.getLogger()).updateStatusEndTime(fiscalCode, uploadKey, LocalDateTime.now());
return false;
}
}
Expand All @@ -144,7 +142,7 @@ public boolean enqueue(ExecutionContext ctx, ObjectMapper om, CRUDOperation oper
QueueService queueService = QueueService.getInstance(ctx.getLogger());
QueueMessage.QueueMessageBuilder builder = queueService.generateMessageBuilder(operation, uploadKey, fiscalCode, broker);
return switch (operation) {
case CREATE, UPDATE -> queueService.enqueueUpsertMessage(ctx, om, paymentPositions, builder, 0, QueueService.CHUNK_SIZE);
case CREATE, UPDATE -> queueService.enqueueUpsertMessage(ctx, om, paymentPositions, builder, 0);
case DELETE -> queueService.enqueueDeleteMessage(ctx, om, IUPDList, builder, 0);
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import it.gov.pagopa.gpd.upload.model.RequestGPD;
import it.gov.pagopa.gpd.upload.model.ResponseGPD;
import it.gov.pagopa.gpd.upload.model.RetryStep;
import it.gov.pagopa.gpd.upload.util.MapUtils;

import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
Expand Down Expand Up @@ -80,9 +81,7 @@ private Response callGPD(String httpMethod, String url, String body) {
.header(HEADER_SUBSCRIPTION_KEY, GPD_SUBSCRIPTION_KEY)
.header(HEADER_REQUEST_ID, requestId);

Response response = builder.method(httpMethod, Entity.json(body));
logger.log(Level.INFO, () -> String.format("[requestId=%s][%sDebtPositions] Response: %s", requestId, httpMethod, response.getStatus()));
return response;
return builder.method(httpMethod, Entity.json(body));
} catch (Exception e) {
logger.log(Level.WARNING, () -> String.format("[requestId=%s][%sDebtPositions] Exception: %s", requestId, httpMethod, e.getMessage()));
return Response.serverError().build();
Expand All @@ -98,7 +97,6 @@ private ResponseGPD mapResponse(Response response) throws JsonProcessingExceptio
responseGPD = ResponseGPD.builder()
.retryStep(RetryStep.DONE)
.status(status)
.detail(String.valueOf(status))
.build();
}
else if (status >= 400 && status < 500) {
Expand All @@ -112,6 +110,8 @@ else if (status >= 400 && status < 500) {
.retryStep(RetryStep.RETRY)
.detail(HttpStatus.INTERNAL_SERVER_ERROR.name()).build();
}
responseGPD.setDetail(MapUtils.getDetail(HttpStatus.valueOf(status)));

return responseGPD;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public class Upload {
private LocalDateTime end;

public void addResponse(ResponseEntry responseEntry) {
if(responses == null) responses = new ArrayList<>();
// regardless of the condition increments the current counter because a list of IUPDs was processed
current += responseEntry.requestIDs.size();
for (ResponseEntry existingEntry : responses) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,12 @@ public BinaryData download(String broker, String fiscalCode, String filename) {
return blobClient.downloadContent();
}

public void uploadReport(String data, String broker, String fiscalCode, String filename) {
public boolean uploadReport(String data, String broker, String fiscalCode, String filename) {
String blobPath = "/" + fiscalCode + "/" + OUTPUT_DIRECTORY + "/" + REPORT_SUFFIX + filename;
this.upload(data, broker, blobPath);
return this.upload(data, broker, blobPath);
}

public void upload(String data, String container, String blobPath) {
public boolean upload(String data, String container, String blobPath) {
try {
blobServiceClient = new BlobServiceClientBuilder()
.connectionString(connectionString)
Expand All @@ -73,8 +73,10 @@ public void upload(String data, String container, String blobPath) {
logger.log(Level.SEVERE, () -> "container doesn't exist");
BlobClient blobClient = blobContainerClient.getBlobClient(blobPath);
blobClient.upload(BinaryData.fromString(data));
return true;
} catch (BlobStorageException e) {
logger.log(Level.SEVERE, () -> "BlobStorageException " + e.getMessage());
return false;
}
}
}
Expand Down
Loading
Loading