Skip to content

Commit

Permalink
MODINV-986: InstanceIngressEventConsumer and event model
Browse files Browse the repository at this point in the history
  • Loading branch information
PBobylev committed May 1, 2024
1 parent 6d6b7c0 commit 17d7598
Show file tree
Hide file tree
Showing 9 changed files with 246 additions and 24 deletions.
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,7 @@
<path>${basedir}/ramls/holdings-record.json</path>
<path>${basedir}/ramls/holdings-records-source.json</path>
<path>${basedir}/ramls/mappingMetadataDto.json</path>
<path>${basedir}/ramls/instance-ingress-event.json</path>
</sourcePaths>
<targetPackage>org.folio</targetPackage>
<generateBuilders>true</generateBuilders>
Expand Down
92 changes: 92 additions & 0 deletions ramls/instance-ingress-event.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
{
"$schema": "http://json-schema.org/draft-04/schema#",
"description": "Instance ingress event data model",
"javaType": "org.folio.rest.jaxrs.model.InstanceIngressEvent",
"type": "object",
"additionalProperties": false,
"properties": {
"id": {
"description": "UUID",
"$ref": "uuid.json"
},
"eventType": {
"type": "string",
"enum": ["CREATE_INSTANCE", "UPDATE_INSTANCE"],
"description": "Instance ingress event type"
},
"eventMetadata": {
"description": "Event metadata",
"type": "object",
"additionalProperties": false,
"properties": {
"tenantId": {
"description": "Tenant id",
"type": "string"
},
"eventTTL": {
"description": "Time-to-live (TTL) for event in minutes",
"type": "integer"
},
"correlationId": {
"description": "Id to track related events, can be a meaningful string or a UUID",
"type": "string"
},
"originalEventId": {
"description": "Id of the event that started the sequence of related events",
"$ref": "uuid.json"
},
"publisherCallback": {
"description": "Allows a publisher to provide a callback endpoint or an error Event Type to be notified that despite the fact that there are subscribers for such an event type no one has received the event within the specified period of time",
"type": "object",
"properties": {
"endpoint": {
"description": "Callback endpoint",
"type": "string"
},
"eventType": {
"description": "Error Event Type",
"type": "string"
}
}
},
"createdDate": {
"description": "Timestamp when event was created",
"type": "string",
"format": "date-time"
},
"publishedDate": {
"description": "Timestamp when event was initially published to the underlying topic",
"type": "string",
"format": "date-time"
},
"createdBy": {
"description": "Username of the user whose action caused an event",
"type": "string"
},
"publishedBy": {
"description": "Name and version of the module that published an event",
"type": "string"
}
},
"required": [
"tenantId",
"eventTTL",
"publishedBy"
]
},
"eventPayload": {
"type": "object",
"description": "An instance source record container",
"$ref": "instance-ingress-payload.json"
}
},
"excludedFromEqualsAndHashCode": [
"eventMetadata",
"eventPayload"
],
"required": [
"id",
"eventType",
"eventMetadata"
]
}
25 changes: 25 additions & 0 deletions ramls/instance-ingress-payload.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{
"$schema": "http://json-schema.org/draft-04/schema#",
"description": "An instance source record container",
"type": "object",
"properties": {
"sourceRecordIdentifier": {
"type": "string",
"description": "The source record identifier"
},
"sourceRecordObject": {
"type": "string",
"description": "The source record JSON object"
},
"sourceType": {
"type": "string",
"enum": ["BIBFRAME", "MARC", "FOLIO"],
"description": "Source type"
}
},
"additionalProperties": true,
"required": [
"sourceRecordObject",
"sourceType"
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import io.vertx.core.Promise;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.inventory.handler.InstanceIngressEventHandler;
import org.folio.inventory.instanceingress.InstanceIngressEventConsumer;
import org.folio.inventory.support.KafkaConsumerVerticle;

public class InstanceIngressConsumerVerticle extends KafkaConsumerVerticle {
Expand All @@ -15,7 +15,7 @@ public class InstanceIngressConsumerVerticle extends KafkaConsumerVerticle {

@Override
public void start(Promise<Void> startPromise) {
var instanceIngressEventHandler = new InstanceIngressEventHandler();
var instanceIngressEventHandler = new InstanceIngressEventConsumer(vertx);

var consumerWrapper = createConsumer(INSTANCE_INGRESS_TOPIC);

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package org.folio.inventory.instanceingress;

import static org.folio.inventory.dataimport.handlers.matching.util.EventHandlingUtil.constructContext;
import static org.folio.kafka.KafkaHeaderUtils.kafkaHeadersToMap;
import static org.folio.rest.jaxrs.model.InstanceIngressEvent.EventType.CREATE_INSTANCE;
import static org.folio.rest.jaxrs.model.InstanceIngressEvent.EventType.UPDATE_INSTANCE;

import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.json.Json;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.inventory.common.Context;
import org.folio.inventory.instanceingress.handler.InstanceIngressCreateEventHandler;
import org.folio.inventory.instanceingress.handler.InstanceIngressEventHandler;
import org.folio.inventory.instanceingress.handler.InstanceIngressUpdateEventHandler;
import org.folio.kafka.AsyncRecordHandler;
import org.folio.processing.exceptions.EventProcessingException;
import org.folio.rest.jaxrs.model.InstanceIngressEvent;
import org.folio.rest.jaxrs.model.InstanceIngressPayload;
import org.folio.rest.util.OkapiConnectionParams;

public class InstanceIngressEventConsumer implements AsyncRecordHandler<String, String> {

private static final Logger LOGGER = LogManager.getLogger(InstanceIngressEventConsumer.class);

private final Vertx vertx;

public InstanceIngressEventConsumer(Vertx vertx) {
this.vertx = vertx;
}

@Override
public Future<String> handle(KafkaConsumerRecord<String, String> consumerRecord) {
var params = new OkapiConnectionParams(kafkaHeadersToMap(consumerRecord.headers()), vertx);
var context = constructContext(params.getTenantId(), params.getToken(), params.getOkapiUrl());
var event = Json.decodeValue(consumerRecord.value(), InstanceIngressEvent.class);
LOGGER.info("Instance ingress event has been received with event type: {}", event.getEventType());
return
Future.succeededFuture(event.getEventPayload())
.compose(eventPayload -> processPayload(eventPayload, event.getEventType(), context)
.map(ar -> consumerRecord.key()), th -> {
LOGGER.error("Update record state was failed while handle event, {}", th.getMessage());
return Future.failedFuture(th.getMessage());
});
}

private Future<InstanceIngressEvent.EventType> processPayload(InstanceIngressPayload eventPayload,
InstanceIngressEvent.EventType eventType,
Context context) {
try {
Promise<InstanceIngressEvent.EventType> promise = Promise.promise();
var handler = getInstanceIngressEventHandler(eventType, context).handle(eventPayload);
handler.whenComplete((res, ex) -> {
if (ex != null) {
promise.fail(ex);
} else {
promise.complete(eventType);
}
});

return promise.future();
} catch (Exception e) {
LOGGER.warn("Error during processPayload: ", e);
return Future.failedFuture(e);
}
}

private InstanceIngressEventHandler getInstanceIngressEventHandler(InstanceIngressEvent.EventType eventType,
Context context) {
if (eventType == CREATE_INSTANCE) {
return new InstanceIngressCreateEventHandler(context);
} else if (eventType == UPDATE_INSTANCE) {
return new InstanceIngressUpdateEventHandler(context);
} else {
LOGGER.warn("Can't process eventType {}", eventType);
throw new EventProcessingException("Can't process eventType " + eventType);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.folio.inventory.instanceingress.handler;

import java.util.concurrent.CompletableFuture;
import org.folio.inventory.common.Context;
import org.folio.rest.jaxrs.model.InstanceIngressPayload;

public class InstanceIngressCreateEventHandler implements InstanceIngressEventHandler {

public InstanceIngressCreateEventHandler(Context context) {

}

@Override
public CompletableFuture<InstanceIngressPayload> handle(InstanceIngressPayload payload) {
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package org.folio.inventory.instanceingress.handler;

import java.util.concurrent.CompletableFuture;
import org.folio.rest.jaxrs.model.InstanceIngressPayload;

public interface InstanceIngressEventHandler {

CompletableFuture<InstanceIngressPayload> handle(InstanceIngressPayload payload);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.folio.inventory.instanceingress.handler;

import java.util.concurrent.CompletableFuture;
import org.folio.inventory.common.Context;
import org.folio.rest.jaxrs.model.InstanceIngressPayload;

public class InstanceIngressUpdateEventHandler implements InstanceIngressEventHandler {

public InstanceIngressUpdateEventHandler(Context context) {

}

@Override
public CompletableFuture<InstanceIngressPayload> handle(InstanceIngressPayload payload) {
return null;
}
}

0 comments on commit 17d7598

Please sign in to comment.