Skip to content

Commit

Permalink
finos#1195 enable metric tracking for each instance of session table
Browse files Browse the repository at this point in the history
- reason TBA
  • Loading branch information
junaidzm13 committed Feb 22, 2024
1 parent 5748bb2 commit d7b3346
Show file tree
Hide file tree
Showing 8 changed files with 153 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ class LifecycleContainer(implicit clock: Clock) extends StrictLogging {
val thread = new Runner("lifeCycleJoinRunner", () => {Thread.sleep(1000)})
thread.runInBackground()

val dependencyGraph = new DirectedAcyclicGraph[LifecycleEnabled]()
private val dependencyGraph = new DirectedAcyclicGraph[LifecycleEnabled]()

def autoShutdownHook(): Unit = {
val container = this
Expand All @@ -238,33 +238,14 @@ class LifecycleContainer(implicit clock: Clock) extends StrictLogging {
}, "lcShutdownHook"))
}

case class LifeCycleComponentContext(comp: LifecycleEnabled, container: LifecycleContainer){
def dependsOn(comp2: LifecycleEnabled): Unit = {
if(!dependencyGraph.containsEdge(comp, comp2))
dependencyGraph.addEdge(comp, comp2)
else
logger.warn(s"lifecycle already contains edge $comp, $comp2")
}

def dependsOn(comps: LifecycleEnabled*): Unit = {

comps.foreach( c => {
if(!dependencyGraph.containsEdge(comp, c))
dependencyGraph.addEdge(comp, c)
else
logger.warn(s"lifecycle already contains edge $comp, $c")
})
}
}

def apply(comp: LifecycleEnabled): LifeCycleComponentContext = {

if(!dependencyGraph.containsNode(comp))
dependencyGraph.addNode(comp)
else
logger.warn(s"lifecycle already contains component $comp")

new LifeCycleComponentContext(comp, this)
LifeCycleComponentContext(comp, this, dependencyGraph)
}

def add(component: LifecycleEnabled): Unit = {}
Expand Down Expand Up @@ -320,3 +301,23 @@ class LifecycleContainer(implicit clock: Clock) extends StrictLogging {
logger.debug("Shutdown lifecycle")
}
}

