MODINV-986: step 1: prepare InstanceIngestConsumerVerticle re-using other consumer verticles logic

PBobylev committed Apr 22, 2024
1 parent 2019a17 commit 6dbaabf
Showing 8 changed files with 262 additions and 341 deletions.
119 changes: 21 additions & 98 deletions src/main/java/org/folio/inventory/DataImportConsumerVerticle.java
@@ -1,5 +1,6 @@
package org.folio.inventory;

import static org.folio.DataImportEventTypes.DI_INCOMING_MARC_BIB_RECORD_PARSED;
import static org.folio.DataImportEventTypes.DI_INVENTORY_HOLDING_CREATED;
import static org.folio.DataImportEventTypes.DI_INVENTORY_HOLDING_MATCHED;
import static org.folio.DataImportEventTypes.DI_INVENTORY_HOLDING_NOT_MATCHED;
@@ -15,6 +16,7 @@
import static org.folio.DataImportEventTypes.DI_MARC_FOR_UPDATE_RECEIVED;
import static org.folio.DataImportEventTypes.DI_PENDING_ORDER_CREATED;
import static org.folio.DataImportEventTypes.DI_SRS_MARC_AUTHORITY_RECORD_CREATED;
import static org.folio.DataImportEventTypes.DI_SRS_MARC_AUTHORITY_RECORD_DELETED;
import static org.folio.DataImportEventTypes.DI_SRS_MARC_AUTHORITY_RECORD_MODIFIED_READY_FOR_POST_PROCESSING;
import static org.folio.DataImportEventTypes.DI_SRS_MARC_AUTHORITY_RECORD_NOT_MATCHED;
import static org.folio.DataImportEventTypes.DI_SRS_MARC_BIB_RECORD_MATCHED;
@@ -25,45 +27,21 @@
import static org.folio.DataImportEventTypes.DI_SRS_MARC_HOLDINGS_RECORD_MODIFIED_READY_FOR_POST_PROCESSING;
import static org.folio.DataImportEventTypes.DI_SRS_MARC_HOLDINGS_RECORD_NOT_MATCHED;
import static org.folio.DataImportEventTypes.DI_SRS_MARC_HOLDING_RECORD_CREATED;
import static org.folio.DataImportEventTypes.DI_SRS_MARC_AUTHORITY_RECORD_DELETED;
import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_ENV;
import static org.folio.DataImportEventTypes.DI_INCOMING_MARC_BIB_RECORD_PARSED;
import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_HOST;
import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_MAX_REQUEST_SIZE;
import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_PORT;
import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_REPLICATION_FACTOR;
import static org.folio.inventory.dataimport.util.KafkaConfigConstants.OKAPI_URL;

import java.util.ArrayList;
import io.vertx.core.Promise;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.DataImportEventTypes;
import org.folio.inventory.consortium.cache.ConsortiumDataCache;
import org.folio.inventory.dataimport.cache.MappingMetadataCache;
import org.folio.inventory.dataimport.cache.ProfileSnapshotCache;
import org.folio.inventory.dataimport.consumers.DataImportKafkaHandler;
import org.folio.inventory.dataimport.util.ConsumerWrapperUtil;
import org.folio.inventory.storage.Storage;
import org.folio.kafka.AsyncRecordHandler;
import org.folio.kafka.GlobalLoadSensor;
import org.folio.kafka.KafkaConfig;
import org.folio.kafka.KafkaConsumerWrapper;
import org.folio.kafka.KafkaTopicNameHelper;
import org.folio.kafka.SubscriptionDefinition;
import org.folio.inventory.support.KafkaConsumerVerticle;
import org.folio.okapi.common.GenericCompositeFuture;
import org.folio.processing.events.EventManager;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.http.HttpClient;
import io.vertx.core.json.JsonObject;

public class DataImportConsumerVerticle extends AbstractVerticle {
public class DataImportConsumerVerticle extends KafkaConsumerVerticle {

private static final Logger LOGGER = LogManager.getLogger(DataImportConsumerVerticle.class);

@@ -97,89 +75,34 @@ public class DataImportConsumerVerticle extends AbstractVerticle {
DI_PENDING_ORDER_CREATED
);

private final int loadLimit = getLoadLimit();
private final int maxDistributionNumber = getMaxDistributionNumber();
private final List<KafkaConsumerWrapper<String, String>> consumerWrappers = new ArrayList<>();

@Override
public void start(Promise<Void> startPromise) {
JsonObject config = vertx.getOrCreateContext().config();
KafkaConfig kafkaConfig = KafkaConfig.builder()
.envId(config.getString(KAFKA_ENV))
.kafkaHost(config.getString(KAFKA_HOST))
.kafkaPort(config.getString(KAFKA_PORT))
.okapiUrl(config.getString(OKAPI_URL))
.replicationFactor(Integer.parseInt(config.getString(KAFKA_REPLICATION_FACTOR)))
.maxRequestSize(Integer.parseInt(config.getString(KAFKA_MAX_REQUEST_SIZE)))
.build();
LOGGER.info("kafkaConfig: {}", kafkaConfig);
EventManager.registerKafkaEventPublisher(kafkaConfig, vertx, maxDistributionNumber);

HttpClient client = vertx.createHttpClient();
Storage storage = Storage.basedUpon(config, client);
EventManager.registerKafkaEventPublisher(getKafkaConfig(), vertx, getMaxDistributionNumber());

String profileSnapshotExpirationTime = getCacheEnvVariable(config, "inventory.profile-snapshot-cache.expiration.time.seconds");
String mappingMetadataExpirationTime = getCacheEnvVariable(config, "inventory.mapping-metadata-cache.expiration.time.seconds");
var profileSnapshotExpirationTime = getCacheEnvVariable(getConfig(), "inventory.profile-snapshot-cache.expiration.time.seconds");

ProfileSnapshotCache profileSnapshotCache = new ProfileSnapshotCache(vertx, client, Long.parseLong(profileSnapshotExpirationTime));
MappingMetadataCache mappingMetadataCache = new MappingMetadataCache(vertx, client, Long.parseLong(mappingMetadataExpirationTime));
ConsortiumDataCache consortiumDataCache = new ConsortiumDataCache(vertx, client);
var profileSnapshotCache = new ProfileSnapshotCache(vertx, getHttpClient(), Long.parseLong(profileSnapshotExpirationTime));
var consortiumDataCache = new ConsortiumDataCache(vertx, getHttpClient());

DataImportKafkaHandler dataImportKafkaHandler = new DataImportKafkaHandler(
vertx, storage, client, profileSnapshotCache, kafkaConfig, mappingMetadataCache, consortiumDataCache);
var dataImportKafkaHandler = new DataImportKafkaHandler(vertx, getStorage(), getHttpClient(), profileSnapshotCache,
getKafkaConfig(), getMappingMetadataCache(), consortiumDataCache);

List<Future<KafkaConsumerWrapper<String, String>>> futures = EVENT_TYPES.stream()
.map(eventType -> createKafkaConsumerWrapper(kafkaConfig, eventType, dataImportKafkaHandler))
.collect(Collectors.toList());
var futures = EVENT_TYPES.stream()
.map(eventType -> {
var consumerWrapper = super.createConsumer(eventType.value());
return consumerWrapper.start(dataImportKafkaHandler, ConsumerWrapperUtil.constructModuleName())
.map(consumerWrapper);
})
.toList();

GenericCompositeFuture.all(futures)
.onFailure(startPromise::fail)
.onSuccess(ar -> {
futures.forEach(future -> consumerWrappers.add(future.result()));
startPromise.complete();
});
.onSuccess(ar -> startPromise.complete());
}

@Override
public void stop(Promise<Void> stopPromise) {
List<Future<Void>> stopFutures = consumerWrappers.stream()
.map(KafkaConsumerWrapper::stop)
.collect(Collectors.toList());

GenericCompositeFuture.join(stopFutures).onComplete(ar -> stopPromise.complete());
}

private Future<KafkaConsumerWrapper<String, String>> createKafkaConsumerWrapper(KafkaConfig kafkaConfig, DataImportEventTypes eventType,
AsyncRecordHandler<String, String> recordHandler) {
SubscriptionDefinition subscriptionDefinition = KafkaTopicNameHelper.createSubscriptionDefinition(kafkaConfig.getEnvId(),
KafkaTopicNameHelper.getDefaultNameSpace(), eventType.value());

KafkaConsumerWrapper<String, String> consumerWrapper = KafkaConsumerWrapper.<String, String>builder()
.context(context)
.vertx(vertx)
.kafkaConfig(kafkaConfig)
.loadLimit(loadLimit)
.globalLoadSensor(new GlobalLoadSensor())
.subscriptionDefinition(subscriptionDefinition)
.build();

return consumerWrapper.start(recordHandler, ConsumerWrapperUtil.constructModuleName())
.map(consumerWrapper);
protected Logger getLogger() {
return LOGGER;
}

private int getLoadLimit() {
return Integer.parseInt(System.getProperty("inventory.kafka.DataImportConsumer.loadLimit", "5"));
}

private int getMaxDistributionNumber() {
return Integer.parseInt(System.getProperty("inventory.kafka.DataImportConsumerVerticle.maxDistributionNumber", "100"));
}

private String getCacheEnvVariable(JsonObject config, String variableName) {
String cacheExpirationTime = config.getString(variableName);
if (StringUtils.isBlank(cacheExpirationTime)) {
cacheExpirationTime = "3600";
}
return cacheExpirationTime;
}
}
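
The shared base class org.folio.inventory.support.KafkaConsumerVerticle that the refactored verticles now extend is part of this commit but is not rendered in this excerpt. Below is a minimal sketch of what the subclasses above appear to rely on, inferred only from the calls they make (getConfig, getKafkaConfig, getHttpClient, getStorage, getMappingMetadataCache, getCacheEnvVariable, createConsumer, getLoadLimit, getMaxDistributionNumber, getLogger); the field layout, lazy initialization, stop() handling, property names, and subscription building are assumptions, not the actual file from this commit.

// Sketch only: the real KafkaConsumerVerticle added in this commit may differ in structure and naming.
package org.folio.inventory.support;

import static org.folio.inventory.dataimport.util.KafkaConfigConstants.*;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.http.HttpClient;
import io.vertx.core.json.JsonObject;
import java.util.ArrayList;
import java.util.List;
import org.apache.logging.log4j.Logger;
import org.folio.inventory.dataimport.cache.MappingMetadataCache;
import org.folio.inventory.storage.Storage;
import org.folio.kafka.GlobalLoadSensor;
import org.folio.kafka.KafkaConfig;
import org.folio.kafka.KafkaConsumerWrapper;
import org.folio.kafka.KafkaTopicNameHelper;
import org.folio.okapi.common.GenericCompositeFuture;

public abstract class KafkaConsumerVerticle extends AbstractVerticle {

  private final List<KafkaConsumerWrapper<String, String>> consumers = new ArrayList<>();
  private KafkaConfig kafkaConfig;
  private HttpClient httpClient;
  private Storage storage;
  private MappingMetadataCache mappingMetadataCache;

  // Each subclass supplies its own logger so shared code logs under the concrete class name.
  protected abstract Logger getLogger();

  // Builds a consumer subscribed to the given event type and remembers it for stop().
  // The real class may build the subscription pattern differently for non data-import topics
  // such as "srs.marc-bib" and "inventory.instance_ingress".
  protected KafkaConsumerWrapper<String, String> createConsumer(String eventType) {
    var subscriptionDefinition = KafkaTopicNameHelper.createSubscriptionDefinition(
      getKafkaConfig().getEnvId(), KafkaTopicNameHelper.getDefaultNameSpace(), eventType);
    var consumer = KafkaConsumerWrapper.<String, String>builder()
      .context(context).vertx(vertx).kafkaConfig(getKafkaConfig())
      .loadLimit(getLoadLimit()).globalLoadSensor(new GlobalLoadSensor())
      .subscriptionDefinition(subscriptionDefinition).build();
    consumers.add(consumer);
    return consumer;
  }

  // Stops every consumer created through createConsumer(), replacing the per-verticle stop() overrides.
  @Override
  public void stop(Promise<Void> stopPromise) {
    GenericCompositeFuture.join(consumers.stream().map(KafkaConsumerWrapper::stop).toList())
      .onComplete(ar -> stopPromise.complete());
  }

  protected JsonObject getConfig() {
    return vertx.getOrCreateContext().config();
  }

  protected KafkaConfig getKafkaConfig() {
    if (kafkaConfig == null) {
      kafkaConfig = KafkaConfig.builder()
        .envId(getConfig().getString(KAFKA_ENV)).kafkaHost(getConfig().getString(KAFKA_HOST))
        .kafkaPort(getConfig().getString(KAFKA_PORT)).okapiUrl(getConfig().getString(OKAPI_URL))
        .replicationFactor(Integer.parseInt(getConfig().getString(KAFKA_REPLICATION_FACTOR)))
        .maxRequestSize(Integer.parseInt(getConfig().getString(KAFKA_MAX_REQUEST_SIZE)))
        .build();
      getLogger().info("kafkaConfig: {}", kafkaConfig);
    }
    return kafkaConfig;
  }

  protected HttpClient getHttpClient() {
    return httpClient != null ? httpClient : (httpClient = vertx.createHttpClient());
  }

  protected Storage getStorage() {
    return storage != null ? storage : (storage = Storage.basedUpon(getConfig(), getHttpClient()));
  }

  protected MappingMetadataCache getMappingMetadataCache() {
    if (mappingMetadataCache == null) {
      var expiration = getCacheEnvVariable(getConfig(), "inventory.mapping-metadata-cache.expiration.time.seconds");
      mappingMetadataCache = new MappingMetadataCache(vertx, getHttpClient(), Long.parseLong(expiration));
    }
    return mappingMetadataCache;
  }

  protected String getCacheEnvVariable(JsonObject config, String variableName) {
    var value = config.getString(variableName);
    return value == null || value.isBlank() ? "3600" : value;
  }

  // Property names below are assumptions; the original verticles used per-class property keys.
  protected int getLoadLimit() {
    return Integer.parseInt(System.getProperty("inventory.kafka." + getClass().getSimpleName() + ".loadLimit", "5"));
  }

  protected int getMaxDistributionNumber() {
    return Integer.parseInt(System.getProperty("inventory.kafka." + getClass().getSimpleName() + ".maxDistributionNumber", "100"));
  }
}
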
31 changes: 31 additions & 0 deletions src/main/java/org/folio/inventory/InstanceIngestConsumerVerticle.java
@@ -0,0 +1,31 @@
package org.folio.inventory;

import static org.folio.inventory.dataimport.util.ConsumerWrapperUtil.constructModuleName;

import io.vertx.core.Promise;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.inventory.handler.InstanceIngestEventHandler;
import org.folio.inventory.support.KafkaConsumerVerticle;

public class InstanceIngestConsumerVerticle extends KafkaConsumerVerticle {

private static final String INSTANCE_INGEST_TOPIC = "inventory.instance_ingress";
private static final Logger LOGGER = LogManager.getLogger(InstanceIngestConsumerVerticle.class);

@Override
public void start(Promise<Void> startPromise) {
var instanceIngestEventHandler = new InstanceIngestEventHandler();

var consumerWrapper = createConsumer(INSTANCE_INGEST_TOPIC);

consumerWrapper.start(instanceIngestEventHandler, constructModuleName())
.onFailure(startPromise::fail)
.onSuccess(ar -> startPromise.complete());
}

@Override
protected Logger getLogger() {
return LOGGER;
}
}
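
The InstanceIngestEventHandler wired in above (org.folio.inventory.handler.InstanceIngestEventHandler) is also part of this commit but not rendered here. The sketch below shows only the shape it would need for consumerWrapper.start(...) to accept it, assuming it implements folio-kafka-wrapper's AsyncRecordHandler<String, String> like the other inventory consumers; the body is a placeholder, not the commit's actual logic.

// Hypothetical stand-in for the handler added in this commit; only the interface shape is asserted.
package org.folio.inventory.handler;

import io.vertx.core.Future;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.kafka.AsyncRecordHandler;

public class InstanceIngestEventHandler implements AsyncRecordHandler<String, String> {

  private static final Logger LOGGER = LogManager.getLogger(InstanceIngestEventHandler.class);

  @Override
  public Future<String> handle(KafkaConsumerRecord<String, String> consumerRecord) {
    // Step 1 only wires the consumer up; real ingest processing is expected in a later step.
    LOGGER.info("Instance ingest event received, key: {}", consumerRecord.key());
    return Future.succeededFuture(consumerRecord.key());
  }
}
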
8 changes: 8 additions & 0 deletions src/main/java/org/folio/inventory/Launcher.java
@@ -25,6 +25,7 @@ public class Launcher {
private static final String QUICK_MARC_CONSUMER_VERTICLE_INSTANCES_NUMBER_CONFIG = "inventory.kafka.QuickMarcConsumerVerticle.instancesNumber";
private static final String MARC_BIB_UPDATE_CONSUMER_VERTICLE_INSTANCES_NUMBER_CONFIG = "inventory.kafka.MarcBibUpdateConsumerVerticle.instancesNumber";
private static final String CONSORTIUM_INSTANCE_SHARING_CONSUMER_VERTICLE_NUMBER_CONFIG = "inventory.kafka.ConsortiumInstanceSharingConsumerVerticle.instancesNumber";
private static final String INSTANCE_INGRESS_VERTICLE_NUMBER_CONFIG = "inventory.kafka.InstanceIngestConsumerVerticle.instancesNumber";
private static final VertxAssistant vertxAssistant = new VertxAssistant();

private static String inventoryModuleDeploymentId;
@@ -33,6 +34,7 @@ public class Launcher {
private static String quickMarcConsumerVerticleDeploymentId;
private static String marcBibUpdateConsumerVerticleDeploymentId;
private static String consortiumInstanceSharingVerticleDeploymentId;
private static String instanceIngestConsumerVerticleDeploymentId;

public static void main(String[] args)
throws InterruptedException, ExecutionException, TimeoutException {
@@ -96,12 +98,14 @@ private static void startConsumerVerticles(Map<String, Object> consumerVerticles
int quickMarcConsumerVerticleNumber = Integer.parseInt(System.getenv().getOrDefault(QUICK_MARC_CONSUMER_VERTICLE_INSTANCES_NUMBER_CONFIG, "1"));
int marcBibUpdateConsumerVerticleNumber = Integer.parseInt(System.getenv().getOrDefault(MARC_BIB_UPDATE_CONSUMER_VERTICLE_INSTANCES_NUMBER_CONFIG, "3"));
int consortiumInstanceSharingVerticleNumber = Integer.parseInt(System.getenv().getOrDefault(CONSORTIUM_INSTANCE_SHARING_CONSUMER_VERTICLE_NUMBER_CONFIG, "3"));
int instanceIngestConsumerVerticleNumber = Integer.parseInt(System.getenv().getOrDefault(INSTANCE_INGRESS_VERTICLE_NUMBER_CONFIG, "3"));

CompletableFuture<String> future1 = new CompletableFuture<>();
CompletableFuture<String> future2 = new CompletableFuture<>();
CompletableFuture<String> future3 = new CompletableFuture<>();
CompletableFuture<String> future4 = new CompletableFuture<>();
CompletableFuture<String> future5 = new CompletableFuture<>();
CompletableFuture<String> future6 = new CompletableFuture<>();
vertxAssistant.deployVerticle(DataImportConsumerVerticle.class.getName(),
consumerVerticlesConfig, dataImportConsumerVerticleNumber, future1);
vertxAssistant.deployVerticle(MarcHridSetConsumerVerticle.class.getName(),
@@ -112,12 +116,15 @@ private static void startConsumerVerticles(Map<String, Object> consumerVerticles
consumerVerticlesConfig, marcBibUpdateConsumerVerticleNumber, future4);
vertxAssistant.deployVerticle(ConsortiumInstanceSharingConsumerVerticle.class.getName(),
consumerVerticlesConfig, consortiumInstanceSharingVerticleNumber, future5);
vertxAssistant.deployVerticle(InstanceIngestConsumerVerticle.class.getName(),
consumerVerticlesConfig, instanceIngestConsumerVerticleNumber, future6);

consumerVerticleDeploymentId = future1.get(20, TimeUnit.SECONDS);
marcInstHridSetConsumerVerticleDeploymentId = future2.get(20, TimeUnit.SECONDS);
quickMarcConsumerVerticleDeploymentId = future3.get(20, TimeUnit.SECONDS);
marcBibUpdateConsumerVerticleDeploymentId = future4.get(20, TimeUnit.SECONDS);
consortiumInstanceSharingVerticleDeploymentId = future5.get(20, TimeUnit.SECONDS);
instanceIngestConsumerVerticleDeploymentId = future6.get(20, TimeUnit.SECONDS);
}

private static void stop() {
@@ -133,6 +140,7 @@ private static void stop() {
.thenCompose(v -> vertxAssistant.undeployVerticle(quickMarcConsumerVerticleDeploymentId))
.thenCompose(v -> vertxAssistant.undeployVerticle(marcBibUpdateConsumerVerticleDeploymentId))
.thenCompose(v -> vertxAssistant.undeployVerticle(consortiumInstanceSharingVerticleDeploymentId))
.thenCompose(v -> vertxAssistant.undeployVerticle(instanceIngestConsumerVerticleDeploymentId))
.thenAccept(v -> vertxAssistant.stop(stopped));

stopped.thenAccept(v -> log.info("Server Stopped"));
src/main/java/org/folio/inventory/MarcBibUpdateConsumerVerticle.java
@@ -1,109 +1,35 @@
package org.folio.inventory;

import static org.folio.inventory.dataimport.util.ConsumerWrapperUtil.constructModuleName;
import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_ENV;
import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_HOST;
import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_MAX_REQUEST_SIZE;
import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_PORT;
import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_REPLICATION_FACTOR;
import static org.folio.inventory.dataimport.util.KafkaConfigConstants.OKAPI_URL;
import io.vertx.core.AbstractVerticle;

import io.vertx.core.Promise;
import io.vertx.core.http.HttpClient;
import io.vertx.core.json.JsonObject;
import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.inventory.dataimport.cache.MappingMetadataCache;
import org.folio.inventory.dataimport.consumers.MarcBibUpdateKafkaHandler;
import org.folio.inventory.dataimport.handlers.actions.InstanceUpdateDelegate;
import org.folio.inventory.storage.Storage;
import org.folio.kafka.GlobalLoadSensor;
import org.folio.kafka.KafkaConfig;
import org.folio.kafka.KafkaConsumerWrapper;
import org.folio.kafka.SubscriptionDefinition;
import org.folio.inventory.support.KafkaConsumerVerticle;

public class MarcBibUpdateConsumerVerticle extends AbstractVerticle {
public class MarcBibUpdateConsumerVerticle extends KafkaConsumerVerticle {
private static final Logger LOGGER = LogManager.getLogger(MarcBibUpdateConsumerVerticle.class);
private static final GlobalLoadSensor GLOBAL_LOAD_SENSOR = new GlobalLoadSensor();
private static final String SRS_MARC_BIB_TOPIC_NAME = "srs.marc-bib";
private static final String METADATA_EXPIRATION_TIME = "inventory.mapping-metadata-cache.expiration.time.seconds";
private final int loadLimit = getLoadLimit();
private KafkaConsumerWrapper<String, String> marcBibUpdateConsumerWrapper;
private static final String SRS_MARC_BIB_EVENT = "srs.marc-bib";

@Override
public void start(Promise<Void> startPromise) {
JsonObject config = vertx.getOrCreateContext().config();
KafkaConfig kafkaConfig = getKafkaConfig(config);

HttpClient client = vertx.createHttpClient();
Storage storage = Storage.basedUpon(config, client);
InstanceUpdateDelegate instanceUpdateDelegate = new InstanceUpdateDelegate(storage);
var instanceUpdateDelegate = new InstanceUpdateDelegate(getStorage());

var mappingMetadataExpirationTime = getCacheEnvVariable(config, METADATA_EXPIRATION_TIME);
MappingMetadataCache mappingMetadataCache = new MappingMetadataCache(vertx, client, Long.parseLong(mappingMetadataExpirationTime));
var marcBibUpdateKafkaHandler = new MarcBibUpdateKafkaHandler(vertx,
getMaxDistributionNumber(), getKafkaConfig(), instanceUpdateDelegate, getMappingMetadataCache());

MarcBibUpdateKafkaHandler marcBibUpdateKafkaHandler = new MarcBibUpdateKafkaHandler(vertx,
getMaxDistributionNumber(), kafkaConfig, instanceUpdateDelegate, mappingMetadataCache);
var marcBibUpdateConsumerWrapper = createConsumer(SRS_MARC_BIB_EVENT);

marcBibUpdateConsumerWrapper = createConsumer(kafkaConfig, SRS_MARC_BIB_TOPIC_NAME);
marcBibUpdateConsumerWrapper.start(marcBibUpdateKafkaHandler, constructModuleName())
.onFailure(startPromise::fail)
.onSuccess(ar -> startPromise.complete());
}

private KafkaConsumerWrapper<String, String> createConsumer(KafkaConfig kafkaConfig, String topicEventType) {
SubscriptionDefinition subscriptionDefinition = SubscriptionDefinition.builder()
.eventType(topicEventType)
.subscriptionPattern(formatSubscriptionPattern(kafkaConfig.getEnvId(), topicEventType))
.build();

return KafkaConsumerWrapper.<String, String>builder()
.context(context)
.vertx(vertx)
.kafkaConfig(kafkaConfig)
.loadLimit(loadLimit)
.globalLoadSensor(GLOBAL_LOAD_SENSOR)
.subscriptionDefinition(subscriptionDefinition)
.build();
}

@Override
public void stop(Promise<Void> stopPromise) {
marcBibUpdateConsumerWrapper.stop()
.onComplete(ar -> stopPromise.complete());
}

private int getLoadLimit() {
return Integer.parseInt(System.getProperty("inventory.kafka.MarcBibUpdateConsumer.loadLimit","5"));
protected Logger getLogger() {
return LOGGER;
}

private int getMaxDistributionNumber() {
return Integer.parseInt(System.getProperty("inventory.kafka.MarcBibUpdateConsumer.maxDistributionNumber", "100"));
}

private KafkaConfig getKafkaConfig(JsonObject config) {
KafkaConfig kafkaConfig = KafkaConfig.builder()
.envId(config.getString(KAFKA_ENV))
.kafkaHost(config.getString(KAFKA_HOST))
.kafkaPort(config.getString(KAFKA_PORT))
.okapiUrl(config.getString(OKAPI_URL))
.replicationFactor(Integer.parseInt(config.getString(KAFKA_REPLICATION_FACTOR)))
.maxRequestSize(Integer.parseInt(config.getString(KAFKA_MAX_REQUEST_SIZE)))
.build();
LOGGER.info("kafkaConfig: {}", kafkaConfig);
return kafkaConfig;
}

private String getCacheEnvVariable(JsonObject config, String variableName) {
String cacheExpirationTime = config.getString(variableName);
if (StringUtils.isBlank(cacheExpirationTime)) {
cacheExpirationTime = "3600";
}
return cacheExpirationTime;
}

public static String formatSubscriptionPattern(String env, String eventType) {
return String.join("\\.", env, "\\w{1,}", eventType);
}
}
