Skip to content

Commit

Permalink
Updates for apache ignite config
Browse files Browse the repository at this point in the history
  • Loading branch information
rumakt committed Feb 16, 2024
1 parent 4825379 commit fbeaa16
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 14 deletions.
Original file line number Diff line number Diff line change
@@ -1,23 +1,83 @@
package org.finos.vuu.example.ignite

import org.apache.ignite.cache.{QueryEntity, QueryIndex, QueryIndexType}
import org.apache.ignite.configuration.{CacheConfiguration, DataStorageConfiguration, IgniteConfiguration}
import org.apache.ignite.kubernetes.configuration.KubernetesConnectionConfiguration
import org.apache.ignite.logger.slf4j.Slf4jLogger
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi
import org.apache.ignite.spi.discovery.tcp.ipfinder.kubernetes.TcpDiscoveryKubernetesIpFinder
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder
import org.finos.vuu.core.module.simul.model.ChildOrder
import org.finos.vuu.example.ignite.IgniteLocalConfig.{backupCount, childOrderCacheName, igniteWorkDir, logger, parentOrderCacheName}
import org.slf4j.LoggerFactory
import org.finos.vuu.example.ignite.schema.IgniteChildOrderEntity

import java.nio.file.Paths
import java.util.concurrent.atomic.AtomicBoolean
import scala.jdk.CollectionConverters.IterableHasAsJava

object IgniteLocalConfig {
private val logger = LoggerFactory.getLogger(IgniteLocalConfig.getClass)
val parentOrderCacheName = "ParentOrders"
val childOrderCacheName = "ChildOrders"
private val persistenceEnabled = new AtomicBoolean()

def create(clientMode: Boolean): IgniteConfiguration = {
def create(k8sEnvironment: Boolean = isK8s,
clientMode: Boolean = true,
persistenceEnabled: Boolean = false): IgniteLocalConfig = {
logger.info("K8s enabled : {}", k8sEnvironment)
if (k8sEnvironment) {
createConfig(k8sDiscovery(), clientMode, persistenceEnabled)
} else {
createConfig(localDiscovery(), clientMode, persistenceEnabled)
}
}

private def createConfig(tcpDiscoverySpi: TcpDiscoverySpi = localDiscovery(), clientMode: Boolean, persistenceEnabled: Boolean): IgniteLocalConfig = {
new IgniteLocalConfig(clientMode, persistenceEnabled, tcpDiscoverySpi)
}

private def localDiscovery(): TcpDiscoverySpi = {
val ipFinder = new TcpDiscoveryVmIpFinder()
ipFinder.setAddresses(List("127.0.0.1").asJavaCollection)
val tcpDiscoverySpi = new TcpDiscoverySpi().setIpFinder(ipFinder)

tcpDiscoverySpi
}

private def k8sDiscovery(): TcpDiscoverySpi = {
logger.info("Creating K8S config, Service : {}, NameSpace : {}", k8sServiceName, k8sServiceNamespace)
val k8sConnectionConfig = new KubernetesConnectionConfiguration()
k8sConnectionConfig.setNamespace(k8sServiceNamespace)
k8sConnectionConfig.setServiceName(k8sServiceName)
val ipFinder = new TcpDiscoveryKubernetesIpFinder(k8sConnectionConfig)
val tcpDiscoverySpi = new TcpDiscoverySpi().setIpFinder(ipFinder)

tcpDiscoverySpi
}

private def isK8s: Boolean = "true" == Option(System.getenv("KUBERNETES-ENV")).getOrElse("false").toLowerCase

private def k8sServiceNamespace: String = System.getenv("NAMESPACE")

private def k8sServiceName: String = System.getenv("SERVICE-NAME")

private def igniteWorkDir: String = Option(System.getenv("IGNITE-WORKDIR")).getOrElse(System.getProperty("java.io.tmpdir"))

private def backupCount: Integer = Option(System.getenv("BACKUP-COUNT")).map(it => Integer.valueOf(it)).getOrElse(0)

}


class IgniteLocalConfig(private val clientMode: Boolean,
private val persistenceEnabled: Boolean,
private val tcpDiscoverySpi: TcpDiscoverySpi) {
def igniteConfiguration(): IgniteConfiguration = {
logger.info(s"Ignite Client mode = $clientMode, Persistence Enabled = $persistenceEnabled, TcpDiscovery = $tcpDiscoverySpi")
val cfg = new IgniteConfiguration()

cfg.setGridLogger(new Slf4jLogger())
cfg.setClientMode(clientMode)
cfg.setPeerClassLoadingEnabled(true)
cfg.setWorkDirectory(Paths.get("./target/ignite").toFile.getAbsolutePath)
cfg.setWorkDirectory(igniteWorkDir)

cfg.setCacheConfiguration(
createParentOrderCacheConfig(),
Expand All @@ -28,11 +88,9 @@ object IgniteLocalConfig {
createDataStorageConfig()
)

cfg
}
cfg.setDiscoverySpi(tcpDiscoverySpi)

def setPersistenceEnabled(enabled: Boolean): Unit = {
persistenceEnabled.set(enabled)
cfg
}

private def createChildOrderCacheConfig(): CacheConfiguration[?, ?] = {
Expand All @@ -41,17 +99,19 @@ object IgniteLocalConfig {
val queryEntity = IgniteChildOrderEntity.buildQueryEntity
cacheConfiguration.setQueryEntities(List(queryEntity).asJavaCollection)
cacheConfiguration.setName(childOrderCacheName)
cacheConfiguration.setBackups(backupCount)
}

private def createParentOrderCacheConfig(): CacheConfiguration[?, ?] = {
val cacheConfiguration = new CacheConfiguration()
cacheConfiguration.setName(parentOrderCacheName)
cacheConfiguration.setBackups(backupCount)
}

private def createDataStorageConfig(): DataStorageConfiguration = {
val storageConfiguration = new DataStorageConfiguration()

storageConfiguration.getDefaultDataRegionConfiguration.setPersistenceEnabled(persistenceEnabled.get())
storageConfiguration.getDefaultDataRegionConfiguration.setPersistenceEnabled(persistenceEnabled)

storageConfiguration
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@ object IgniteOrderStore {
* @return an instance of IgniteOrderStore
*/
def apply(clientMode: Boolean = true, persistenceEnabled: Boolean = false): IgniteOrderStore = {
IgniteLocalConfig.setPersistenceEnabled(persistenceEnabled)
val config = IgniteLocalConfig.create(clientMode = clientMode)
val ignite = Ignition.getOrStart(config)
val config = IgniteLocalConfig.create(clientMode = clientMode, persistenceEnabled = persistenceEnabled)
val ignite = Ignition.getOrStart(config.igniteConfiguration())

ignite.cluster().state(ClusterState.ACTIVE)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package org.finos.vuu.example.ignite
import org.apache.ignite.Ignition

object StartIgniteMain extends App {
IgniteLocalConfig.setPersistenceEnabled(false)
val configuration = IgniteLocalConfig.create(false)
val configuration = IgniteLocalConfig.create(clientMode = false).igniteConfiguration()
val ignite = Ignition.getOrStart(configuration)
}

0 comments on commit fbeaa16

Please sign in to comment.