case class LifeCycleComponentContext(comp: LifecycleEnabled,
container: LifecycleContainer,
dependencyGraph: DirectedAcyclicGraph[LifecycleEnabled]) extends StrictLogging {
def dependsOn(comp2: LifecycleEnabled): Unit = {
if (!dependencyGraph.containsEdge(comp, comp2))
dependencyGraph.addEdge(comp, comp2)
else
logger.warn(s"lifecycle already contains edge $comp, $comp2")
}

def dependsOn(comps: LifecycleEnabled*): Unit = {
comps.foreach(c => {
if (!dependencyGraph.containsEdge(comp, c))
dependencyGraph.addEdge(comp, c)
else
logger.warn(s"lifecycle already contains edge $comp, $c")
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import org.finos.toolbox.jmx.MetricsProvider
import org.finos.toolbox.lifecycle.LifecycleContainer
import org.finos.toolbox.thread.LifeCycleRunner
import org.finos.toolbox.time.Clock
import org.finos.vuu.viewport.ViewPortTable

class MetricsTableProvider(table: DataTable, tableContainer: TableContainer)(implicit clock: Clock, lifecycleContainer: LifecycleContainer,
metrics: MetricsProvider) extends Provider with StrictLogging {
Expand All @@ -30,25 +31,24 @@ class MetricsTableProvider(table: DataTable, tableContainer: TableContainer)(imp
def runOnce(): Unit = {

try {

val tables = tableContainer.getTables()

tables.foreach(tableDef => {

val counter = metrics.counter(tableDef.table + ".processUpdates.Counter");
val size = tableContainer.getTable(tableDef.table).size()

val meter = metrics.meter(tableDef.table + ".processUpdates.Meter")

val upMap = Map("table" -> (tableDef.module + "-" + tableDef.table), "updateCount" -> counter.getCount, "size" -> size, "updatesPerSecond" -> meter.getOneMinuteRate);

table.processUpdate(tableDef.table, RowWithData(tableDef.table, upMap), clock.now())

})

tables.foreach(vpTable => table.processUpdate(vpTable.table, RowWithData(vpTable.table, getMetricsData(vpTable)), clock.now()))
} catch {
case e: Exception =>
logger.error("Error occured in metrics", e)
}
}

private def getMetricsData(vpTable: ViewPortTable): Map[String, Any] = {
val counter = metrics.counter(vpTable.table + ".processUpdates.Counter")
val size = tableContainer.getTable(vpTable.table).size()
val meter = metrics.meter(vpTable.table + ".processUpdates.Meter")

Map(
"table" -> (vpTable.module + "-" + vpTable.table),
"updateCount" -> counter.getCount,
"size" -> size,
"updatesPerSecond" -> meter.getOneMinuteRate
)
}
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
package org.finos.vuu.core.table

import org.finos.vuu.api.{SessionTableDef, TableDef}
import org.finos.vuu.api.TableDef
import org.finos.vuu.core.index._
import org.finos.vuu.provider.{JoinTableProvider, Provider}
import org.finos.vuu.viewport.{RowProcessor, RowSource, ViewPortColumns}
import org.finos.toolbox.collection.array.ImmutableArray
import org.finos.toolbox.jmx.MetricsProvider
import org.finos.toolbox.text.AsciiUtil
import org.finos.vuu.feature.inmem.InMemTablePrimaryKeys
import org.finos.vuu.net.ClientSessionId

import java.util
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters
import scala.jdk.CollectionConverters


trait DataTable extends KeyedObservable[RowKeyUpdate] with RowSource {
Expand Down Expand Up @@ -242,7 +240,7 @@ class InMemDataTable(val tableDef: TableDef, val joinProvider: JoinTableProvider
}
}

def plusName(s: String): String = tableDef.name + "." + s
def plusName(s: String): String = name + "." + s

override protected def createDataTableData(): TableData = {
InMemDataTableData(new ConcurrentHashMap[String, RowData](), InMemTablePrimaryKeys(ImmutableArray.empty))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,18 @@ import org.finos.vuu.api.SessionTableDef
import org.finos.vuu.net.ClientSessionId
import org.finos.vuu.provider.JoinTableProvider

class InMemSessionDataTable(val clientSessionId: ClientSessionId, sessionTableDef: SessionTableDef, joinTableProvider: JoinTableProvider)(implicit metrics: MetricsProvider, clock: Clock) extends InMemDataTable(sessionTableDef, joinTableProvider)(metrics) with SessionTable {
class InMemSessionDataTable private (val clientSessionId: ClientSessionId,
sessionTableDef: SessionTableDef,
joinTableProvider: JoinTableProvider,
final val creationInstant: Long)
(implicit metrics: MetricsProvider) extends InMemDataTable(sessionTableDef, joinTableProvider) with SessionTable {

final val createInstant = clock.now()
override def name: String = s"session:$clientSessionId/simple-" + sessionTableDef.name + "_" + createInstant.toString
def tableId: String = name + "@" + hashCode()
def this(clientSessionId: ClientSessionId, sessionTableDef: SessionTableDef, joinTableProvider: JoinTableProvider)
(implicit metrics: MetricsProvider, clock: Clock) = {
this(clientSessionId, sessionTableDef, joinTableProvider, creationInstant = clock.now())
}

override def name: String = s"session:$clientSessionId/simple-" + sessionTableDef.name + "_" + creationInstant.toString

override def sessionId: ClientSessionId = clientSessionId

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ class TableContainer(val joinTableProvider: JoinTableProvider)(implicit val metr
def getTables(): Array[ViewPortTable] = {
val tableList = IteratorHasAsScala(tables.values().iterator()).asScala
tableList
.map(table => ViewPortTable(table.getTableDef.name, if (table.getTableDef.getModule() != null) table.getTableDef.getModule().name else "null")).toArray[ViewPortTable].sortBy(_.table)
.map(table => ViewPortTable(table.name, Option(table.getTableDef.getModule()).map(_.name).getOrElse("null")))
.toArray[ViewPortTable].sortBy(_.table)
}

def getNonSessionTables: Array[ViewPortTable] = {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package org.finos.vuu.viewport

import io.vertx.core.spi.metrics.Metrics
import org.finos.toolbox.jmx.MetricsProvider
import org.finos.toolbox.time.Clock
import org.finos.vuu.api.SessionTableDef
import org.finos.vuu.core.table.{InMemDataTable, InMemSessionDataTable, SessionTable, TableContainer}
import org.finos.vuu.core.table.{InMemDataTable, InMemSessionDataTable, TableContainer}
import org.finos.vuu.feature.ViewPortTableCreator
import org.finos.vuu.net.ClientSessionId

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package org.finos.vuu.core.module.metrics

import org.finos.toolbox.jmx.{MetricsProvider, MetricsProviderImpl}
import org.finos.toolbox.lifecycle.{LifeCycleComponentContext, LifecycleContainer, LifecycleEnabled}
import org.finos.toolbox.time.{Clock, TestFriendlyClock}
import org.finos.vuu.api.{Indices, TableDef, VisualLinks}
import org.finos.vuu.core.module.ModuleFactory.stringToString
import org.finos.vuu.core.module.metrics.MetricsTableProviderTest.{createMockTable, createTestTableDef}
import org.finos.vuu.core.table.{Column, Columns, DataTable, TableContainer}
import org.finos.vuu.test.TestFriendlyJoinTableProvider
import org.scalamock.scalatest.MockFactory
import org.scalatest.featurespec.AnyFeatureSpec
import org.scalatest.matchers.should.Matchers

class MetricsTableProviderTest extends AnyFeatureSpec with Matchers with MockFactory {
private implicit val metricsProvider: MetricsProvider = new MetricsProviderImpl()
private implicit val clock: Clock = new TestFriendlyClock(10001)
private implicit val lifecycleContainer: LifecycleContainer = stub[LifecycleContainer]
private val lifeCycleComponentContext = stub[LifeCycleComponentContext]
(lifecycleContainer.apply _).when(*).returns(lifeCycleComponentContext)
(lifeCycleComponentContext.dependsOn: LifecycleEnabled => Unit).when(*).returns()

private val joinProvider = new TestFriendlyJoinTableProvider()
private val mockTable = stub[DataTable]
private val tableContainer = new TableContainer(joinProvider)

private val metricsTableProvider = new MetricsTableProvider(mockTable, tableContainer)

Feature("runOnce") {
Scenario("can get and update expected list of tables") {
tableContainer.createTable(createTestTableDef(name = "instruments"))
tableContainer.addTable(createMockTable(tableName = "instrumentsSessionTable_1", tableDefName = "instruments"))
tableContainer.addTable(createMockTable(tableName = "instrumentsSessionTable_2", tableDefName = "instruments"))
tableContainer.createTable(createTestTableDef(name = "other"))

metricsTableProvider.runOnce()

(mockTable.processUpdate _).verify("instruments", *, *).once
(mockTable.processUpdate _).verify("instrumentsSessionTable_1", *, *).once
(mockTable.processUpdate _).verify("instrumentsSessionTable_2", *, *).once
(mockTable.processUpdate _).verify("other", *, *).once
}
}
}

object MetricsTableProviderTest extends MockFactory {
private def createTestTableDef(name: String,
keyField: String = "id",
columns: Array[Column] = Columns.fromNames("id".long(), "field".string())): TableDef = {
new TableDef(name, keyField, columns, Seq.empty, false, VisualLinks(), Indices())
}

private def createMockTable(tableName: String, tableDefName: String): DataTable = {
val table = stub[DataTable]
(table.name _).when().returns(tableName)
(table.getTableDef _).when().returns(createTestTableDef(tableDefName))
table
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package org.finos.vuu.core.table

import org.finos.toolbox.jmx.{MetricsProvider, MetricsProviderImpl}
import org.finos.toolbox.time.{Clock, DefaultClock}
import org.finos.vuu.api.{Indices, SessionTableDef}
import org.finos.vuu.core.module.ModuleFactory.stringToString
import org.finos.vuu.net.ClientSessionId
import org.finos.vuu.test.TestFriendlyJoinTableProvider
import org.scalatest.featurespec.AnyFeatureSpec
import org.scalatest.matchers.should.Matchers

class InMemSessionDataTableTest extends AnyFeatureSpec with Matchers {
private implicit val metricsProvider: MetricsProvider = new MetricsProviderImpl()
private implicit val clock: Clock = new DefaultClock()
private val clientSessionId = ClientSessionId(sessionId = "sessionId", user = "user")
private val joinProvider = new TestFriendlyJoinTableProvider()

private val sessionTableDef: SessionTableDef = new SessionTableDef(
name = "test-table",
keyField = "id",
columns = Columns.fromNames("id".long(), "field".string()),
indices = Indices(),
joinFields = Seq.empty
)

private val inMemSessionDataTable = new InMemSessionDataTable(clientSessionId, sessionTableDef, joinProvider)

Feature("Metrics update") {
Scenario("Should correctly update metrics WHEN processUpdate called") {
inMemSessionDataTable.processUpdate("1", RowWithData("1", Map("id" -> 1, "field" -> "value1")), clock.now())
inMemSessionDataTable.processUpdate("2", RowWithData("2", Map("id" -> 2, "field" -> "value2")), clock.now())

val counter = metricsProvider.counter(inMemSessionDataTable.name + ".processUpdates.Counter")
val meter = metricsProvider.meter(inMemSessionDataTable.name + ".processUpdates.Meter")

counter.getCount shouldEqual 2
meter.getCount shouldEqual 2
}
}

}

0 comments on commit d7b3346

Please sign in to comment.