diff --git a/src/main/java/org/folio/inventory/DataImportConsumerVerticle.java b/src/main/java/org/folio/inventory/DataImportConsumerVerticle.java
index ecb65db09..9226e37a8 100644
--- a/src/main/java/org/folio/inventory/DataImportConsumerVerticle.java
+++ b/src/main/java/org/folio/inventory/DataImportConsumerVerticle.java
@@ -1,5 +1,6 @@
 package org.folio.inventory;
 
+import static org.folio.DataImportEventTypes.DI_INCOMING_MARC_BIB_RECORD_PARSED;
 import static org.folio.DataImportEventTypes.DI_INVENTORY_HOLDING_CREATED;
 import static org.folio.DataImportEventTypes.DI_INVENTORY_HOLDING_MATCHED;
 import static org.folio.DataImportEventTypes.DI_INVENTORY_HOLDING_NOT_MATCHED;
@@ -15,6 +16,7 @@
 import static org.folio.DataImportEventTypes.DI_MARC_FOR_UPDATE_RECEIVED;
 import static org.folio.DataImportEventTypes.DI_PENDING_ORDER_CREATED;
 import static org.folio.DataImportEventTypes.DI_SRS_MARC_AUTHORITY_RECORD_CREATED;
+import static org.folio.DataImportEventTypes.DI_SRS_MARC_AUTHORITY_RECORD_DELETED;
 import static org.folio.DataImportEventTypes.DI_SRS_MARC_AUTHORITY_RECORD_MODIFIED_READY_FOR_POST_PROCESSING;
 import static org.folio.DataImportEventTypes.DI_SRS_MARC_AUTHORITY_RECORD_NOT_MATCHED;
 import static org.folio.DataImportEventTypes.DI_SRS_MARC_BIB_RECORD_MATCHED;
@@ -25,45 +27,21 @@
 import static org.folio.DataImportEventTypes.DI_SRS_MARC_HOLDINGS_RECORD_MODIFIED_READY_FOR_POST_PROCESSING;
 import static org.folio.DataImportEventTypes.DI_SRS_MARC_HOLDINGS_RECORD_NOT_MATCHED;
 import static org.folio.DataImportEventTypes.DI_SRS_MARC_HOLDING_RECORD_CREATED;
-import static org.folio.DataImportEventTypes.DI_SRS_MARC_AUTHORITY_RECORD_DELETED;
-import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_ENV;
-import static org.folio.DataImportEventTypes.DI_INCOMING_MARC_BIB_RECORD_PARSED;
-import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_HOST;
-import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_MAX_REQUEST_SIZE;
-import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_PORT;
-import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_REPLICATION_FACTOR;
-import static org.folio.inventory.dataimport.util.KafkaConfigConstants.OKAPI_URL;
-import java.util.ArrayList;
+import io.vertx.core.Promise;
 import java.util.List;
-import java.util.stream.Collectors;
-
-import org.apache.commons.lang.StringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.folio.DataImportEventTypes;
 import org.folio.inventory.consortium.cache.ConsortiumDataCache;
-import org.folio.inventory.dataimport.cache.MappingMetadataCache;
 import org.folio.inventory.dataimport.cache.ProfileSnapshotCache;
 import org.folio.inventory.dataimport.consumers.DataImportKafkaHandler;
 import org.folio.inventory.dataimport.util.ConsumerWrapperUtil;
-import org.folio.inventory.storage.Storage;
-import org.folio.kafka.AsyncRecordHandler;
-import org.folio.kafka.GlobalLoadSensor;
-import org.folio.kafka.KafkaConfig;
-import org.folio.kafka.KafkaConsumerWrapper;
-import org.folio.kafka.KafkaTopicNameHelper;
-import org.folio.kafka.SubscriptionDefinition;
+import org.folio.inventory.support.KafkaConsumerVerticle;
 import org.folio.okapi.common.GenericCompositeFuture;
 import org.folio.processing.events.EventManager;
-import io.vertx.core.AbstractVerticle;
-import io.vertx.core.Future;
-import io.vertx.core.Promise;
-import io.vertx.core.http.HttpClient;
-import io.vertx.core.json.JsonObject;
-
 
-public class DataImportConsumerVerticle extends AbstractVerticle {
+public class DataImportConsumerVerticle extends KafkaConsumerVerticle {
 
   private static final Logger LOGGER = LogManager.getLogger(DataImportConsumerVerticle.class);
 
@@ -97,89 +75,34 @@ public class DataImportConsumerVerticle extends AbstractVerticle {
     DI_PENDING_ORDER_CREATED
   );
 
-  private final int loadLimit = getLoadLimit();
-  private final int maxDistributionNumber = getMaxDistributionNumber();
-  private final List<KafkaConsumerWrapper<String, String>> consumerWrappers = new ArrayList<>();
-
   @Override
   public void start(Promise<Void> startPromise) {
-    JsonObject config = vertx.getOrCreateContext().config();
-    KafkaConfig kafkaConfig = KafkaConfig.builder()
-      .envId(config.getString(KAFKA_ENV))
-      .kafkaHost(config.getString(KAFKA_HOST))
-      .kafkaPort(config.getString(KAFKA_PORT))
-      .okapiUrl(config.getString(OKAPI_URL))
-      .replicationFactor(Integer.parseInt(config.getString(KAFKA_REPLICATION_FACTOR)))
-      .maxRequestSize(Integer.parseInt(config.getString(KAFKA_MAX_REQUEST_SIZE)))
-      .build();
-    LOGGER.info("kafkaConfig: {}", kafkaConfig);
-    EventManager.registerKafkaEventPublisher(kafkaConfig, vertx, maxDistributionNumber);
-
-    HttpClient client = vertx.createHttpClient();
-    Storage storage = Storage.basedUpon(config, client);
+    EventManager.registerKafkaEventPublisher(getKafkaConfig(), vertx, getMaxDistributionNumber());
 
-    String profileSnapshotExpirationTime = getCacheEnvVariable(config, "inventory.profile-snapshot-cache.expiration.time.seconds");
-    String mappingMetadataExpirationTime = getCacheEnvVariable(config, "inventory.mapping-metadata-cache.expiration.time.seconds");
+    var profileSnapshotExpirationTime = getCacheEnvVariable(getConfig(), "inventory.profile-snapshot-cache.expiration.time.seconds");
 
-    ProfileSnapshotCache profileSnapshotCache = new ProfileSnapshotCache(vertx, client, Long.parseLong(profileSnapshotExpirationTime));
-    MappingMetadataCache mappingMetadataCache = new MappingMetadataCache(vertx, client, Long.parseLong(mappingMetadataExpirationTime));
-    ConsortiumDataCache consortiumDataCache = new ConsortiumDataCache(vertx, client);
+    var profileSnapshotCache = new ProfileSnapshotCache(vertx, getHttpClient(), Long.parseLong(profileSnapshotExpirationTime));
+    var consortiumDataCache = new ConsortiumDataCache(vertx, getHttpClient());
 
-    DataImportKafkaHandler dataImportKafkaHandler = new DataImportKafkaHandler(
-      vertx, storage, client, profileSnapshotCache, kafkaConfig, mappingMetadataCache, consortiumDataCache);
+    var dataImportKafkaHandler = new DataImportKafkaHandler(vertx, getStorage(), getHttpClient(), profileSnapshotCache,
+      getKafkaConfig(), getMappingMetadataCache(), consortiumDataCache);
 
-    List<Future<KafkaConsumerWrapper<String, String>>> futures = EVENT_TYPES.stream()
-      .map(eventType -> createKafkaConsumerWrapper(kafkaConfig, eventType, dataImportKafkaHandler))
-      .collect(Collectors.toList());
+    var futures = EVENT_TYPES.stream()
+      .map(eventType -> {
+        var consumerWrapper = super.createConsumer(eventType.value());
+        return consumerWrapper.start(dataImportKafkaHandler, ConsumerWrapperUtil.constructModuleName())
+          .map(consumerWrapper);
+      })
+      .toList();
 
     GenericCompositeFuture.all(futures)
      .onFailure(startPromise::fail)
-      .onSuccess(ar -> {
-        futures.forEach(future -> consumerWrappers.add(future.result()));
-        startPromise.complete();
-      });
+      .onSuccess(ar -> startPromise.complete());
   }
 
   @Override
-  public void stop(Promise<Void> stopPromise) {
-    List<Future<Void>> stopFutures = consumerWrappers.stream()
-      .map(KafkaConsumerWrapper::stop)
-      .collect(Collectors.toList());
-
-    GenericCompositeFuture.join(stopFutures).onComplete(ar -> stopPromise.complete());
-  }
-
-  private Future<KafkaConsumerWrapper<String, String>> createKafkaConsumerWrapper(KafkaConfig kafkaConfig, DataImportEventTypes eventType,
-                                                                                  AsyncRecordHandler<String, String> recordHandler) {
-    SubscriptionDefinition subscriptionDefinition = KafkaTopicNameHelper.createSubscriptionDefinition(kafkaConfig.getEnvId(),
-      KafkaTopicNameHelper.getDefaultNameSpace(), eventType.value());
-
-    KafkaConsumerWrapper<String, String> consumerWrapper = KafkaConsumerWrapper.<String, String>builder()
-      .context(context)
-      .vertx(vertx)
-      .kafkaConfig(kafkaConfig)
-      .loadLimit(loadLimit)
-      .globalLoadSensor(new GlobalLoadSensor())
-      .subscriptionDefinition(subscriptionDefinition)
-      .build();
-
-    return consumerWrapper.start(recordHandler, ConsumerWrapperUtil.constructModuleName())
-      .map(consumerWrapper);
+  protected Logger getLogger() {
+    return LOGGER;
  }
 
-  private int getLoadLimit() {
-    return Integer.parseInt(System.getProperty("inventory.kafka.DataImportConsumer.loadLimit", "5"));
-  }
-
-  private int getMaxDistributionNumber() {
-    return Integer.parseInt(System.getProperty("inventory.kafka.DataImportConsumerVerticle.maxDistributionNumber", "100"));
-  }
-
-  private String getCacheEnvVariable(JsonObject config, String variableName) {
-    String cacheExpirationTime = config.getString(variableName);
-    if (StringUtils.isBlank(cacheExpirationTime)) {
-      cacheExpirationTime = "3600";
-    }
-    return cacheExpirationTime;
-  }
 }
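For orientation, the startup pattern the refactored DataImportConsumerVerticle now relies on is sketched below: one KafkaConsumerWrapper per event type, all created through the shared base class, with startup completing only when every consumer has started. This is an illustrative subclass, not code from the patch; the event names and the reuse of InstanceIngestEventHandler as a stand-in handler are assumptions made for the example.

package org.folio.inventory;

import io.vertx.core.Promise;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.inventory.dataimport.util.ConsumerWrapperUtil;
import org.folio.inventory.handler.InstanceIngestEventHandler;
import org.folio.inventory.support.KafkaConsumerVerticle;
import org.folio.okapi.common.GenericCompositeFuture;

// Hypothetical verticle showing the per-event-type startup composition used by DataImportConsumerVerticle.
public class SampleEventsConsumerVerticle extends KafkaConsumerVerticle {

  private static final Logger LOGGER = LogManager.getLogger(SampleEventsConsumerVerticle.class);

  // Placeholder event names; the real verticle streams over its EVENT_TYPES list of DataImportEventTypes.
  private static final List<String> EVENT_TYPES = List.of("DI_INVENTORY_HOLDING_CREATED", "DI_PENDING_ORDER_CREATED");

  @Override
  public void start(Promise<Void> startPromise) {
    var handler = new InstanceIngestEventHandler(); // stand-in for DataImportKafkaHandler

    var futures = EVENT_TYPES.stream()
      .map(eventType -> {
        // createConsumer() builds the wrapper from the shared KafkaConfig and registers it for stop()
        var consumerWrapper = createConsumer(eventType);
        return consumerWrapper.start(handler, ConsumerWrapperUtil.constructModuleName())
          .map(consumerWrapper);
      })
      .toList();

    GenericCompositeFuture.all(futures)
      .onFailure(startPromise::fail)
      .onSuccess(ar -> startPromise.complete());
  }

  @Override
  protected Logger getLogger() {
    return LOGGER;
  }
}

Because createConsumer() tracks every wrapper it builds, the subclass no longer needs its own stop() override; shutdown is handled once in the base class.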
diff --git a/src/main/java/org/folio/inventory/InstanceIngestConsumerVerticle.java b/src/main/java/org/folio/inventory/InstanceIngestConsumerVerticle.java
new file mode 100644
index 000000000..994fd1757
--- /dev/null
+++ b/src/main/java/org/folio/inventory/InstanceIngestConsumerVerticle.java
@@ -0,0 +1,31 @@
+package org.folio.inventory;
+
+import static org.folio.inventory.dataimport.util.ConsumerWrapperUtil.constructModuleName;
+
+import io.vertx.core.Promise;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.folio.inventory.handler.InstanceIngestEventHandler;
+import org.folio.inventory.support.KafkaConsumerVerticle;
+
+public class InstanceIngestConsumerVerticle extends KafkaConsumerVerticle {
+
+  private static final String INSTANCE_INGEST_TOPIC = "inventory.instance_ingress";
+  private static final Logger LOGGER = LogManager.getLogger(InstanceIngestConsumerVerticle.class);
+
+  @Override
+  public void start(Promise<Void> startPromise) {
+    var instanceIngestEventHandler = new InstanceIngestEventHandler();
+
+    var consumerWrapper = createConsumer(INSTANCE_INGEST_TOPIC);
+
+    consumerWrapper.start(instanceIngestEventHandler, constructModuleName())
+      .onFailure(startPromise::fail)
+      .onSuccess(ar -> startPromise.complete());
+  }
+
+  @Override
+  protected Logger getLogger() {
+    return LOGGER;
+  }
+}
diff --git a/src/main/java/org/folio/inventory/Launcher.java b/src/main/java/org/folio/inventory/Launcher.java
index 0746df875..29a12273c 100644
--- a/src/main/java/org/folio/inventory/Launcher.java
+++ b/src/main/java/org/folio/inventory/Launcher.java
@@ -25,6 +25,7 @@ public class Launcher {
   private static final String QUICK_MARC_CONSUMER_VERTICLE_INSTANCES_NUMBER_CONFIG = "inventory.kafka.QuickMarcConsumerVerticle.instancesNumber";
   private static final String MARC_BIB_UPDATE_CONSUMER_VERTICLE_INSTANCES_NUMBER_CONFIG = "inventory.kafka.MarcBibUpdateConsumerVerticle.instancesNumber";
   private static final String CONSORTIUM_INSTANCE_SHARING_CONSUMER_VERTICLE_NUMBER_CONFIG = "inventory.kafka.ConsortiumInstanceSharingConsumerVerticle.instancesNumber";
+  private static final String INSTANCE_INGRESS_VERTICLE_NUMBER_CONFIG = "inventory.kafka.InstanceIngestConsumerVerticle.instancesNumber";
 
   private static final VertxAssistant vertxAssistant = new VertxAssistant();
   private static String inventoryModuleDeploymentId;
@@ -33,6 +34,7 @@ public class Launcher {
   private static String quickMarcConsumerVerticleDeploymentId;
   private static String marcBibUpdateConsumerVerticleDeploymentId;
   private static String consortiumInstanceSharingVerticleDeploymentId;
+  private static String instanceIngestConsumerVerticleDeploymentId;
 
   public static void main(String[] args)
     throws InterruptedException, ExecutionException, TimeoutException {
@@ -96,12 +98,14 @@ private static void startConsumerVerticles(Map<String, Object> consumerVerticles
     int quickMarcConsumerVerticleNumber = Integer.parseInt(System.getenv().getOrDefault(QUICK_MARC_CONSUMER_VERTICLE_INSTANCES_NUMBER_CONFIG, "1"));
     int marcBibUpdateConsumerVerticleNumber = Integer.parseInt(System.getenv().getOrDefault(MARC_BIB_UPDATE_CONSUMER_VERTICLE_INSTANCES_NUMBER_CONFIG, "3"));
     int consortiumInstanceSharingVerticleNumber = Integer.parseInt(System.getenv().getOrDefault(CONSORTIUM_INSTANCE_SHARING_CONSUMER_VERTICLE_NUMBER_CONFIG, "3"));
+    int instanceIngestConsumerVerticleNumber = Integer.parseInt(System.getenv().getOrDefault(INSTANCE_INGRESS_VERTICLE_NUMBER_CONFIG, "3"));
 
     CompletableFuture<String> future1 = new CompletableFuture<>();
     CompletableFuture<String> future2 = new CompletableFuture<>();
     CompletableFuture<String> future3 = new CompletableFuture<>();
     CompletableFuture<String> future4 = new CompletableFuture<>();
     CompletableFuture<String> future5 = new CompletableFuture<>();
+    CompletableFuture<String> future6 = new CompletableFuture<>();
     vertxAssistant.deployVerticle(DataImportConsumerVerticle.class.getName(),
       consumerVerticlesConfig, dataImportConsumerVerticleNumber, future1);
     vertxAssistant.deployVerticle(MarcHridSetConsumerVerticle.class.getName(),
@@ -112,12 +116,15 @@ private static void startConsumerVerticles(Map<String, Object> consumerVerticles
       consumerVerticlesConfig, marcBibUpdateConsumerVerticleNumber, future4);
     vertxAssistant.deployVerticle(ConsortiumInstanceSharingConsumerVerticle.class.getName(),
       consumerVerticlesConfig, consortiumInstanceSharingVerticleNumber, future5);
+    vertxAssistant.deployVerticle(InstanceIngestConsumerVerticle.class.getName(),
+      consumerVerticlesConfig, instanceIngestConsumerVerticleNumber, future6);
 
     consumerVerticleDeploymentId = future1.get(20, TimeUnit.SECONDS);
     marcInstHridSetConsumerVerticleDeploymentId = future2.get(20, TimeUnit.SECONDS);
     quickMarcConsumerVerticleDeploymentId = future3.get(20, TimeUnit.SECONDS);
     marcBibUpdateConsumerVerticleDeploymentId = future4.get(20, TimeUnit.SECONDS);
     consortiumInstanceSharingVerticleDeploymentId = future5.get(20, TimeUnit.SECONDS);
+    instanceIngestConsumerVerticleDeploymentId = future6.get(20, TimeUnit.SECONDS);
   }
 
   private static void stop() {
@@ -133,6 +140,7 @@ private static void stop() {
       .thenCompose(v -> vertxAssistant.undeployVerticle(quickMarcConsumerVerticleDeploymentId))
      .thenCompose(v -> vertxAssistant.undeployVerticle(marcBibUpdateConsumerVerticleDeploymentId))
      .thenCompose(v -> vertxAssistant.undeployVerticle(consortiumInstanceSharingVerticleDeploymentId))
+      .thenCompose(v -> vertxAssistant.undeployVerticle(instanceIngestConsumerVerticleDeploymentId))
      .thenAccept(v -> vertxAssistant.stop(stopped));
 
     stopped.thenAccept(v -> log.info("Server Stopped"));
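The deployment step follows the same pattern as the existing consumer verticles: read an instance count from the environment, deploy, and block briefly for the deployment id so it can be undeployed at shutdown. The sketch below reproduces that flow with plain Vert.x instead of the module's VertxAssistant helper, so the deployment-options plumbing and the empty config object are assumptions for illustration only.

import io.vertx.core.DeploymentOptions;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class InstanceIngestDeploySketch {

  public static void main(String[] args) throws Exception {
    var vertx = Vertx.vertx();

    // Same environment variable and default ("3") that Launcher reads for this verticle.
    int instances = Integer.parseInt(System.getenv()
      .getOrDefault("inventory.kafka.InstanceIngestConsumerVerticle.instancesNumber", "3"));

    // Launcher bridges the deployment into a CompletableFuture and waits up to 20 seconds.
    CompletableFuture<String> deployed = new CompletableFuture<>();
    vertx.deployVerticle("org.folio.inventory.InstanceIngestConsumerVerticle",
        new DeploymentOptions().setInstances(instances).setConfig(new JsonObject())) // empty config is a placeholder; the real one carries the Kafka settings
      .onSuccess(deployed::complete)
      .onFailure(deployed::completeExceptionally);

    String deploymentId = deployed.get(20, TimeUnit.SECONDS);
    System.out.println("instance ingest consumer deployed: " + deploymentId);
  }
}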
diff --git a/src/main/java/org/folio/inventory/MarcBibUpdateConsumerVerticle.java b/src/main/java/org/folio/inventory/MarcBibUpdateConsumerVerticle.java
index 44aa43ad3..34a25e742 100644
--- a/src/main/java/org/folio/inventory/MarcBibUpdateConsumerVerticle.java
+++ b/src/main/java/org/folio/inventory/MarcBibUpdateConsumerVerticle.java
@@ -1,109 +1,35 @@
 package org.folio.inventory;
 
 import static org.folio.inventory.dataimport.util.ConsumerWrapperUtil.constructModuleName;
-import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_ENV;
-import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_HOST;
-import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_MAX_REQUEST_SIZE;
-import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_PORT;
-import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_REPLICATION_FACTOR;
-import static org.folio.inventory.dataimport.util.KafkaConfigConstants.OKAPI_URL;
-import io.vertx.core.AbstractVerticle;
+
 import io.vertx.core.Promise;
-import io.vertx.core.http.HttpClient;
-import io.vertx.core.json.JsonObject;
-import org.apache.commons.lang.StringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import org.folio.inventory.dataimport.cache.MappingMetadataCache;
 import org.folio.inventory.dataimport.consumers.MarcBibUpdateKafkaHandler;
 import org.folio.inventory.dataimport.handlers.actions.InstanceUpdateDelegate;
-import org.folio.inventory.storage.Storage;
-import org.folio.kafka.GlobalLoadSensor;
-import org.folio.kafka.KafkaConfig;
-import org.folio.kafka.KafkaConsumerWrapper;
-import org.folio.kafka.SubscriptionDefinition;
+import org.folio.inventory.support.KafkaConsumerVerticle;
 
-public class MarcBibUpdateConsumerVerticle extends AbstractVerticle {
+public class MarcBibUpdateConsumerVerticle extends KafkaConsumerVerticle {
 
   private static final Logger LOGGER = LogManager.getLogger(MarcBibUpdateConsumerVerticle.class);
-  private static final GlobalLoadSensor GLOBAL_LOAD_SENSOR = new GlobalLoadSensor();
-  private static final String SRS_MARC_BIB_TOPIC_NAME = "srs.marc-bib";
-  private static final String METADATA_EXPIRATION_TIME = "inventory.mapping-metadata-cache.expiration.time.seconds";
-  private final int loadLimit = getLoadLimit();
-  private KafkaConsumerWrapper<String, String> marcBibUpdateConsumerWrapper;
+  private static final String SRS_MARC_BIB_EVENT = "srs.marc-bib";
 
   @Override
   public void start(Promise<Void> startPromise) {
-    JsonObject config = vertx.getOrCreateContext().config();
-    KafkaConfig kafkaConfig = getKafkaConfig(config);
-
-    HttpClient client = vertx.createHttpClient();
-    Storage storage = Storage.basedUpon(config, client);
-    InstanceUpdateDelegate instanceUpdateDelegate = new InstanceUpdateDelegate(storage);
+    var instanceUpdateDelegate = new InstanceUpdateDelegate(getStorage());
 
-    var mappingMetadataExpirationTime = getCacheEnvVariable(config, METADATA_EXPIRATION_TIME);
-    MappingMetadataCache mappingMetadataCache = new MappingMetadataCache(vertx, client, Long.parseLong(mappingMetadataExpirationTime));
+    var marcBibUpdateKafkaHandler = new MarcBibUpdateKafkaHandler(vertx,
+      getMaxDistributionNumber(), getKafkaConfig(), instanceUpdateDelegate, getMappingMetadataCache());
 
-    MarcBibUpdateKafkaHandler marcBibUpdateKafkaHandler = new MarcBibUpdateKafkaHandler(vertx,
-      getMaxDistributionNumber(), kafkaConfig, instanceUpdateDelegate, mappingMetadataCache);
+    var marcBibUpdateConsumerWrapper = createConsumer(SRS_MARC_BIB_EVENT);
 
-    marcBibUpdateConsumerWrapper = createConsumer(kafkaConfig, SRS_MARC_BIB_TOPIC_NAME);
     marcBibUpdateConsumerWrapper.start(marcBibUpdateKafkaHandler, constructModuleName())
       .onFailure(startPromise::fail)
      .onSuccess(ar -> startPromise.complete());
   }
 
-  private KafkaConsumerWrapper<String, String> createConsumer(KafkaConfig kafkaConfig, String topicEventType) {
-    SubscriptionDefinition subscriptionDefinition = SubscriptionDefinition.builder()
-      .eventType(topicEventType)
-      .subscriptionPattern(formatSubscriptionPattern(kafkaConfig.getEnvId(), topicEventType))
-      .build();
-
-    return KafkaConsumerWrapper.<String, String>builder()
-      .context(context)
-      .vertx(vertx)
-      .kafkaConfig(kafkaConfig)
-      .loadLimit(loadLimit)
-      .globalLoadSensor(GLOBAL_LOAD_SENSOR)
-      .subscriptionDefinition(subscriptionDefinition)
-      .build();
-  }
-
   @Override
-  public void stop(Promise<Void> stopPromise) {
-    marcBibUpdateConsumerWrapper.stop()
-      .onComplete(ar -> stopPromise.complete());
-  }
-
-  private int getLoadLimit() {
-    return Integer.parseInt(System.getProperty("inventory.kafka.MarcBibUpdateConsumer.loadLimit","5"));
+  protected Logger getLogger() {
+    return LOGGER;
  }
 
-  private int getMaxDistributionNumber() {
-    return Integer.parseInt(System.getProperty("inventory.kafka.MarcBibUpdateConsumer.maxDistributionNumber", "100"));
-  }
-
-  private KafkaConfig getKafkaConfig(JsonObject config) {
-    KafkaConfig kafkaConfig = KafkaConfig.builder()
-      .envId(config.getString(KAFKA_ENV))
-      .kafkaHost(config.getString(KAFKA_HOST))
-      .kafkaPort(config.getString(KAFKA_PORT))
-      .okapiUrl(config.getString(OKAPI_URL))
-      .replicationFactor(Integer.parseInt(config.getString(KAFKA_REPLICATION_FACTOR)))
-      .maxRequestSize(Integer.parseInt(config.getString(KAFKA_MAX_REQUEST_SIZE)))
-      .build();
-    LOGGER.info("kafkaConfig: {}", kafkaConfig);
-    return kafkaConfig;
-  }
-
-  private String getCacheEnvVariable(JsonObject config, String variableName) {
-    String cacheExpirationTime = config.getString(variableName);
-    if (StringUtils.isBlank(cacheExpirationTime)) {
-      cacheExpirationTime = "3600";
-    }
-    return cacheExpirationTime;
-  }
-
-  public static String formatSubscriptionPattern(String env, String eventType) {
-    return String.join("\\.", env, "\\w{1,}", eventType);
-  }
 }
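One behavioural detail worth flagging: the old verticle built its subscription with a local formatSubscriptionPattern() helper, while the shared base class now delegates to KafkaTopicNameHelper.createSubscriptionDefinition() with the default namespace. The snippet below only reproduces the removed helper so its output can be compared with whatever the shared helper produces in a given environment; whether the two match for the srs.marc-bib topic is something to verify, not something this sketch asserts. The "folio" environment id is an example value.

// The subscription-pattern helper removed from MarcBibUpdateConsumerVerticle, kept here for reference.
public class OldSrsMarcBibSubscriptionPattern {

  // Joins the env id, a single wildcard segment and the event type with literal dots,
  // e.g. "folio\.\w{1,}\.srs.marc-bib" for env "folio" (dots inside the event type stay unescaped).
  public static String formatSubscriptionPattern(String env, String eventType) {
    return String.join("\\.", env, "\\w{1,}", eventType);
  }

  public static void main(String[] args) {
    System.out.println(formatSubscriptionPattern("folio", "srs.marc-bib"));
  }
}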
diff --git a/src/main/java/org/folio/inventory/MarcHridSetConsumerVerticle.java b/src/main/java/org/folio/inventory/MarcHridSetConsumerVerticle.java
index 1bbaeea06..77618c085 100644
--- a/src/main/java/org/folio/inventory/MarcHridSetConsumerVerticle.java
+++ b/src/main/java/org/folio/inventory/MarcHridSetConsumerVerticle.java
@@ -1,73 +1,35 @@
 package org.folio.inventory;
 
-import io.vertx.core.AbstractVerticle;
+import static org.folio.DataImportEventTypes.DI_SRS_MARC_BIB_INSTANCE_HRID_SET;
+import static org.folio.DataImportEventTypes.DI_SRS_MARC_HOLDINGS_HOLDING_HRID_SET;
+import static org.folio.inventory.dataimport.util.ConsumerWrapperUtil.constructModuleName;
+
 import io.vertx.core.CompositeFuture;
 import io.vertx.core.Promise;
-import io.vertx.core.http.HttpClient;
-import io.vertx.core.json.JsonObject;
-import org.apache.commons.lang.StringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import org.folio.DataImportEventTypes;
-import org.folio.inventory.dataimport.cache.MappingMetadataCache;
 import org.folio.inventory.dataimport.consumers.MarcBibInstanceHridSetKafkaHandler;
 import org.folio.inventory.dataimport.consumers.MarcHoldingsRecordHridSetKafkaHandler;
 import org.folio.inventory.dataimport.handlers.actions.HoldingsUpdateDelegate;
 import org.folio.inventory.dataimport.handlers.actions.InstanceUpdateDelegate;
 import org.folio.inventory.services.HoldingsCollectionService;
-import org.folio.inventory.storage.Storage;
-import org.folio.kafka.GlobalLoadSensor;
-import org.folio.kafka.KafkaConfig;
-import org.folio.kafka.KafkaConsumerWrapper;
-import org.folio.kafka.KafkaTopicNameHelper;
-import org.folio.kafka.SubscriptionDefinition;
-
-import static org.folio.DataImportEventTypes.DI_SRS_MARC_BIB_INSTANCE_HRID_SET;
-import static org.folio.DataImportEventTypes.DI_SRS_MARC_HOLDINGS_HOLDING_HRID_SET;
-import static org.folio.inventory.dataimport.util.ConsumerWrapperUtil.constructModuleName;
-import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_ENV;
-import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_HOST;
-import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_MAX_REQUEST_SIZE;
-import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_PORT;
-import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_REPLICATION_FACTOR;
-import static org.folio.inventory.dataimport.util.KafkaConfigConstants.OKAPI_URL;
+import org.folio.inventory.support.KafkaConsumerVerticle;
 
-public class MarcHridSetConsumerVerticle extends AbstractVerticle {
+public class MarcHridSetConsumerVerticle extends KafkaConsumerVerticle {
 
   private static final Logger LOGGER = LogManager.getLogger(MarcHridSetConsumerVerticle.class);
-  private static final GlobalLoadSensor GLOBAL_LOAD_SENSOR = new GlobalLoadSensor();
-
-  private final int loadLimit = getLoadLimit();
-  private KafkaConsumerWrapper<String, String> marcBibConsumerWrapper;
-  private KafkaConsumerWrapper<String, String> marcHoldingsConsumerWrapper;
 
   @Override
   public void start(Promise<Void> startPromise) {
-    JsonObject config = vertx.getOrCreateContext().config();
-    KafkaConfig kafkaConfig = KafkaConfig.builder()
-      .envId(config.getString(KAFKA_ENV))
-      .kafkaHost(config.getString(KAFKA_HOST))
-      .kafkaPort(config.getString(KAFKA_PORT))
-      .okapiUrl(config.getString(OKAPI_URL))
-      .replicationFactor(Integer.parseInt(config.getString(KAFKA_REPLICATION_FACTOR)))
-      .maxRequestSize(Integer.parseInt(config.getString(KAFKA_MAX_REQUEST_SIZE)))
-      .build();
-    LOGGER.info("kafkaConfig: {}", kafkaConfig);
-
-    marcBibConsumerWrapper = createConsumerByEvent(kafkaConfig, DI_SRS_MARC_BIB_INSTANCE_HRID_SET);
-    marcHoldingsConsumerWrapper = createConsumerByEvent(kafkaConfig, DI_SRS_MARC_HOLDINGS_HOLDING_HRID_SET);
+    var marcBibConsumerWrapper = createConsumer(DI_SRS_MARC_BIB_INSTANCE_HRID_SET.value());
+    var marcHoldingsConsumerWrapper = createConsumer(DI_SRS_MARC_HOLDINGS_HOLDING_HRID_SET.value());
 
-    HttpClient client = vertx.createHttpClient();
-    Storage storage = Storage.basedUpon(config, client);
-    HoldingsCollectionService holdingsCollectionService = new HoldingsCollectionService();
-    InstanceUpdateDelegate instanceUpdateDelegate = new InstanceUpdateDelegate(storage);
-    HoldingsUpdateDelegate holdingsRecordUpdateDelegate = new HoldingsUpdateDelegate(storage, holdingsCollectionService);
+    var holdingsCollectionService = new HoldingsCollectionService();
+    var instanceUpdateDelegate = new InstanceUpdateDelegate(getStorage());
+    var holdingsRecordUpdateDelegate = new HoldingsUpdateDelegate(getStorage(), holdingsCollectionService);
 
-    String mappingMetadataExpirationTime = getCacheEnvVariable(config, "inventory.mapping-metadata-cache.expiration.time.seconds");
-    MappingMetadataCache mappingMetadataCache = new MappingMetadataCache(vertx, client, Long.parseLong(mappingMetadataExpirationTime));
-
-    MarcBibInstanceHridSetKafkaHandler marcBibInstanceHridSetKafkaHandler = new MarcBibInstanceHridSetKafkaHandler(instanceUpdateDelegate, mappingMetadataCache);
-    MarcHoldingsRecordHridSetKafkaHandler marcHoldingsRecordHridSetKafkaHandler = new MarcHoldingsRecordHridSetKafkaHandler(holdingsRecordUpdateDelegate, mappingMetadataCache);
+    var marcBibInstanceHridSetKafkaHandler = new MarcBibInstanceHridSetKafkaHandler(instanceUpdateDelegate, getMappingMetadataCache());
+    var marcHoldingsRecordHridSetKafkaHandler = new MarcHoldingsRecordHridSetKafkaHandler(holdingsRecordUpdateDelegate, getMappingMetadataCache());
 
     CompositeFuture.all(
       marcBibConsumerWrapper.start(marcBibInstanceHridSetKafkaHandler, constructModuleName()),
@@ -78,35 +40,8 @@ public void start(Promise<Void> startPromise) {
   }
 
   @Override
-  public void stop(Promise<Void> stopPromise) {
-    CompositeFuture.join(marcBibConsumerWrapper.stop(), marcHoldingsConsumerWrapper.stop())
-      .onComplete(ar -> stopPromise.complete());
-  }
-
-  private int getLoadLimit() {
-    return Integer.parseInt(System.getProperty("inventory.kafka.MarcBibInstanceHridSetConsumer.loadLimit", "5"));
-  }
-
-  private KafkaConsumerWrapper<String, String> createConsumerByEvent(KafkaConfig kafkaConfig, DataImportEventTypes event) {
-    SubscriptionDefinition subscriptionDefinition = KafkaTopicNameHelper.createSubscriptionDefinition(
-      kafkaConfig.getEnvId(),
-      KafkaTopicNameHelper.getDefaultNameSpace(), event.value()
-    );
-    return KafkaConsumerWrapper.<String, String>builder()
-      .context(context)
-      .vertx(vertx)
-      .kafkaConfig(kafkaConfig)
-      .loadLimit(loadLimit)
-      .globalLoadSensor(GLOBAL_LOAD_SENSOR)
-      .subscriptionDefinition(subscriptionDefinition)
-      .build();
+  protected Logger getLogger() {
+    return LOGGER;
  }
 
-  private String getCacheEnvVariable(JsonObject config, String variableName) {
-    String cacheExpirationTime = config.getString(variableName);
-    if (StringUtils.isBlank(cacheExpirationTime)) {
-      cacheExpirationTime = "3600";
-    }
-    return cacheExpirationTime;
-  }
 }
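The HRID-set verticle keeps its two-consumer startup, but both wrappers now come from createConsumer(), which also registers them for the base class's stop(). Below is a minimal sketch of that shape with a stand-in handler; forwarding the CompositeFuture result to the start promise is an assumption that mirrors the original class rather than a copy of it.

package org.folio.inventory;

import static org.folio.DataImportEventTypes.DI_SRS_MARC_BIB_INSTANCE_HRID_SET;
import static org.folio.DataImportEventTypes.DI_SRS_MARC_HOLDINGS_HOLDING_HRID_SET;
import static org.folio.inventory.dataimport.util.ConsumerWrapperUtil.constructModuleName;

import io.vertx.core.CompositeFuture;
import io.vertx.core.Promise;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.inventory.handler.InstanceIngestEventHandler;
import org.folio.inventory.support.KafkaConsumerVerticle;

// Hypothetical two-consumer verticle mirroring MarcHridSetConsumerVerticle's startup shape.
public class SampleHridSetConsumerVerticle extends KafkaConsumerVerticle {

  private static final Logger LOGGER = LogManager.getLogger(SampleHridSetConsumerVerticle.class);

  @Override
  public void start(Promise<Void> startPromise) {
    var bibConsumer = createConsumer(DI_SRS_MARC_BIB_INSTANCE_HRID_SET.value());
    var holdingsConsumer = createConsumer(DI_SRS_MARC_HOLDINGS_HOLDING_HRID_SET.value());

    var handler = new InstanceIngestEventHandler(); // stand-in for the real HRID-set handlers

    // Both consumers must start before the verticle reports success; the base class later
    // stops both, because createConsumer() tracked them.
    CompositeFuture.all(
        bibConsumer.start(handler, constructModuleName()),
        holdingsConsumer.start(handler, constructModuleName()))
      .onFailure(startPromise::fail)
      .onSuccess(ar -> startPromise.complete());
  }

  @Override
  protected Logger getLogger() {
    return LOGGER;
  }
}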
diff --git a/src/main/java/org/folio/inventory/QuickMarcConsumerVerticle.java b/src/main/java/org/folio/inventory/QuickMarcConsumerVerticle.java
index e4ebdb934..b2b2a92ad 100644
--- a/src/main/java/org/folio/inventory/QuickMarcConsumerVerticle.java
+++ b/src/main/java/org/folio/inventory/QuickMarcConsumerVerticle.java
@@ -1,10 +1,6 @@
 package org.folio.inventory;
 
-import io.vertx.core.AbstractVerticle;
-import io.vertx.core.Future;
 import io.vertx.core.Promise;
-import io.vertx.core.http.HttpClient;
-import io.vertx.core.json.JsonObject;
 import io.vertx.ext.web.client.WebClient;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -13,93 +9,30 @@
 import org.folio.inventory.dataimport.handlers.actions.PrecedingSucceedingTitlesHelper;
 import org.folio.inventory.dataimport.util.ConsumerWrapperUtil;
 import org.folio.inventory.services.HoldingsCollectionService;
-import org.folio.inventory.storage.Storage;
-import org.folio.kafka.AsyncRecordHandler;
-import org.folio.kafka.GlobalLoadSensor;
-import org.folio.kafka.KafkaConfig;
-import org.folio.kafka.KafkaConsumerWrapper;
+import org.folio.inventory.support.KafkaConsumerVerticle;
 
-import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_ENV;
-import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_HOST;
-import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_MAX_REQUEST_SIZE;
-import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_PORT;
-import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_REPLICATION_FACTOR;
-import static org.folio.inventory.dataimport.util.KafkaConfigConstants.OKAPI_URL;
-import static org.folio.kafka.KafkaTopicNameHelper.createSubscriptionDefinition;
-import static org.folio.kafka.KafkaTopicNameHelper.getDefaultNameSpace;
-
-public class QuickMarcConsumerVerticle extends AbstractVerticle {
+public class QuickMarcConsumerVerticle extends KafkaConsumerVerticle {
 
   private static final Logger LOGGER = LogManager.getLogger(QuickMarcConsumerVerticle.class);
 
-  private final int loadLimit = getLoadLimit();
-  private final int maxDistributionNumber = getMaxDistributionNumber();
-  private KafkaConsumerWrapper<String, String> consumer;
-
   @Override
   public void start(Promise<Void> startPromise) {
-    JsonObject config = vertx.getOrCreateContext().config();
-    KafkaConfig kafkaConfig = getKafkaConfig(config);
-
-    HttpClient client = vertx.createHttpClient();
-    Storage storage = Storage.basedUpon(config, client);
-
-    var precedingSucceedingTitlesHelper = new PrecedingSucceedingTitlesHelper(WebClient.wrap(client));
-    HoldingsCollectionService holdingsCollectionService = new HoldingsCollectionService();
-    QuickMarcKafkaHandler handler = new QuickMarcKafkaHandler(vertx, storage, maxDistributionNumber, kafkaConfig,
-      precedingSucceedingTitlesHelper, holdingsCollectionService);
+    var precedingSucceedingTitlesHelper = new PrecedingSucceedingTitlesHelper(WebClient.wrap(getHttpClient()));
+    var holdingsCollectionService = new HoldingsCollectionService();
+    var handler = new QuickMarcKafkaHandler(vertx, getStorage(), getMaxDistributionNumber(),
+      getKafkaConfig(), precedingSucceedingTitlesHelper, holdingsCollectionService);
 
-    var kafkaConsumerFuture = createKafkaConsumer(kafkaConfig, QMEventTypes.QM_SRS_MARC_RECORD_UPDATED, handler);
+    var consumer = createConsumer(QMEventTypes.QM_SRS_MARC_RECORD_UPDATED.name());
 
-    kafkaConsumerFuture
+    consumer.start(handler, ConsumerWrapperUtil.constructModuleName())
+      .map(consumer)
       .onFailure(startPromise::fail)
-      .onSuccess(ar -> {
-        consumer = ar;
-        startPromise.complete();
-      });
-  }
-
-  private KafkaConfig getKafkaConfig(JsonObject config) {
-    KafkaConfig kafkaConfig = KafkaConfig.builder()
-      .envId(config.getString(KAFKA_ENV))
-      .kafkaHost(config.getString(KAFKA_HOST))
-      .kafkaPort(config.getString(KAFKA_PORT))
-      .okapiUrl(config.getString(OKAPI_URL))
-      .replicationFactor(Integer.parseInt(config.getString(KAFKA_REPLICATION_FACTOR)))
-      .maxRequestSize(Integer.parseInt(config.getString(KAFKA_MAX_REQUEST_SIZE)))
-      .build();
-    LOGGER.info("kafkaConfig: {}", kafkaConfig);
-    return kafkaConfig;
+      .onSuccess(ar -> startPromise.complete());
   }
 
   @Override
-  public void stop(Promise<Void> stopPromise) {
-    consumer.stop().onComplete(ar -> stopPromise.complete());
+  protected Logger getLogger() {
+    return LOGGER;
  }
 
-  private Future<KafkaConsumerWrapper<String, String>> createKafkaConsumer(KafkaConfig kafkaConfig, QMEventTypes eventType,
-                                                                           AsyncRecordHandler<String, String> recordHandler) {
-    var subscriptionDefinition = createSubscriptionDefinition(kafkaConfig.getEnvId(),
-      getDefaultNameSpace(), eventType.name());
-
-    KafkaConsumerWrapper<String, String> consumerWrapper = KafkaConsumerWrapper.<String, String>builder()
-      .context(context)
-      .vertx(vertx)
-      .kafkaConfig(kafkaConfig)
-      .loadLimit(loadLimit)
-      .globalLoadSensor(new GlobalLoadSensor())
-      .subscriptionDefinition(subscriptionDefinition)
-      .build();
-
-    return consumerWrapper.start(recordHandler, ConsumerWrapperUtil.constructModuleName())
-      .map(consumerWrapper);
-  }
-
-  private int getLoadLimit() {
-    return Integer.parseInt(System.getProperty("inventory.kafka.QuickMarcConsumer.loadLimit", "5"));
-  }
-
-  private int getMaxDistributionNumber() {
-    return Integer.parseInt(System.getProperty("inventory.kafka.QuickMarcConsumerVerticle.maxDistributionNumber", "100"));
-  }
 }
diff --git a/src/main/java/org/folio/inventory/handler/InstanceIngestEventHandler.java b/src/main/java/org/folio/inventory/handler/InstanceIngestEventHandler.java
new file mode 100644
index 000000000..41cbed9b2
--- /dev/null
+++ b/src/main/java/org/folio/inventory/handler/InstanceIngestEventHandler.java
@@ -0,0 +1,21 @@
+package org.folio.inventory.handler;
+
+import io.vertx.core.Future;
+import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.folio.kafka.AsyncRecordHandler;
+
+public class InstanceIngestEventHandler implements AsyncRecordHandler<String, String> {
+
+  private static final Logger LOGGER = LogManager.getLogger(InstanceIngestEventHandler.class);
+
+  @Override
+  public Future<String> handle(KafkaConsumerRecord<String, String> kafkaConsumerRecord) {
+    // to extract and re-use common logic from CreateInstanceEventHandler
+    // 1. Change event; 2. Re-use all except: source type to be changed to BIBFRAME, DI event not to be sent
+    LOGGER.info("to be replaced with actual code in Step 2 of MODINV-986");
+    return null;
+  }
+
+}
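The handler is an acknowledged stub (it returns null until the follow-up step of MODINV-986). For reference, a handler that already satisfies the AsyncRecordHandler<String, String> contract would need to return a real Future, along the lines of the sketch below; the payload parsing and the choice to return the record key are assumptions for illustration, not the planned implementation.

package org.folio.inventory.handler;

import io.vertx.core.Future;
import io.vertx.core.json.JsonObject;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.kafka.AsyncRecordHandler;

// Hypothetical minimal handler: acknowledge the record by completing the returned Future.
public class LoggingInstanceIngestHandler implements AsyncRecordHandler<String, String> {

  private static final Logger LOGGER = LogManager.getLogger(LoggingInstanceIngestHandler.class);

  @Override
  public Future<String> handle(KafkaConsumerRecord<String, String> kafkaConsumerRecord) {
    try {
      var payload = new JsonObject(kafkaConsumerRecord.value()); // assumes a JSON event body
      LOGGER.info("Received instance ingest event with {} top-level fields", payload.size());
      return Future.succeededFuture(kafkaConsumerRecord.key());
    } catch (Exception e) {
      LOGGER.error("Failed to handle instance ingest event", e);
      return Future.failedFuture(e);
    }
  }
}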
diff --git a/src/main/java/org/folio/inventory/support/KafkaConsumerVerticle.java b/src/main/java/org/folio/inventory/support/KafkaConsumerVerticle.java
new file mode 100644
index 000000000..fad00b4bd
--- /dev/null
+++ b/src/main/java/org/folio/inventory/support/KafkaConsumerVerticle.java
@@ -0,0 +1,144 @@
+package org.folio.inventory.support;
+
+import static java.lang.Integer.parseInt;
+import static java.lang.String.format;
+import static java.lang.System.getProperty;
+import static java.util.Optional.ofNullable;
+import static org.apache.commons.lang.StringUtils.isBlank;
+import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_ENV;
+import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_HOST;
+import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_MAX_REQUEST_SIZE;
+import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_PORT;
+import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_REPLICATION_FACTOR;
+import static org.folio.inventory.dataimport.util.KafkaConfigConstants.OKAPI_URL;
+import static org.folio.kafka.KafkaTopicNameHelper.createSubscriptionDefinition;
+import static org.folio.kafka.KafkaTopicNameHelper.getDefaultNameSpace;
+
+import io.vertx.core.AbstractVerticle;
+import io.vertx.core.Future;
+import io.vertx.core.Promise;
+import io.vertx.core.Verticle;
+import io.vertx.core.http.HttpClient;
+import io.vertx.core.json.JsonObject;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.logging.log4j.Logger;
+import org.folio.inventory.dataimport.cache.MappingMetadataCache;
+import org.folio.inventory.storage.Storage;
+import org.folio.kafka.GlobalLoadSensor;
+import org.folio.kafka.KafkaConfig;
+import org.folio.kafka.KafkaConsumerWrapper;
+import org.folio.kafka.SubscriptionDefinition;
+import org.folio.okapi.common.GenericCompositeFuture;
+
+public abstract class KafkaConsumerVerticle extends AbstractVerticle {
+  private static final String LOAD_LIMIT_TEMPLATE = "inventory.kafka.%s.loadLimit";
+  private static final String LOAD_LIMIT_DEFAULT = "5";
+  private static final String MAX_DISTRIBUTION_NUMBER_TEMPLATE = "inventory.kafka.%s.maxDistributionNumber";
+  private static final String MAX_DISTRIBUTION_NUMBER_DEFAULT = "100";
+  private static final String CACHE_EXPIRATION_DEFAULT = "3600";
+  private static final String METADATA_EXPIRATION_TIME = "inventory.mapping-metadata-cache.expiration.time.seconds";
+
+  private final List<KafkaConsumerWrapper<String, String>> consumerWrappers = new ArrayList<>();
+  private KafkaConfig kafkaConfig;
+  private JsonObject config;
+  private HttpClient httpClient;
+  private Storage storage;
+  private MappingMetadataCache mappingMetadataCache;
+
+  @Override
+  public void stop(Promise<Void> stopPromise) {
+    List<Future<Void>> stopFutures = consumerWrappers.stream()
+      .map(KafkaConsumerWrapper::stop)
+      .toList();
+
+    GenericCompositeFuture.join(stopFutures)
+      .onComplete(ar -> stopPromise.complete());
+  }
+
+  protected abstract Logger getLogger();
+
+  protected KafkaConsumerWrapper<String, String> createConsumer(String eventType) {
+    var kafkaConsumerWrapper = KafkaConsumerWrapper.<String, String>builder()
+      .context(context)
+      .vertx(vertx)
+      .kafkaConfig(getKafkaConfig())
+      .loadLimit(getLoadLimit())
+      .globalLoadSensor(new GlobalLoadSensor())
+      .subscriptionDefinition(getSubscriptionDefinition(getKafkaConfig().getEnvId(), eventType))
+      .build();
+    consumerWrappers.add(kafkaConsumerWrapper);
+    return kafkaConsumerWrapper;
+  }
+
+  protected JsonObject getConfig() {
+    return ofNullable(config).orElseGet(() -> {
+      config = vertx.getOrCreateContext().config();
+      return config;
+    });
+  }
+
+  protected KafkaConfig getKafkaConfig() {
+    return ofNullable(kafkaConfig).orElseGet(() -> {
+      kafkaConfig = KafkaConfig.builder()
+        .envId(getConfig().getString(KAFKA_ENV))
+        .kafkaHost(getConfig().getString(KAFKA_HOST))
+        .kafkaPort(getConfig().getString(KAFKA_PORT))
+        .okapiUrl(getConfig().getString(OKAPI_URL))
+        .replicationFactor(parseInt(getConfig().getString(KAFKA_REPLICATION_FACTOR)))
+        .maxRequestSize(parseInt(getConfig().getString(KAFKA_MAX_REQUEST_SIZE)))
+        .build();
+      getLogger().info("kafkaConfig: {}", kafkaConfig);
+      return kafkaConfig;
+    });
+  }
+
+  protected HttpClient getHttpClient() {
+    return ofNullable(httpClient).orElseGet(() -> {
+      httpClient = vertx.createHttpClient();
+      return httpClient;
+    });
+  }
+
+  protected Storage getStorage() {
+    return ofNullable(storage).orElseGet(() -> {
+      storage = Storage.basedUpon(getConfig(), vertx.createHttpClient());
+      return storage;
+    });
+  }
+
+  protected MappingMetadataCache getMappingMetadataCache() {
+    return ofNullable(mappingMetadataCache).orElseGet(() -> {
+      var mappingMetadataExpirationTime = getCacheEnvVariable(getConfig(), METADATA_EXPIRATION_TIME);
+      mappingMetadataCache = new MappingMetadataCache(vertx, getHttpClient(),
+        Long.parseLong(mappingMetadataExpirationTime));
+      return mappingMetadataCache;
+    });
+  }
+
+  protected String getCacheEnvVariable(JsonObject config, String variableName) {
+    var cacheExpirationTime = config.getString(variableName);
+    if (isBlank(cacheExpirationTime)) {
+      cacheExpirationTime = CACHE_EXPIRATION_DEFAULT;
+    }
+    return cacheExpirationTime;
+  }
+
+  protected int getMaxDistributionNumber() {
+    return getConsumerProperty(MAX_DISTRIBUTION_NUMBER_TEMPLATE, MAX_DISTRIBUTION_NUMBER_DEFAULT);
+  }
+
+  private SubscriptionDefinition getSubscriptionDefinition(String envId, String eventType) {
+    return createSubscriptionDefinition(envId, getDefaultNameSpace(), eventType);
+  }
+
+  private int getLoadLimit() {
+    return getConsumerProperty(LOAD_LIMIT_TEMPLATE, LOAD_LIMIT_DEFAULT);
+  }
+
+  private int getConsumerProperty(String nameTemplate, String defaultValue) {
+    var consumerClassName = getClass().getSimpleName();
+    var cleanConsumerName = consumerClassName.substring(0, consumerClassName.indexOf(Verticle.class.getSimpleName()));
+    return parseInt(getProperty(format(nameTemplate, cleanConsumerName), defaultValue));
+  }
+
+}
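Taken together, adding another consumer now means subclassing KafkaConsumerVerticle, calling createConsumer() for each topic or event type, and overriding getLogger(); Kafka config, storage, caches and shutdown come from the base class. One consequence of getConsumerProperty() is worth noting: tuning properties are derived from the subclass name with the trailing "Verticle" stripped. The sketch below just evaluates that derivation for the verticles in this patch; the names are computed from the code above, not taken from documentation.

import java.util.List;

// Reproduces the property-name derivation from KafkaConsumerVerticle.getConsumerProperty().
public class ConsumerPropertyNames {

  public static void main(String[] args) {
    var verticles = List.of(
      "DataImportConsumerVerticle",
      "MarcBibUpdateConsumerVerticle",
      "MarcHridSetConsumerVerticle",
      "QuickMarcConsumerVerticle",
      "InstanceIngestConsumerVerticle");

    for (var name : verticles) {
      // Same logic as getConsumerProperty(): strip the "Verticle" suffix, then format the template.
      var cleanName = name.substring(0, name.indexOf("Verticle"));
      System.out.printf("%s -> inventory.kafka.%s.loadLimit (default 5), inventory.kafka.%s.maxDistributionNumber (default 100)%n",
        name, cleanName, cleanName);
    }
  }
}

Note that some of the removed getters used different property names (for example inventory.kafka.DataImportConsumerVerticle.maxDistributionNumber and inventory.kafka.MarcBibInstanceHridSetConsumer.loadLimit), so the derived names do not match the old ones in every case.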