CIRCSTORE-521 add request batch event publishing
roman-barannyk committed Aug 20, 2024
1 parent 81c9932 commit f37a288
Showing 7 changed files with 90 additions and 27 deletions.
22 changes: 22 additions & 0 deletions ramls/request-queue-reordering.json
@@ -0,0 +1,22 @@
{
"$schema": "http://json-schema.org/draft-04/schema#",
"title": "Requests batch update",
"description": "List of ids reordered requests",
"type": "object",
"properties": {
"instanceId": {
"description": "Instance ID of reordered requests",
"type": "string",
"pattern": "^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}$"
},
"requestIds": {
"description": "Array of requests ids",
"type": "array",
"items": {
"type": "string",
"pattern": "^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}$"
}
}
},
"additionalProperties": false
}
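
For reference, a JSON document that validates against this schema would look like the following (both UUIDs are purely illustrative):

{
  "instanceId": "5bf370e0-8cca-4d9c-82e4-5170ab2a0a39",
  "requestIds": [
    "2b94c631-fca9-4892-a730-03ee529ffe27",
    "8d3f3dcb-0f52-4f0e-9a1a-3d5b3c1d9f6e"
  ]
}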
1 change: 1 addition & 0 deletions ramls/request-storage-batch.raml
@@ -11,6 +11,7 @@ documentation:
types:
requests-batch: !include requests-batch.json
errors: !include raml-util/schemas/errors.schema
request-queue-reordering: !include request-queue-reordering.json

traits:
validate: !include raml-util/traits/validation.raml
10 changes: 3 additions & 7 deletions src/main/java/org/folio/rest/impl/RequestsBatchAPI.java
@@ -43,15 +43,11 @@ public void postRequestStorageBatchRequests(
return;
}

BatchResourceService batchUpdateService = new BatchResourceService(
PgUtil.postgresClient(context, okapiHeaders)
);
RequestBatchResourceService requestBatchUpdateService = new RequestBatchResourceService(
context, okapiHeaders);

RequestBatchResourceService requestBatchUpdateService =
new RequestBatchResourceService(tenantId(okapiHeaders), batchUpdateService);

requestBatchUpdateService.executeRequestBatchUpdate(entity.getRequests(),
updateResult -> {
requestBatchUpdateService.executeRequestBatchUpdate(entity.getRequests(), updateResult -> {
// Successfully updated
if (updateResult.succeeded()) {
LOG.debug("Batch update executed successfully");
35 changes: 23 additions & 12 deletions src/main/java/org/folio/service/BatchResourceService.java
@@ -38,43 +38,54 @@ public BatchResourceService(PostgresClient postgresClient) {
* @param batchFactories - Factory to create a batch update chunk.
* @param onFinishHandler - Callback.
*/
public void executeBatchUpdate(
public Future<Void> executeBatchUpdate(
List<Function<SQLConnection, Future<RowSet<Row>>>> batchFactories,
Handler<AsyncResult<Void>> onFinishHandler) {

Promise<Void> promise = Promise.promise();

postgresClient.startTx(connectionResult -> {
if (connectionResult.failed()) {
LOG.warn("Can not start transaction", connectionResult.cause());
onFinishHandler.handle(failedFuture(connectionResult.cause()));
LOG.warn("Cannot start transaction", connectionResult.cause());
onFinishHandler.handle(Future.failedFuture(connectionResult.cause()));
promise.fail(connectionResult.cause());
return;
}

SQLConnection connection = connectionResult.result();
Future<RowSet<Row>> lastUpdate = Future.succeededFuture();

// Using this future for chaining updates
Future<RowSet<Row>> lastUpdate = succeededFuture();
for (Function<SQLConnection, Future<RowSet<Row>>> factory : batchFactories) {
lastUpdate = lastUpdate.compose(prev -> factory.apply(connection));
}

// Handle overall update result and decide on whether to commit or rollback transaction
lastUpdate.onComplete(updateResult -> {
if (updateResult.failed()) {
LOG.warn("Batch update rejected", updateResult.cause());

// Rollback transaction and keep original cause.
postgresClient.rollbackTx(connectionResult,
rollback -> onFinishHandler.handle(failedFuture(updateResult.cause()))
);
postgresClient.rollbackTx(connectionResult, rollback -> {
onFinishHandler.handle(Future.failedFuture(updateResult.cause()));
promise.fail(updateResult.cause());
});
} else {
LOG.debug("Update successful, committing transaction");

postgresClient.endTx(connectionResult, onFinishHandler);
postgresClient.endTx(connectionResult, commitResult -> {
if (commitResult.succeeded()) {
onFinishHandler.handle(Future.succeededFuture());
promise.complete();
} else {
LOG.warn("Failed to commit transaction", commitResult.cause());
onFinishHandler.handle(Future.failedFuture(commitResult.cause()));
promise.fail(commitResult.cause());
}
});
}
});
});
}

return promise.future();
}
/**
* Creates update single entity batch function.
*
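
A minimal caller sketch for the refactored executeBatchUpdate, assuming an existing BatchResourceService and a prepared list of batch factories (the class, method, and variable names below are hypothetical, not part of this commit): the callback is still notified as before, and the newly returned Future<Void> can additionally be composed on.

import java.util.List;
import java.util.function.Function;

import org.folio.rest.persist.SQLConnection;
import org.folio.service.BatchResourceService;

import io.vertx.core.Future;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowSet;

class BatchUpdateCallerSketch {

  // Hypothetical caller: batchService and operations come from the surrounding context.
  Future<Void> runBatch(BatchResourceService batchService,
      List<Function<SQLConnection, Future<RowSet<Row>>>> operations) {

    return batchService.executeBatchUpdate(operations,
        result -> { /* legacy callback, e.g. used to write the HTTP response */ })
      // The returned future lets follow-up work (such as event publishing) be chained.
      .onSuccess(v -> System.out.println("Batch update committed"));
  }
}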
src/main/java/org/folio/service/event/EntityChangedEventPublisherFactory.java
@@ -5,6 +5,7 @@
import static org.folio.support.kafka.topic.CirculationStorageKafkaTopic.CIRCULATION_SETTINGS;
import static org.folio.support.kafka.topic.CirculationStorageKafkaTopic.LOAN;
import static org.folio.support.kafka.topic.CirculationStorageKafkaTopic.REQUEST;
import static org.folio.support.kafka.topic.CirculationStorageKafkaTopic.REQUEST_QUEUE_REORDERING;
import static org.folio.support.kafka.topic.CirculationStorageKafkaTopic.RULES;

import java.util.Map;
@@ -19,6 +20,7 @@
import org.folio.rest.jaxrs.model.CirculationSetting;
import org.folio.rest.jaxrs.model.Loan;
import org.folio.rest.jaxrs.model.Request;
import org.folio.rest.jaxrs.model.RequestQueueReordering;

import io.vertx.core.Context;
import lombok.extern.log4j.Log4j2;
@@ -53,6 +55,15 @@ public static EntityChangedEventPublisher<String, Request> requestEventPublisher
new RequestRepository(vertxContext, okapiHeaders));
}

public static EntityChangedEventPublisher<String, RequestQueueReordering>
requestBatchEventPublisher(Context vertxContext, Map<String, String> okapiHeaders) {

return new EntityChangedEventPublisher<>(okapiHeaders, RequestQueueReordering::getInstanceId,
NULL_ID, new EntityChangedEventFactory<>(), new DomainEventPublisher<>(vertxContext,
REQUEST_QUEUE_REORDERING.fullTopicName(tenantId(okapiHeaders)),
FailureHandler.noOperation()), null);
}

public static EntityChangedEventPublisher<String, CheckIn> checkInEventPublisher(
Context vertxContext, Map<String, String> okapiHeaders) {

RequestBatchResourceService.java
@@ -2,21 +2,28 @@

import static java.util.stream.IntStream.rangeClosed;
import static org.folio.rest.persist.PostgresClient.convertToPsqlStandard;
import static org.folio.rest.tools.utils.TenantTool.tenantId;
import static org.folio.service.event.EntityChangedEventPublisherFactory.requestBatchEventPublisher;
import static org.folio.support.ModuleConstants.REQUEST_TABLE;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.folio.rest.jaxrs.model.Request;
import org.folio.rest.jaxrs.model.RequestQueueReordering;
import org.folio.rest.persist.PgUtil;
import org.folio.rest.persist.SQLConnection;
import org.folio.service.BatchResourceService;
import org.folio.service.event.EntityChangedEventPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.sqlclient.Row;
@@ -29,12 +36,13 @@ public class RequestBatchResourceService {

private final BatchResourceService batchResourceService;
private final String tenantName;
private final EntityChangedEventPublisher<String, RequestQueueReordering> eventPublisher;

public RequestBatchResourceService(String tenantName,
BatchResourceService batchResourceService) {

this.batchResourceService = batchResourceService;
this.tenantName = tenantName;
public RequestBatchResourceService(Context context, Map<String, String> okapiHeaders) {
this.batchResourceService = new BatchResourceService(PgUtil.postgresClient(context,
okapiHeaders));
this.tenantName = tenantId(okapiHeaders);
this.eventPublisher = requestBatchEventPublisher(context, okapiHeaders);
}

/**
@@ -59,8 +67,8 @@ public RequestBatchResourceService(String tenantName,
* @param requests - List of requests to execute in batch.
* @param onFinishHandler - Callback function.
*/
public void executeRequestBatchUpdate(
List<Request> requests, Handler<AsyncResult<Void>> onFinishHandler) {
public void executeRequestBatchUpdate(List<Request> requests,
Handler<AsyncResult<Void>> onFinishHandler) {

LOG.debug("Removing positions for all request to go through positions constraint");
List<Function<SQLConnection, Future<RowSet<Row>>>> allDatabaseOperations =
@@ -80,7 +88,20 @@ public void executeRequestBatchUpdate(
LOG.info("Executing batch update, total records to update [{}] (including remove positions)",
allDatabaseOperations.size());

batchResourceService.executeBatchUpdate(allDatabaseOperations, onFinishHandler);
RequestQueueReordering payload = mapRequestsToPayload(requests);
LOG.info("executeRequestBatchUpdate:: payload: {}", payload);

batchResourceService.executeBatchUpdate(allDatabaseOperations, onFinishHandler)
.compose(v -> eventPublisher.publishCreated(payload.getInstanceId(), payload));
}

private RequestQueueReordering mapRequestsToPayload(List<Request> requests) {

return new RequestQueueReordering()
.withRequestIds(requests.stream()
.map(Request::getId)
.toList())
.withInstanceId(requests.get(0).getInstanceId());
}

private Function<SQLConnection, Future<RowSet<Row>>> removePositionsForRequestsBatch(
src/main/java/org/folio/support/kafka/topic/CirculationStorageKafkaTopic.java
@@ -4,6 +4,7 @@

public enum CirculationStorageKafkaTopic implements KafkaTopic {
REQUEST("request", 10),
REQUEST_QUEUE_REORDERING("request-queue-reordering", 10),
CIRCULATION_SETTINGS("circulation-settings", 10),
LOAN("loan", 10),
CHECK_IN("check-in", 10),

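A hedged consumer-side sketch using a plain Vert.x Kafka client; the broker address, group id, and tenant ("diku") are illustrative, and only the enum constant and its fullTopicName method come from this commit, so the topic name is resolved the same way the publisher above resolves it.

import java.util.Map;

import org.folio.support.kafka.topic.CirculationStorageKafkaTopic;

import io.vertx.core.Vertx;
import io.vertx.kafka.client.consumer.KafkaConsumer;

class ReorderingEventConsumerSketch {

  static void subscribe(Vertx vertx) {
    Map<String, String> config = Map.of(
      "bootstrap.servers", "localhost:9092",          // illustrative
      "group.id", "request-queue-reordering-sketch",  // illustrative
      "key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = KafkaConsumer.create(vertx, config);

    // Resolve the full topic name for tenant "diku" exactly as the publisher does.
    String topic = CirculationStorageKafkaTopic.REQUEST_QUEUE_REORDERING.fullTopicName("diku");

    consumer.subscribe(topic)
      .onSuccess(v -> consumer.handler(record ->
        System.out.println("Request queue reordering event: " + record.value())));
  }
}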