Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: refactored code for batched riconciliation #1

Merged
merged 1 commit into from
Mar 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions helm/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ microservice-chart:
servicePort: 8080
serviceAccount:
create: false
annotations: {}
annotations: { }
name: ""
podAnnotations: {}
podAnnotations: { }
podSecurityContext:
seccompProfile:
type: RuntimeDefault
Expand All @@ -47,8 +47,8 @@ microservice-chart:
memory: "512Mi"
cpu: "0.25"
limits:
memory: "512Mi"
cpu: "0.25"
memory: "768Mi"
cpu: "0.50"
autoscaling:
enable: true
minReplica: 1
Expand All @@ -66,7 +66,7 @@ microservice-chart:
APPLICATIONINSIGHTS_CONNECTION_STRING: 'ai-connection-string'
APPLICATIONINSIGHTS_ROLE_NAME: "pagopa-nodo-verifyko-aux"
ENV: 'aks-dev'
APP_LOGGING_LEVEL: 'DEBUG'
APP_LOGGING_LEVEL: 'INFO'
DEFAULT_LOGGING_LEVEL: 'INFO'
CORS_CONFIGURATION: '{"origins": ["*"], "methods": ["*"]}'
VERIFYKO_COLDSTORAGE_TABLE: 'events'
Expand Down Expand Up @@ -94,8 +94,8 @@ microservice-chart:
tenantId: "7788edaf-0346-4068-9d79-c868aed15b3d"
tmpVolumeMount:
create: true
nodeSelector: {}
tolerations: []
nodeSelector: { }
tolerations: [ ]
affinity:
nodeAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import com.azure.spring.data.cosmos.config.CosmosConfig;
import com.azure.spring.data.cosmos.core.ResponseDiagnostics;
import com.azure.spring.data.cosmos.core.ResponseDiagnosticsProcessor;
import com.azure.spring.data.cosmos.core.mapping.EnableCosmosAuditing;
import com.azure.spring.data.cosmos.repository.config.EnableCosmosRepositories;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
Expand All @@ -21,7 +20,6 @@
@Configuration
@EnableCosmosRepositories("it.gov.pagopa.nodoverifykoaux.repository")
@EnableConfigurationProperties
@EnableCosmosAuditing
@ConditionalOnExpression("'${info.properties.environment}'!='test'")
@Slf4j
public class CosmosDBConfig extends AbstractCosmosConfiguration {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,13 +149,4 @@ public void trowingApiInvocation(JoinPoint joinPoint, ResponseEntity<ProblemJson
log.info("Failed API operation {} - error: {}", MDC.get(METHOD), result);
MDC.clear();
}

@Around(value = "repository() || service()")
public Object logTrace(ProceedingJoinPoint joinPoint) throws Throwable {
Map<String, String> params = getParams(joinPoint);
log.debug("Call method {} - args: {}", joinPoint.getSignature().toShortString(), params);
Object result = joinPoint.proceed();
log.debug("Return method {} - result: {}", joinPoint.getSignature().toShortString(), result);
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,11 @@ public ActionController(ReconciliationService reconciliationService) {
@ApiResponse(responseCode = "400", description = "If passed date is invalid.", content = @Content(mediaType = MediaType.APPLICATION_JSON_VALUE, schema = @Schema(implementation = ProblemJson.class))),
@ApiResponse(responseCode = "500", description = "If an error occurred during execution.", content = @Content(mediaType = MediaType.APPLICATION_JSON_VALUE, schema = @Schema(implementation = ProblemJson.class)))})
@OpenAPITableMetadata(external = false, idempotency = false, readWriteIntense = OpenAPITableMetadata.ReadWrite.BOTH)
public ResponseEntity<ReconciliationStatus> reconcileEventsByDate(@Parameter(description = "The date, in yyyy-MM-dd format, on which the reconciliation will be executed.", example = "2024-01-01", required = true) @RequestParam String date) {
return ResponseEntity.ok(reconciliationService.reconcileEventsByDate(date));
public ResponseEntity<ReconciliationStatus> reconcileEventsByDate(
@Parameter(description = "The date, in yyyy-MM-dd format, on which the reconciliation will be executed.", example = "2024-01-01", required = true)
@RequestParam String date,
@Parameter(description = "The size of the batch on which the reconciliation will be executed for each steps. This avoids the large queries to storages. Defined in minutes.", example = "30")
@RequestParam(value = "batch-size-in-minutes", required = false, defaultValue = "1440") Long batchSizeInMinutes) {
return ResponseEntity.ok(reconciliationService.reconcileEventsByDate(date, batchSizeInMinutes));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ public DataStorageRepository(CosmosTemplate cosmosTemplate) {
this.cosmosTemplate = cosmosTemplate;
}

public Set<String> getIDsByDate(String date) {
String q = "SELECT VALUE e.id FROM e WHERE e.PartitionKey LIKE '" + date + "%'";
SqlQuerySpec query = new SqlQuerySpec(q);
public Set<String> getIDsByDate(String date, Long lowerBoundTimestamp, Long upperBoundTimestamp) {
SqlQuerySpec query = new SqlQuerySpec("SELECT VALUE e.id FROM e" +
" WHERE e.PartitionKey LIKE '" + date + "%'" +
" AND e.faultBean.timestamp >= " + lowerBoundTimestamp +
" AND e.faultBean.timestamp <= " + upperBoundTimestamp);
Spliterator<String> iterator = cosmosTemplate.runQuery(query, HotStorageVerifyKO.class, String.class).spliterator();
return StreamSupport.stream(iterator, false).collect(Collectors.toSet());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,13 @@ public TableStorageRepository(@Value("${verifyko.cold-storage.connection-string}
.getTableReference(tableName);
}

public Set<String> getIDsByDate(String date) {
public Set<String> getIDsByDate(String partitionKey, Long dateLowerBound, Long dateUpperBound) {
Set<String> ids = new HashSet<>();
String queryWhereClause = TableQuery.generateFilterCondition("PartitionKey", TableQuery.QueryComparisons.EQUAL, date);
String queryWhereClausePartitionKey = TableQuery.generateFilterCondition("PartitionKey", TableQuery.QueryComparisons.EQUAL, partitionKey);
String queryWhereClauseDateTimeLowerBound = TableQuery.generateFilterCondition("timestamp", TableQuery.QueryComparisons.GREATER_THAN_OR_EQUAL, dateLowerBound);
String queryWhereClauseDateTimeUpperBound = TableQuery.generateFilterCondition("timestamp", TableQuery.QueryComparisons.LESS_THAN_OR_EQUAL, dateUpperBound);
String queryWhereClauseDateTime = TableQuery.combineFilters(queryWhereClauseDateTimeLowerBound, TableQuery.Operators.AND, queryWhereClauseDateTimeUpperBound);
String queryWhereClause = TableQuery.combineFilters(queryWhereClausePartitionKey, TableQuery.Operators.AND, queryWhereClauseDateTime);
TableQuery<ColdStorageVerifyKO> query = TableQuery.from(ColdStorageVerifyKO.class).where(queryWhereClause).select(new String[]{"RowKey"});
Iterable<ColdStorageVerifyKO> result = table.execute(query);
result.forEach(entity -> ids.add(entity.getRowKey()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ private static ReconciliationStatus generateReconciliationStatus(List<Reconciled
}

@Transactional
public ReconciliationStatus reconcileEventsByDate(String date) {
public ReconciliationStatus reconcileEventsByDate(String date, Long minutesForEachBatch) {

Date startTime = Calendar.getInstance().getTime();

Expand All @@ -87,18 +87,35 @@ public ReconciliationStatus reconcileEventsByDate(String date) {
}
String stringedDate = date.replace("-0", "-");

// Retrieving IDs by date either from cold storage and from hot storage
Set<ConvertedKey> coldStorageIDsForDate = coldStorageRepo.getIDsByDate(stringedDate).stream()
.map(ConvertedKey::new)
.collect(Collectors.toSet());
Set<String> hotStorageIDsForDate = hotStorageRepo.getIDsByDate(CommonUtility.generatePartitionKeyForHotStorage(stringedDate));
log.info(String.format("Found [%d] elements in the cold storage and [%d] in the hot storage for the date [%s] (searched as [%s])", coldStorageIDsForDate.size(), hotStorageIDsForDate.size(), date, stringedDate));
// Initialize status data
List<ReconciledEventStatus> coldToHotReconciledEvents = new LinkedList<>();
List<ReconciledEventStatus> hotToColdReconciledEvents = new LinkedList<>();
Date dateLowerBound = dateValidator.getDate(date);

long batchCounter = 1;
while (!isComputationEnded(date, dateLowerBound)) {

Date dateUpperBound = dateValidator.getDate(date, minutesForEachBatch * batchCounter);
Long dateLowerBoundTimestamp = dateLowerBound.getTime() / 1000;
Long dateUpperBoundTimestamp = dateUpperBound.getTime() / 1000;

// Reconcile events from cold storage to hot storage and retrieve the list of status info for each persisted event
List<ReconciledEventStatus> coldToHotReconciledEvents = reconcileEventsFromColdToHotStorage(coldStorageIDsForDate, hotStorageIDsForDate, stringedDate);
// Retrieving IDs by date either from cold storage and from hot storage
Set<ConvertedKey> coldStorageIDsForDate = coldStorageRepo.getIDsByDate(stringedDate, dateLowerBoundTimestamp, dateUpperBoundTimestamp).stream()
.map(ConvertedKey::new)
.collect(Collectors.toSet());
Set<String> hotStorageIDsForDate = hotStorageRepo.getIDsByDate(CommonUtility.generatePartitionKeyForHotStorage(stringedDate), dateLowerBoundTimestamp, dateUpperBoundTimestamp);
log.info(String.format("Analyzing time section [%d-%d]. Found [%d] elements in the cold storage and [%d] in the hot storage for the date [%s] (searched as [%s])", dateLowerBoundTimestamp, dateUpperBoundTimestamp, coldStorageIDsForDate.size(), hotStorageIDsForDate.size(), date, stringedDate));

// Reconcile events from hot storage to cold storage and retrieve the list of status info for each persisted event
List<ReconciledEventStatus> hotToColdReconciledEvents = reconcileEventsFromHotToColdStorage(coldStorageIDsForDate, hotStorageIDsForDate, stringedDate);
// Reconcile events from cold storage to hot storage and retrieve the list of status info for each persisted event
coldToHotReconciledEvents.addAll(reconcileEventsFromColdToHotStorage(coldStorageIDsForDate, hotStorageIDsForDate, stringedDate));

// Reconcile events from hot storage to cold storage and retrieve the list of status info for each persisted event
hotToColdReconciledEvents.addAll(reconcileEventsFromHotToColdStorage(coldStorageIDsForDate, hotStorageIDsForDate, stringedDate));

// Update batch counter and date bounds
dateLowerBound = dateUpperBound;
batchCounter++;
}

// Last, return the general status for reconciliation operation
return generateReconciliationStatus(coldToHotReconciledEvents, hotToColdReconciledEvents, startTime, date, stringedDate);
Expand Down Expand Up @@ -231,4 +248,8 @@ private List<ReconciledEventStatus> reconcileEventsFromColdToHotStorage(Set<Conv
return coldToHotReconciledEvents;
}

private boolean isComputationEnded(String analyzedDate, Date lowerBoundDate) {
return !analyzedDate.equals(this.dateValidator.getDateAsString(lowerBoundDate).substring(0, 10));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,15 @@
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Date;

@Slf4j
public class DateValidator {

private final SimpleDateFormat dateFormatter;

private final DateTimeFormatter dateTimeFormatter;

public DateValidator(String format) {
this.dateFormatter = new SimpleDateFormat(format);
this.dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSSZ");
}

public boolean isValid(String date) {
Expand All @@ -35,7 +31,23 @@ public Date getDate(String dateAsString) {
return date;
}

public Date getDate(String dateAsString, Long minutesFromStartDay) {
Date date = null;
dateFormatter.setLenient(false);
try {
date = dateFormatter.parse(dateAsString);
date.setTime(date.getTime() + (minutesFromStartDay * 60 * 1000));
} catch (ParseException e) {
log.warn(String.format("Error while trying to parse string as date. Invalid string format: [%s] must follows 'yyyy-MM-dd' format", dateAsString));
}
return date;
}

public Long getDateAsTimestamp(String dateAsString) {
return ZonedDateTime.parse(dateAsString).toEpochSecond();
}

public String getDateAsString(Date date) {
return dateFormatter.format(date);
}
}
Loading