Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/pn 12870 #648

Merged
merged 23 commits into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
dc83268
PN-12870:red phase
DamianoLozzi Oct 8, 2024
f27450a
PN-12870:green phase
DamianoLozzi Oct 8, 2024
8b68c65
PN-12870:blue phase
DamianoLozzi Oct 8, 2024
a165b7b
Merge remote-tracking branch 'origin/develop' into feature/PN-12870
ElenaPagnacco Oct 10, 2024
a27f1d8
PN-12870: fixed scheduled task in TransformationService not repeating…
michelescara Oct 11, 2024
801d0d7
PN-12870: Added new configuration for Spring ThreadPoolTaskExecutor.
michelescara Oct 11, 2024
1b7d5dc
PN-12870: reduced PnSsTaskExecutionPoolCoreSize to 8
mottone-dgs Oct 11, 2024
2ab8e70
PN-12870: tuning on TaskExecution variables
mottone-dgs Oct 11, 2024
fb2e174
PN-12870: Fixed TaskExecutor properties settings.
michelescara Oct 11, 2024
4eee584
Merge remote-tracking branch 'origin/feature/PN-12870' into feature/P…
michelescara Oct 11, 2024
5216942
PN-12870: Fixed missing properties in tests.
michelescara Oct 11, 2024
ffd59a7
PN-12870: removed Load balancing algorithm
mottone-dgs Oct 11, 2024
7014635
PN-12870: Fixed scheduled task subscribing in TransformationService.
michelescara Oct 11, 2024
5433b5a
Merge remote-tracking branch 'origin/feature/PN-12870' into feature/P…
michelescara Oct 11, 2024
21e11d2
PN-12870: Fixed polling of messages in TransformationService schedule…
michelescara Oct 11, 2024
de7bc1d
PN-12870: Changed RasterMaxThreadPoolSize value in microservice-dev-c…
michelescara Oct 14, 2024
9b8ffce
PN-12870: added SchedulingConfiguration to manage scheduled tasks and…
Oct 15, 2024
d3167b2
PN-12870: added spring.task.scheduling.pool.size to allow multiple sc…
Oct 15, 2024
50be0e3
PN-12870: Added logs
michelescara Oct 16, 2024
60bbcf2
PN-12870: fake commit
Oct 16, 2024
92fb0a6
Merge remote-tracking branch 'origin/feature/PN-12870' into feature/P…
Oct 16, 2024
53379a4
Merge branch 'develop' into feature/PN-12870
michelescara Oct 22, 2024
5c74f10
PN-12870: Merged develop into branch.
michelescara Oct 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion scripts/aws/cfn/microservice-dev-cfg.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,12 @@
"PnSsSqsRetryStrategyMaxAttempts": "3",
"PnSsSqsRetryStrategyMinBackoff": "3",
"PnSsEventHandlerMaxMessages": "10",
"PnSsTransformationServiceMaxMessages": "10",
"SpringCodecMaxInMemorySize": "10000000",
"SignAndTimemarkMaxThreadPoolSize": "50",
"RasterMaxThreadPoolSize": "4"
"RasterMaxThreadPoolSize": "6",
"PnSsTaskExecutionPoolMaxSize": "2048",
"PnSsTaskExecutionPoolCoreSize": "8",
"PnSsTaskExecutionPoolQueueCapacity": "4096"
}
}
22 changes: 19 additions & 3 deletions scripts/aws/cfn/microservice.yml
Original file line number Diff line number Diff line change
Expand Up @@ -405,8 +405,18 @@ Parameters:

PnSsTaskExecutionPoolMaxSize:
Type: String
Description: 'SpringBoot configuration Task Execution Max Pool Siza'
Default: '512'
Description: 'SpringBoot configuration Task Execution Max Pool Size'
Default: 0

PnSsTaskExecutionPoolCoreSize:
Type: String
Description: 'SpringBoot configuration Task Execution Pool Core Size'
Default: 0

PnSsTaskExecutionPoolQueueCapacity:
Type: String
Description: 'SpringBoot configuration Task Execution Pool Queue Capacity'
Default: 0

PnSsDocumentDeletionMode:
Type: String
Expand Down Expand Up @@ -507,6 +517,10 @@ Parameters:
Type: Number
Description: 'Max number of attempts for SQS retry strategy'

PnSsTransformationServiceMaxMessages:
Type: Number
Description: 'Max number of messages to be processed by the TransformationService'

