diff --git a/example/apache-ignite/src/main/scala/org/finos/vuu/example/ignite/IgniteLocalConfig.scala b/example/apache-ignite/src/main/scala/org/finos/vuu/example/ignite/IgniteLocalConfig.scala index 89fdb0478..7662f89ca 100644 --- a/example/apache-ignite/src/main/scala/org/finos/vuu/example/ignite/IgniteLocalConfig.scala +++ b/example/apache-ignite/src/main/scala/org/finos/vuu/example/ignite/IgniteLocalConfig.scala @@ -1,23 +1,83 @@ package org.finos.vuu.example.ignite +import org.apache.ignite.cache.{QueryEntity, QueryIndex, QueryIndexType} import org.apache.ignite.configuration.{CacheConfiguration, DataStorageConfiguration, IgniteConfiguration} +import org.apache.ignite.kubernetes.configuration.KubernetesConnectionConfiguration +import org.apache.ignite.logger.slf4j.Slf4jLogger +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi +import org.apache.ignite.spi.discovery.tcp.ipfinder.kubernetes.TcpDiscoveryKubernetesIpFinder +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder +import org.finos.vuu.core.module.simul.model.ChildOrder +import org.finos.vuu.example.ignite.IgniteLocalConfig.{backupCount, childOrderCacheName, igniteWorkDir, logger, parentOrderCacheName} +import org.slf4j.LoggerFactory import org.finos.vuu.example.ignite.schema.IgniteChildOrderEntity import java.nio.file.Paths -import java.util.concurrent.atomic.AtomicBoolean import scala.jdk.CollectionConverters.IterableHasAsJava object IgniteLocalConfig { + private val logger = LoggerFactory.getLogger(IgniteLocalConfig.getClass) val parentOrderCacheName = "ParentOrders" val childOrderCacheName = "ChildOrders" - private val persistenceEnabled = new AtomicBoolean() - def create(clientMode: Boolean): IgniteConfiguration = { + def create(k8sEnvironment: Boolean = isK8s, + clientMode: Boolean = true, + persistenceEnabled: Boolean = false): IgniteLocalConfig = { + logger.info("K8s enabled : {}", k8sEnvironment) + if (k8sEnvironment) { + createConfig(k8sDiscovery(), clientMode, persistenceEnabled) + } else { + createConfig(localDiscovery(), clientMode, persistenceEnabled) + } + } + + private def createConfig(tcpDiscoverySpi: TcpDiscoverySpi = localDiscovery(), clientMode: Boolean, persistenceEnabled: Boolean): IgniteLocalConfig = { + new IgniteLocalConfig(clientMode, persistenceEnabled, tcpDiscoverySpi) + } + + private def localDiscovery(): TcpDiscoverySpi = { + val ipFinder = new TcpDiscoveryVmIpFinder() + ipFinder.setAddresses(List("127.0.0.1").asJavaCollection) + val tcpDiscoverySpi = new TcpDiscoverySpi().setIpFinder(ipFinder) + + tcpDiscoverySpi + } + + private def k8sDiscovery(): TcpDiscoverySpi = { + logger.info("Creating K8S config, Service : {}, NameSpace : {}", k8sServiceName, k8sServiceNamespace) + val k8sConnectionConfig = new KubernetesConnectionConfiguration() + k8sConnectionConfig.setNamespace(k8sServiceNamespace) + k8sConnectionConfig.setServiceName(k8sServiceName) + val ipFinder = new TcpDiscoveryKubernetesIpFinder(k8sConnectionConfig) + val tcpDiscoverySpi = new TcpDiscoverySpi().setIpFinder(ipFinder) + + tcpDiscoverySpi + } + + private def isK8s: Boolean = "true" == Option(System.getenv("KUBERNETES-ENV")).getOrElse("false").toLowerCase + + private def k8sServiceNamespace: String = System.getenv("NAMESPACE") + + private def k8sServiceName: String = System.getenv("SERVICE-NAME") + + private def igniteWorkDir: String = Option(System.getenv("IGNITE-WORKDIR")).getOrElse(System.getProperty("java.io.tmpdir")) + + private def backupCount: Integer = Option(System.getenv("BACKUP-COUNT")).map(it => Integer.valueOf(it)).getOrElse(0) + +} + + +class IgniteLocalConfig(private val clientMode: Boolean, + private val persistenceEnabled: Boolean, + private val tcpDiscoverySpi: TcpDiscoverySpi) { + def igniteConfiguration(): IgniteConfiguration = { + logger.info(s"Ignite Client mode = $clientMode, Persistence Enabled = $persistenceEnabled, TcpDiscovery = $tcpDiscoverySpi") val cfg = new IgniteConfiguration() + cfg.setGridLogger(new Slf4jLogger()) cfg.setClientMode(clientMode) cfg.setPeerClassLoadingEnabled(true) - cfg.setWorkDirectory(Paths.get("./target/ignite").toFile.getAbsolutePath) + cfg.setWorkDirectory(igniteWorkDir) cfg.setCacheConfiguration( createParentOrderCacheConfig(), @@ -28,11 +88,9 @@ object IgniteLocalConfig { createDataStorageConfig() ) - cfg - } + cfg.setDiscoverySpi(tcpDiscoverySpi) - def setPersistenceEnabled(enabled: Boolean): Unit = { - persistenceEnabled.set(enabled) + cfg } private def createChildOrderCacheConfig(): CacheConfiguration[?, ?] = { @@ -41,17 +99,19 @@ object IgniteLocalConfig { val queryEntity = IgniteChildOrderEntity.buildQueryEntity cacheConfiguration.setQueryEntities(List(queryEntity).asJavaCollection) cacheConfiguration.setName(childOrderCacheName) + cacheConfiguration.setBackups(backupCount) } private def createParentOrderCacheConfig(): CacheConfiguration[?, ?] = { val cacheConfiguration = new CacheConfiguration() cacheConfiguration.setName(parentOrderCacheName) + cacheConfiguration.setBackups(backupCount) } private def createDataStorageConfig(): DataStorageConfiguration = { val storageConfiguration = new DataStorageConfiguration() - storageConfiguration.getDefaultDataRegionConfiguration.setPersistenceEnabled(persistenceEnabled.get()) + storageConfiguration.getDefaultDataRegionConfiguration.setPersistenceEnabled(persistenceEnabled) storageConfiguration } diff --git a/example/apache-ignite/src/main/scala/org/finos/vuu/example/ignite/IgniteOrderStore.scala b/example/apache-ignite/src/main/scala/org/finos/vuu/example/ignite/IgniteOrderStore.scala index 1429166d9..e331b1aa1 100644 --- a/example/apache-ignite/src/main/scala/org/finos/vuu/example/ignite/IgniteOrderStore.scala +++ b/example/apache-ignite/src/main/scala/org/finos/vuu/example/ignite/IgniteOrderStore.scala @@ -21,9 +21,8 @@ object IgniteOrderStore { * @return an instance of IgniteOrderStore */ def apply(clientMode: Boolean = true, persistenceEnabled: Boolean = false): IgniteOrderStore = { - IgniteLocalConfig.setPersistenceEnabled(persistenceEnabled) - val config = IgniteLocalConfig.create(clientMode = clientMode) - val ignite = Ignition.getOrStart(config) + val config = IgniteLocalConfig.create(clientMode = clientMode, persistenceEnabled = persistenceEnabled) + val ignite = Ignition.getOrStart(config.igniteConfiguration()) ignite.cluster().state(ClusterState.ACTIVE) diff --git a/example/apache-ignite/src/main/scala/org/finos/vuu/example/ignite/StartIgniteMain.scala b/example/apache-ignite/src/main/scala/org/finos/vuu/example/ignite/StartIgniteMain.scala index 3520f3b92..8d6c937c2 100644 --- a/example/apache-ignite/src/main/scala/org/finos/vuu/example/ignite/StartIgniteMain.scala +++ b/example/apache-ignite/src/main/scala/org/finos/vuu/example/ignite/StartIgniteMain.scala @@ -3,7 +3,6 @@ package org.finos.vuu.example.ignite import org.apache.ignite.Ignition object StartIgniteMain extends App { - IgniteLocalConfig.setPersistenceEnabled(false) - val configuration = IgniteLocalConfig.create(false) + val configuration = IgniteLocalConfig.create(clientMode = false).igniteConfiguration() val ignite = Ignition.getOrStart(configuration) }