Skip to content

Commit

Permalink
[CIRCSTORE-521] Publish request batch event when requests are reorder…
Browse files Browse the repository at this point in the history
…ed (#482)

* CIRCSTORE-521 add request batch event publishing

* CIRCSTORE-521 update logging

* CIRCSTORE-521 fix broken test

* CIRCSTORE-521 revert comments

* CIRCSTORE-521 refactoring

* CIRCSTORE-521 create test

* CIRCSTORE-521 update test

* CIRCSTORE-521 rename method

* CIRCSTORE-521 use uuid reference for schema

* CIRCSTORE-521 add events cleaning

* CIRCSTORE-521 Change sorting in tests

---------

Co-authored-by: alexanderkurash <alexander.kurash@gmail.com>
  • Loading branch information
roman-barannyk and alexanderkurash authored Aug 29, 2024
1 parent a7c1910 commit b35fa1d
Show file tree
Hide file tree
Showing 11 changed files with 212 additions and 60 deletions.
22 changes: 22 additions & 0 deletions ramls/request-queue-reordering.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"$schema": "http://json-schema.org/draft-04/schema#",
"title": "Requests batch update",
"description": "List of ids reordered requests",
"type": "object",
"properties": {
"instanceId": {
"description": "Instance ID of reordered requests",
"type": "string",
"$ref": "raml-util/schemas/uuid.schema"
},
"requestIds": {
"description": "Array of requests ids",
"type": "array",
"items": {
"type": "string",
"$ref": "raml-util/schemas/uuid.schema"
}
}
},
"additionalProperties": false
}
1 change: 1 addition & 0 deletions ramls/request-storage-batch.raml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ documentation:
types:
requests-batch: !include requests-batch.json
errors: !include raml-util/schemas/errors.schema
request-queue-reordering: !include request-queue-reordering.json

traits:
validate: !include raml-util/traits/validation.raml
Expand Down
25 changes: 8 additions & 17 deletions src/main/java/org/folio/rest/impl/RequestsBatchAPI.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,14 @@
import static org.folio.rest.jaxrs.resource.RequestStorageBatch.PostRequestStorageBatchRequestsResponse.respond201;
import static org.folio.rest.jaxrs.resource.RequestStorageBatch.PostRequestStorageBatchRequestsResponse.respond422WithApplicationJson;
import static org.folio.rest.jaxrs.resource.RequestStorageBatch.PostRequestStorageBatchRequestsResponse.respond500WithTextPlain;
import static org.folio.rest.tools.utils.TenantTool.tenantId;

import java.util.Map;

import javax.ws.rs.core.Response;
import org.folio.rest.annotations.Validate;
import org.folio.rest.jaxrs.model.RequestsBatch;
import org.folio.rest.jaxrs.resource.RequestStorageBatch;
import org.folio.rest.persist.PgUtil;
import org.folio.rest.tools.utils.MetadataUtil;
import org.folio.service.BatchResourceService;
import org.folio.service.request.RequestBatchResourceService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -26,7 +23,7 @@
import io.vertx.core.Handler;