# Namirial provider properties
PnSsNamirialServerAddress:
Type: String
Expand Down Expand Up @@ -594,6 +608,9 @@ Resources:
ContainerEnvEntry59: !Sub 'SignAndTimemarkMaxThreadPoolSize=${SignAndTimemarkMaxThreadPoolSize}'
ContainerEnvEntry60: !Sub 'RasterMaxThreadPoolSize=${RasterMaxThreadPoolSize}'
ContainerEnvEntry61: !Sub 'SpringCodecMaxInMemorySize=${SpringCodecMaxInMemorySize}'
ContainerEnvEntry62: !Sub 'PnSsTransformationServiceMaxMessages=${PnSsTransformationServiceMaxMessages}'
ContainerEnvEntry63: !Sub 'PnSsTaskExecutionPoolCoreSize=${PnSsTaskExecutionPoolCoreSize}'
ContainerEnvEntry64: !Sub 'PnSsTaskExecutionPoolQueueCapacity=${PnSsTaskExecutionPoolQueueCapacity}'
MappedPaths: '/safestorage/*,/safe-storage/*'
ECSClusterName: !Ref ECSClusterName
Subnets: !Ref VpcEgressSubnetsIds
Expand All @@ -605,7 +622,6 @@ Resources:
TaskRoleManagedPolicyArn: !Ref PnSsManagedPolicy
LogAlarmStrategyV1: !Ref LogAlarmStrategy
EcsLogGroup: !Ref EcsLogGroup
LoadBalancingAlgorithm: round_robin
AutoscalingThreshold: !Ref AutoscalingThreshold
AutoscalingStrategy: REST-API
MinTasksNumber: !Ref MinTasksNumber
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ public void processRecords() {
.map(Tuple2::getT1)
.toList();
})
//
.flatMap(wrappers ->
Flux.fromIterable(wrappers)
.map(SqsMessageWrapper::getMessage)
Expand All @@ -113,7 +112,7 @@ public void processRecords() {
.then()
.doOnError(e -> log.fatal("DBStream: Errore generico ", e))
.doOnSuccess(unused -> log.logEndingProcess(PROCESS_RECORDS))
.subscribe();
.block();
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public <T> Flux<SqsMessageWrapper<T>> getMessages(String queueName, Class<T> mes
AtomicBoolean listIsEmpty = new AtomicBoolean();
listIsEmpty.set(false);

BooleanSupplier condition = () -> (actualMessages.get() <= maxMessages && !listIsEmpty.get());
BooleanSupplier condition = () -> (actualMessages.get() < maxMessages && !listIsEmpty.get());

return getQueueUrlFromName(queueName).flatMap(queueUrl -> Mono.fromCompletionStage(sqsAsyncClient.receiveMessage(builder -> builder.queueUrl(
queueUrl))))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver;
import org.springframework.validation.beanvalidation.LocalValidatorFactoryBean;
Expand Down Expand Up @@ -231,10 +229,6 @@ public S3Presigner s3Presigner()

return builder.build();
}
@Bean
public TaskExecutor taskExecutor() {
return new SimpleAsyncTaskExecutor(); // Or use another one of your liking
}

private String getTaskId() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.*;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand All @@ -33,7 +32,6 @@
* The class handles a scheduled task to periodically update this list.
*/
@CustomLog
@EnableScheduling
@Configuration
@ConditionalOnExpression("T(org.apache.commons.lang3.StringUtils).isNotEmpty('${pn.ss.ignored.update.metadata.list}')")
public class IgnoredUpdateMetadataConfig {
Expand Down Expand Up @@ -90,7 +88,7 @@ void refreshIgnoredUpdateMetadataListScheduled() {
.onErrorResume(FileNotModifiedException.class, throwable -> Mono.empty())
.doOnError(throwable -> log.logEndingProcess(REFRESH_IGNORED_UPDATE_METADATA_LIST_SCHEDULED, false, throwable.getMessage()))
.doOnSuccess(result -> log.logEndingProcess(REFRESH_IGNORED_UPDATE_METADATA_LIST_SCHEDULED))
.subscribe();
.block();
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package it.pagopa.pnss.configuration;

import it.pagopa.pnss.configurationproperties.TaskExecutorConfigurationProperties;
import lombok.CustomLog;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/**
 * A configuration class for the Spring {@link ThreadPoolTaskExecutor}. It allows to configure
 * thread-pool related settings (core size, max size, queue capacity) via the
 * {@code pn.ss.task.execution.pool.*} properties.
 */
@Configuration
@CustomLog
public class TaskExecutorConfiguration {

    private final TaskExecutorConfigurationProperties taskExecutorConfigurationProperties;

    /**
     * Instantiates a new Task executor configuration.
     *
     * @param taskExecutorConfigurationProperties the task executor configuration properties
     */
    public TaskExecutorConfiguration(TaskExecutorConfigurationProperties taskExecutorConfigurationProperties) {
        this.taskExecutorConfigurationProperties = taskExecutorConfigurationProperties;
    }

    /**
     * The taskExecutor bean. Each pool setting is applied only when the corresponding
     * property is present and non-zero; otherwise the ThreadPoolTaskExecutor default
     * for that setting is kept.
     *
     * @return the configured task executor
     */
    @Bean
    ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        // The record accessors return boxed Integers: guard against null before the
        // numeric comparison, otherwise auto-unboxing throws an NPE when a property
        // is not set at all.
        if (isConfigured(taskExecutorConfigurationProperties.coreSize())) {
            taskExecutor.setCorePoolSize(taskExecutorConfigurationProperties.coreSize());
        }
        if (isConfigured(taskExecutorConfigurationProperties.maxSize())) {
            taskExecutor.setMaxPoolSize(taskExecutorConfigurationProperties.maxSize());
        }
        if (isConfigured(taskExecutorConfigurationProperties.queueCapacity())) {
            taskExecutor.setQueueCapacity(taskExecutorConfigurationProperties.queueCapacity());
        }
        // Initialize eagerly so the executor is usable immediately; Spring will also
        // invoke afterPropertiesSet() on this InitializingBean, which is harmless here.
        taskExecutor.afterPropertiesSet();
        return taskExecutor;
    }

    /** Returns true when the property was supplied and is not the 0 "use default" sentinel. */
    private static boolean isConfigured(Integer value) {
        return value != null && value != 0;
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package it.pagopa.pnss.configuration.scheduling;


import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;

/**
 * Enables Spring's scheduled-task support ({@code @Scheduled} methods) for the application.
 * <p>
 * Activation is controlled by the {@code app.scheduling.enable} property: scheduling is on
 * by default ({@code matchIfMissing = true}) and is only disabled when the property is
 * explicitly set to a value other than {@code "true"} — useful to turn schedulers off in tests.
 */
@ConditionalOnProperty( value = "app.scheduling.enable",
havingValue = "true",
matchIfMissing = true)
@Configuration
@EnableScheduling
public class SchedulingConfiguration {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package it.pagopa.pnss.configurationproperties;

import org.springframework.boot.context.properties.ConfigurationProperties;

/**
 * Immutable binding of the {@code pn.ss.task.execution.pool.*} configuration properties
 * used to size the application's {@code ThreadPoolTaskExecutor}.
 * <p>
 * Fields are boxed {@link Integer}s, so an unset property yields {@code null};
 * a value of {@code 0} is used elsewhere as a "keep the framework default" sentinel.
 *
 * @param maxSize       maximum pool size ({@code pn.ss.task.execution.pool.max-size})
 * @param coreSize      core pool size ({@code pn.ss.task.execution.pool.core-size})
 * @param queueCapacity task queue capacity ({@code pn.ss.task.execution.pool.queue-capacity})
 */
@ConfigurationProperties(prefix = "pn.ss.task.execution.pool")
public record TaskExecutorConfigurationProperties(Integer maxSize, Integer coreSize, Integer queueCapacity) {
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package it.pagopa.pnss.transformation.service;


import io.awspring.cloud.messaging.listener.Acknowledgment;
import io.awspring.cloud.messaging.listener.SqsMessageDeletionPolicy;
import io.awspring.cloud.messaging.listener.annotation.SqsListener;
import it.pagopa.pn.commons.utils.MDCUtils;
import it.pagopa.pn.library.sign.pojo.PnSignDocumentResponse;
import it.pagopa.pn.library.sign.service.impl.PnSignProviderService;
Expand All @@ -13,24 +10,29 @@
import it.pagopa.pnss.common.client.DocumentClientCall;
import it.pagopa.pnss.common.exception.IllegalTransformationException;
import it.pagopa.pnss.common.exception.InvalidStatusTransformationException;
import it.pagopa.pnss.common.model.pojo.SqsMessageWrapper;
import it.pagopa.pnss.common.rest.call.pdfraster.PdfRasterCall;
import it.pagopa.pnss.common.service.SqsService;
import it.pagopa.pnss.common.utils.LogUtils;
import it.pagopa.pnss.configurationproperties.BucketName;
import it.pagopa.pnss.transformation.model.dto.CreatedS3ObjectDto;
import it.pagopa.pnss.transformation.model.dto.CreationDetail;
import it.pagopa.pnss.transformation.service.impl.S3ServiceImpl;
import lombok.CustomLog;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import software.amazon.awssdk.core.BytesWrapper;
import software.amazon.awssdk.services.s3.model.DeleteObjectResponse;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.services.sqs.model.DeleteMessageResponse;

import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;

Expand Down Expand Up @@ -60,6 +62,8 @@ public class TransformationService {
private String defaultInternalClientIdValue;
@Value("${s3.queue.sign-queue-name}")
private String signQueueName;
@Value("${pn.ss.transformation-service.max.messages}")
private int maxMessages;
// Numero massimo di retry. Due step: 1) firma del documento e inserimento nel bucket 2) delete del file dal bucket di staging, piu' un retry aggiuntivo di sicurezza
private static final int MAX_RETRIES = 3;

Expand All @@ -81,41 +85,59 @@ public TransformationService(S3ServiceImpl s3Service,
this.rasterSemaphore = new Semaphore(rasterMaxThreadPoolSize);
}

@SqsListener(value = "${s3.queue.sign-queue-name}", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
void newStagingBucketObjectCreatedListener(CreatedS3ObjectDto newStagingBucketObject, Acknowledgment acknowledgment) {
@Scheduled(cron = "${PnEcCronScaricamentoEsitiPec ?:*/10 * * * * *}")
void newStagingBucketObjectCreatedListener() {
MDC.clear();
var fileKey = isKeyPresent(newStagingBucketObject) ? newStagingBucketObject.getCreationDetailObject().getObject().getKey() : "null";
MDC.put(MDC_CORR_ID_KEY, fileKey);
log.logStartingProcess(NEW_STAGING_BUCKET_OBJECT_CREATED_LISTENER);
MDCUtils.addMDCToContextAndExecute(newStagingBucketObjectCreatedEvent(newStagingBucketObject, acknowledgment)
.doOnSuccess(result -> log.logEndingProcess(NEW_STAGING_BUCKET_OBJECT_CREATED_LISTENER))
.doOnError(throwable -> log.logEndingProcess(NEW_STAGING_BUCKET_OBJECT_CREATED_LISTENER, false, throwable.getMessage())))
.subscribe();
AtomicBoolean hasMessages = new AtomicBoolean();
hasMessages.set(true);
Mono.defer(() -> sqsService.getMessages(signQueueName, CreatedS3ObjectDto.class, maxMessages)
.doOnNext(messageWrapper -> logIncomingMessage(signQueueName, messageWrapper.getMessageContent()))
.flatMap(sqsMessageWrapper -> {
String fileKey = isKeyPresent(sqsMessageWrapper.getMessageContent()) ? sqsMessageWrapper.getMessageContent().getCreationDetailObject().getObject().getKey() : "null";
MDC.put(MDC_CORR_ID_KEY, fileKey);
return MDCUtils.addMDCToContextAndExecute(newStagingBucketObjectCreatedEvent(sqsMessageWrapper));
})
.collectList())
.doOnNext(list -> hasMessages.set(!list.isEmpty()))
.repeat(hasMessages::get)
.doOnError(e -> log.logEndingProcess(NEW_STAGING_BUCKET_OBJECT_CREATED_LISTENER, false, e.getMessage()))
.doOnComplete(() -> log.logEndingProcess(NEW_STAGING_BUCKET_OBJECT_CREATED_LISTENER))
.blockLast();
}

public Mono<Void> newStagingBucketObjectCreatedEvent(CreatedS3ObjectDto newStagingBucketObject, Acknowledgment acknowledgment) {
public Mono<DeleteMessageResponse> newStagingBucketObjectCreatedEvent(SqsMessageWrapper<CreatedS3ObjectDto> newStagingBucketObjectWrapper) {

log.debug(LogUtils.INVOKING_METHOD, NEW_STAGING_BUCKET_OBJECT_CREATED, newStagingBucketObject);
log.debug(LogUtils.INVOKING_METHOD, NEW_STAGING_BUCKET_OBJECT_CREATED, newStagingBucketObjectWrapper.getMessageContent());

AtomicReference<String> fileKeyReference = new AtomicReference<>("");
return Mono.fromCallable(() -> {
logIncomingMessage(signQueueName, newStagingBucketObject);
return newStagingBucketObject;
return newStagingBucketObjectWrapper;
})
.filter(this::isKeyPresent)
.doOnDiscard(CreatedS3ObjectDto.class, createdS3ObjectDto -> log.debug("The new staging bucket object with id '{}' was discarded", newStagingBucketObject.getId()))
.flatMap(createdS3ObjectDto -> {
var detailObject = createdS3ObjectDto.getCreationDetailObject();
var fileKey = detailObject.getObject().getKey();
.filter(wrapper-> isKeyPresent(wrapper.getMessageContent()))
.doOnDiscard(SqsMessageWrapper.class, wrapper -> {
CreatedS3ObjectDto newStagingBucketObject = (CreatedS3ObjectDto) wrapper.getMessageContent();
log.debug("The new staging bucket object with id '{}' was discarded", newStagingBucketObject.getId());
})
.flatMap(wrapper -> {
CreationDetail detailObject = wrapper.getMessageContent().getCreationDetailObject();
String fileKey = detailObject.getObject().getKey();
fileKeyReference.set(fileKey);
return objectTransformation(fileKey, detailObject.getBucketOriginDetail().getName(), newStagingBucketObject.getRetry(), true);
return objectTransformation(fileKey, detailObject.getBucketOriginDetail().getName(), wrapper.getMessageContent().getRetry(), true);
})
.then()
.doOnSuccess( s3ObjectDto -> acknowledgment.acknowledge())
.doOnError(throwable -> log.debug(EXCEPTION_IN_PROCESS + ": {}", NEW_STAGING_BUCKET_OBJECT_CREATED, throwable.getMessage(), throwable))
.flatMap(transformationResponse ->
sqsService.deleteMessageFromQueue(newStagingBucketObjectWrapper.getMessage(), signQueueName)
.doOnSuccess(response -> log.debug("Message with key '{}' was deleted", fileKeyReference.get()))
.doOnError(throwable -> log.error("An error occurred during deletion of message with key '{}' -> {}", fileKeyReference.get(), throwable.getMessage()))
)
.doOnError(throwable -> !(throwable instanceof InvalidStatusTransformationException || throwable instanceof IllegalTransformationException), throwable -> log.error("An error occurred during transformations for document with key '{}' -> {}", fileKeyReference.get(), throwable.getMessage()))
.onErrorResume(throwable -> newStagingBucketObject.getRetry() <= MAX_RETRIES, throwable -> {
newStagingBucketObject.setRetry(newStagingBucketObject.getRetry() + 1);
return sqsService.send(signQueueName, newStagingBucketObject).then(Mono.fromRunnable(acknowledgment::acknowledge));
.onErrorResume(throwable -> newStagingBucketObjectWrapper.getMessageContent().getRetry() <= MAX_RETRIES, throwable -> {
newStagingBucketObjectWrapper.getMessageContent().setRetry(newStagingBucketObjectWrapper.getMessageContent().getRetry() + 1);
return sqsService.send(signQueueName, newStagingBucketObjectWrapper.getMessageContent())
.then(sqsService.deleteMessageFromQueue(newStagingBucketObjectWrapper.getMessage(), signQueueName))
.doOnSuccess(response -> log.debug("Retry - Message with key '{}' was deleted", fileKeyReference.get()))
.doOnError(throwable1 -> log.error("Retry - An error occurred during deletion of message with key '{}' -> {}", fileKeyReference.get(), throwable1.getMessage()));
});
}

Expand All @@ -133,7 +155,9 @@ public Mono<DeleteObjectResponse> objectTransformation(String key, String stagin
.switchIfEmpty(Mono.error(new IllegalTransformationException(key)))
.filterWhen(document -> isSignatureNeeded(key, retry))
.flatMap(document -> chooseTransformationType(document, key, stagingBucketName, marcatura))
.then(removeObjectFromStagingBucket(key, stagingBucketName));
.then(removeObjectFromStagingBucket(key, stagingBucketName))
.doOnSuccess(response -> log.debug(SUCCESSFUL_OPERATION_LABEL, OBJECT_TRANSFORMATION, response))
.doOnError(throwable -> log.debug(EXCEPTION_IN_PROCESS + ": {}", OBJECT_TRANSFORMATION, throwable.getMessage(), throwable));
}

private Mono<PutObjectResponse> chooseTransformationType(Document document, String key, String stagingBucketName, boolean marcatura) {
Expand All @@ -160,6 +184,7 @@ private Mono<PutObjectResponse> rasterTransformation(Document document, String k
.map(BytesWrapper::asByteArray)
.flatMap(fileBytes -> pdfRasterCall.convertPdf(fileBytes, key))
.flatMap(convertedDocument -> s3Service.putObject(key, convertedDocument, document.getContentType(), bucketName.ssHotName()))
.doOnError(throwable -> log.debug(EXCEPTION_IN_PROCESS + ": {}", RASTER_TRANSFORMATION, throwable.getMessage(), throwable))
.doFinally(signalType -> rasterSemaphore.release());
}

Expand Down
Loading