public class RequestsBatchAPI implements RequestStorageBatch {
private static final Logger LOG = LoggerFactory.getLogger(RequestsBatchAPI.class);
private static final Logger log = LoggerFactory.getLogger(RequestsBatchAPI.class);

@Validate
@Override
Expand All @@ -38,36 +35,30 @@ public void postRequestStorageBatchRequests(
MetadataUtil.populateMetadata(entity.getRequests(), okapiHeaders);
} catch (Throwable e) {
String msg = "Cannot populate metadata of request list elements: " + e.getMessage();
LOG.error(msg, e);
log.error(msg, e);
asyncResultHandler.handle(succeededFuture(respond500WithTextPlain(msg)));
return;
}

BatchResourceService batchUpdateService = new BatchResourceService(
PgUtil.postgresClient(context, okapiHeaders)
);

RequestBatchResourceService requestBatchUpdateService =
new RequestBatchResourceService(tenantId(okapiHeaders), batchUpdateService);

requestBatchUpdateService.executeRequestBatchUpdate(entity.getRequests(),
updateResult -> {
log.info("postRequestStorageBatchRequests:: requests: {}", entity.getRequests());
new RequestBatchResourceService(context, okapiHeaders)
.executeRequestBatchUpdate(entity.getRequests(), updateResult -> {
// Successfully updated
if (updateResult.succeeded()) {
LOG.debug("Batch update executed successfully");
log.debug("Batch update executed successfully");
asyncResultHandler.handle(succeededFuture(respond201()));
return;
}

// Update failed due to can not have more then one request in the same position
if (hasSamePositionConstraintViolated(updateResult.cause())) {
LOG.warn("Same position constraint violated", updateResult.cause());
log.warn("Same position constraint violated", updateResult.cause());
asyncResultHandler.handle(succeededFuture(
respond422WithApplicationJson(samePositionInQueueError(null, null))
));
} else {
// Other failure occurred
LOG.warn("Unhandled error occurred during update", updateResult.cause());
log.warn("Unhandled error occurred during update", updateResult.cause());
asyncResultHandler.handle(succeededFuture(
respond500WithTextPlain(updateResult.cause().getMessage())
));
Expand Down
40 changes: 27 additions & 13 deletions src/main/java/org/folio/service/BatchResourceService.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import io.vertx.sqlclient.RowSet;

public class BatchResourceService {
private static final Logger LOG = LoggerFactory.getLogger(BatchResourceService.class);
private static final Logger log = LoggerFactory.getLogger(BatchResourceService.class);
private static final String WHERE_CLAUSE = "WHERE id = '%s'";

private final PostgresClient postgresClient;
Expand All @@ -38,14 +38,17 @@ public BatchResourceService(PostgresClient postgresClient) {
* @param batchFactories - Factory to create a batch update chunk.
* @param onFinishHandler - Callback.
*/
public void executeBatchUpdate(
public Future<Void> executeBatchUpdate(
List<Function<SQLConnection, Future<RowSet<Row>>>> batchFactories,
Handler<AsyncResult<Void>> onFinishHandler) {

Promise<Void> promise = Promise.promise();

postgresClient.startTx(connectionResult -> {
if (connectionResult.failed()) {
LOG.warn("Can not start transaction", connectionResult.cause());
log.warn("Cannot start transaction", connectionResult.cause());
onFinishHandler.handle(failedFuture(connectionResult.cause()));
promise.fail(connectionResult.cause());
return;
}

Expand All @@ -60,21 +63,32 @@ public void executeBatchUpdate(
// Handle overall update result and decide on whether to commit or rollback transaction
lastUpdate.onComplete(updateResult -> {
if (updateResult.failed()) {
LOG.warn("Batch update rejected", updateResult.cause());
log.warn("Batch update rejected", updateResult.cause());

// Rollback transaction and keep original cause.
postgresClient.rollbackTx(connectionResult,
rollback -> onFinishHandler.handle(failedFuture(updateResult.cause()))
);
postgresClient.rollbackTx(connectionResult, rollback -> {
onFinishHandler.handle(failedFuture(updateResult.cause()));
promise.fail(updateResult.cause());
});
} else {
LOG.debug("Update successful, committing transaction");

postgresClient.endTx(connectionResult, onFinishHandler);
log.debug("Update successful, committing transaction");

postgresClient.endTx(connectionResult, commitResult -> {
if (commitResult.succeeded()) {
onFinishHandler.handle(succeededFuture());
promise.complete();
} else {
log.warn("Failed to commit transaction", commitResult.cause());
onFinishHandler.handle(failedFuture(commitResult.cause()));
promise.fail(commitResult.cause());
}
});
}
});
});
}

return promise.future();
}
/**
* Creates update single entity batch function.
*
Expand All @@ -92,7 +106,7 @@ public <T> Function<SQLConnection, Future<RowSet<Row>>> updateSingleEntityBatchF
final Promise<RowSet<Row>> promise = promise();
final Future<SQLConnection> connectionResult = succeededFuture(connection);

LOG.debug("Updating entity {} with id {}", entity, id);
log.debug("Updating entity {} with id {}", entity, id);

postgresClient.update(connectionResult, tableName, entity, "jsonb",
String.format(WHERE_CLAUSE, id), false, promise);
Expand All @@ -113,7 +127,7 @@ public Function<SQLConnection, Future<RowSet<Row>>> queryWithParamsBatchFactory(
String query, Collection<?> params) {

return connection -> {
LOG.debug("Executing SQL [{}], got [{}] parameters", query, params.size());
log.debug("Executing SQL [{}], got [{}] parameters", query, params.size());

final Promise<RowSet<Row>> promise = promise();
final Future<SQLConnection> connectionResult = succeededFuture(connection);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import static org.folio.support.kafka.topic.CirculationStorageKafkaTopic.CIRCULATION_SETTINGS;
import static org.folio.support.kafka.topic.CirculationStorageKafkaTopic.LOAN;
import static org.folio.support.kafka.topic.CirculationStorageKafkaTopic.REQUEST;
import static org.folio.support.kafka.topic.CirculationStorageKafkaTopic.REQUEST_QUEUE_REORDERING;
import static org.folio.support.kafka.topic.CirculationStorageKafkaTopic.RULES;

import java.util.Map;
Expand All @@ -19,6 +20,7 @@
import org.folio.rest.jaxrs.model.CirculationSetting;
import org.folio.rest.jaxrs.model.Loan;
import org.folio.rest.jaxrs.model.Request;
import org.folio.rest.jaxrs.model.RequestQueueReordering;

import io.vertx.core.Context;
import lombok.extern.log4j.Log4j2;
Expand Down Expand Up @@ -53,6 +55,15 @@ public static EntityChangedEventPublisher<String, Request> requestEventPublisher
new RequestRepository(vertxContext, okapiHeaders));
}

public static EntityChangedEventPublisher<String, RequestQueueReordering>
requestBatchEventPublisher(Context vertxContext, Map<String, String> okapiHeaders) {

return new EntityChangedEventPublisher<>(okapiHeaders, RequestQueueReordering::getInstanceId,
NULL_ID, new EntityChangedEventFactory<>(), new DomainEventPublisher<>(vertxContext,
REQUEST_QUEUE_REORDERING.fullTopicName(tenantId(okapiHeaders)),
FailureHandler.noOperation()), null);
}

public static EntityChangedEventPublisher<String, CheckIn> checkInEventPublisher(
Context vertxContext, Map<String, String> okapiHeaders) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,28 @@

import static java.util.stream.IntStream.rangeClosed;
import static org.folio.rest.persist.PostgresClient.convertToPsqlStandard;
import static org.folio.rest.tools.utils.TenantTool.tenantId;
import static org.folio.service.event.EntityChangedEventPublisherFactory.requestBatchEventPublisher;
import static org.folio.support.ModuleConstants.REQUEST_TABLE;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.folio.rest.jaxrs.model.Request;
import org.folio.rest.jaxrs.model.RequestQueueReordering;
import org.folio.rest.persist.PgUtil;
import org.folio.rest.persist.SQLConnection;
import org.folio.service.BatchResourceService;
import org.folio.service.event.EntityChangedEventPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.sqlclient.Row;
Expand All @@ -29,12 +36,13 @@ public class RequestBatchResourceService {

private final BatchResourceService batchResourceService;
private final String tenantName;
private final EntityChangedEventPublisher<String, RequestQueueReordering> eventPublisher;

public RequestBatchResourceService(String tenantName,
BatchResourceService batchResourceService) {

this.batchResourceService = batchResourceService;
this.tenantName = tenantName;
public RequestBatchResourceService(Context context, Map<String, String> okapiHeaders) {
this.batchResourceService = new BatchResourceService(PgUtil.postgresClient(context,
okapiHeaders));
this.tenantName = tenantId(okapiHeaders);
this.eventPublisher = requestBatchEventPublisher(context, okapiHeaders);
}

/**
Expand All @@ -59,8 +67,8 @@ public RequestBatchResourceService(String tenantName,
* @param requests - List of requests to execute in batch.
* @param onFinishHandler - Callback function.
*/
public void executeRequestBatchUpdate(
List<Request> requests, Handler<AsyncResult<Void>> onFinishHandler) {
public void executeRequestBatchUpdate(List<Request> requests,
Handler<AsyncResult<Void>> onFinishHandler) {

LOG.debug("Removing positions for all request to go through positions constraint");
List<Function<SQLConnection, Future<RowSet<Row>>>> allDatabaseOperations =
Expand All @@ -80,7 +88,21 @@ public void executeRequestBatchUpdate(
LOG.info("Executing batch update, total records to update [{}] (including remove positions)",
allDatabaseOperations.size());

batchResourceService.executeBatchUpdate(allDatabaseOperations, onFinishHandler);
RequestQueueReordering payload = mapRequestsToPayload(requests);
LOG.info("executeRequestBatchUpdate:: instanceId: {}, requests: {}",
payload.getInstanceId(), payload.getRequestIds());

batchResourceService.executeBatchUpdate(allDatabaseOperations, onFinishHandler)
.compose(v -> eventPublisher.publishCreated(payload.getInstanceId(), payload));
}

private RequestQueueReordering mapRequestsToPayload(List<Request> requests) {

return new RequestQueueReordering()
.withRequestIds(requests.stream()
.map(Request::getId)
.toList())
.withInstanceId(requests.get(0).getInstanceId());
}

private Function<SQLConnection, Future<RowSet<Row>>> removePositionsForRequestsBatch(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

public enum CirculationStorageKafkaTopic implements KafkaTopic {
REQUEST("request", 10),
REQUEST_QUEUE_REORDERING("request-queue-reordering", 10),
CIRCULATION_SETTINGS("circulation-settings", 10),
LOAN("loan", 10),
CHECK_IN("check-in", 10),
Expand Down
Loading

0 comments on commit b35fa1d

Please sign in to comment.