Skip to content

Commit

Permalink
Command submission in the ledger-api-bench-tool.
Browse files Browse the repository at this point in the history
CHANGELOG_BEGIN
- [Integration Kit] - The ledger-api-bench-tool is now capable of generating test contracts for testing purposes.
CHANGELOG_END
  • Loading branch information
kamil-da committed Oct 21, 2021
1 parent 5507670 commit b8c4044
Show file tree
Hide file tree
Showing 17 changed files with 593 additions and 120 deletions.
6 changes: 6 additions & 0 deletions ledger/ledger-api-bench-tool/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ da_scala_library(
"@maven//:com_lihaoyi_pprint",
"@maven//:com_typesafe_akka_akka_actor",
"@maven//:com_typesafe_akka_akka_actor_typed",
"@maven//:com_typesafe_akka_akka_stream",
"@maven//:io_circe_circe_core",
"@maven//:io_circe_circe_yaml",
"@maven//:org_typelevel_cats_core",
],
visibility = ["//visibility:public"],
deps = [
Expand All @@ -51,6 +55,8 @@ da_scala_library(
"//libs-scala/resources",
"//libs-scala/resources-akka",
"//libs-scala/resources-grpc",
"//ledger/test-common:model-tests-%s.scala" % "1.14", # TODO: make the LF version configurable
"//ledger/test-common:dar-files-%s-lib" % "1.14", # TODO: make the LF version configurable
"@maven//:io_dropwizard_metrics_metrics_core",
"@maven//:io_grpc_grpc_api",
"@maven//:io_grpc_grpc_core",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.ledger.api.benchtool

import com.daml.ledger.api.benchtool.metrics.{
MetricRegistryOwner,
MetricsCollector,
MetricsSet,
StreamMetrics,
}
import com.daml.ledger.api.benchtool.services.LedgerApiServices
import com.daml.ledger.api.benchtool.util.TypedActorSystemResourceOwner
import com.daml.ledger.resources.ResourceContext
import org.slf4j.LoggerFactory

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

object Benchmark {
private val logger = LoggerFactory.getLogger(getClass)

def run(
config: Config,
apiServices: LedgerApiServices,
)(implicit ec: ExecutionContext, resourceContext: ResourceContext): Future[Unit] = {
val resources = for {
system <- TypedActorSystemResourceOwner.owner()
registry <- new MetricRegistryOwner(
reporter = config.metricsReporter,
reportingInterval = config.reportingPeriod,
logger = logger,
)
} yield (system, registry)

resources.use { case (system, registry) =>
Future
.traverse(config.streams) {
case streamConfig: Config.StreamConfig.TransactionsStreamConfig =>
StreamMetrics
.observer(
streamName = streamConfig.name,
logInterval = config.reportingPeriod,
metrics = MetricsSet.transactionMetrics(streamConfig.objectives),
logger = logger,
exposedMetrics = Some(
MetricsSet
.transactionExposedMetrics(streamConfig.name, registry, config.reportingPeriod)
),
)(system, ec)
.flatMap { observer =>
apiServices.transactionService.transactions(streamConfig, observer)
}
case streamConfig: Config.StreamConfig.TransactionTreesStreamConfig =>
StreamMetrics
.observer(
streamName = streamConfig.name,
logInterval = config.reportingPeriod,
metrics = MetricsSet.transactionTreesMetrics(streamConfig.objectives),
logger = logger,
exposedMetrics = Some(
MetricsSet.transactionTreesExposedMetrics(
streamConfig.name,
registry,
config.reportingPeriod,
)
),
)(system, ec)
.flatMap { observer =>
apiServices.transactionService.transactionTrees(streamConfig, observer)
}
case streamConfig: Config.StreamConfig.ActiveContractsStreamConfig =>
StreamMetrics
.observer(
streamName = streamConfig.name,
logInterval = config.reportingPeriod,
metrics = MetricsSet.activeContractsMetrics,
logger = logger,
exposedMetrics = Some(
MetricsSet.activeContractsExposedMetrics(
streamConfig.name,
registry,
config.reportingPeriod,
)
),
)(system, ec)
.flatMap { observer =>
apiServices.activeContractsService.getActiveContracts(streamConfig, observer)
}
case streamConfig: Config.StreamConfig.CompletionsStreamConfig =>
StreamMetrics
.observer(
streamName = streamConfig.name,
logInterval = config.reportingPeriod,
metrics = MetricsSet.completionsMetrics,
logger = logger,
exposedMetrics = Some(
MetricsSet
.completionsExposedMetrics(streamConfig.name, registry, config.reportingPeriod)
),
)(system, ec)
.flatMap { observer =>
apiServices.commandCompletionService.completions(streamConfig, observer)
}
}
.transform {
case Success(results) =>
if (results.contains(MetricsCollector.Message.MetricsResult.ObjectivesViolated))
Failure(new RuntimeException("Metrics objectives not met."))
else Success(())
case Failure(ex) =>
Failure(ex)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import com.daml.ledger.api.v1.value.Identifier
import com.daml.metrics.MetricsReporter
import scopt.{OptionDef, OptionParser, Read}

import java.io.File
import scala.concurrent.duration.FiniteDuration
import scala.util.{Failure, Success, Try}

Expand All @@ -32,7 +33,6 @@ object Cli {
opt[Config.StreamConfig]("consume-stream")
.abbr("s")
.unbounded()
.minOccurs(1)
.text(
s"Stream configuration."
)
Expand All @@ -43,6 +43,15 @@ object Cli {
config.copy(streams = config.streams :+ streamConfig)
}

opt[File]("contract-set-descriptor")
.hidden() // TODO: uncomment when production-ready
.abbr("d")
.optional()
.text("A contract set descriptor file.")
.action { case (descriptorFile, config) =>
config.copy(contractSetDescriptorFile = Some(descriptorFile))
}

opt[FiniteDuration]("log-interval")
.abbr("r")
.text("Stream metrics log interval.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import com.daml.ledger.api.v1.ledger_offset.LedgerOffset
import com.daml.ledger.api.v1.value.Identifier
import com.daml.metrics.MetricsReporter

import java.io.File
import scala.concurrent.duration._

case class Config(
Expand All @@ -16,6 +17,7 @@ case class Config(
tls: TlsConfiguration,
streams: List[Config.StreamConfig],
reportingPeriod: FiniteDuration,
contractSetDescriptorFile: Option[File],
metricsReporter: MetricsReporter,
)

Expand Down Expand Up @@ -90,6 +92,7 @@ object Config {
tls = TlsConfiguration.Empty.copy(enabled = false),
streams = List.empty[Config.StreamConfig],
reportingPeriod = 5.seconds,
contractSetDescriptorFile = None,
metricsReporter = MetricsReporter.Console,
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,8 @@

package com.daml.ledger.api.benchtool

import com.daml.ledger.api.benchtool.metrics.{
MetricRegistryOwner,
MetricsCollector,
MetricsSet,
StreamMetrics,
}
import com.daml.ledger.api.benchtool.services.{
ActiveContractsService,
CommandCompletionService,
LedgerIdentityService,
TransactionService,
}
import com.daml.ledger.api.benchtool.util.TypedActorSystemResourceOwner
import com.daml.ledger.api.benchtool.generating.ContractProducer
import com.daml.ledger.api.benchtool.services._
import com.daml.ledger.api.tls.TlsConfiguration
import com.daml.ledger.resources.{ResourceContext, ResourceOwner}
import io.grpc.Channel
Expand All @@ -31,127 +20,60 @@ import java.util.concurrent.{
}
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.{Failure, Success}

object LedgerApiBenchTool {
def main(args: Array[String]): Unit = {
Cli.config(args) match {
case Some(config) =>
val benchmark = runBenchmark(config)(ExecutionContext.Implicits.global)
val result = run(config)(ExecutionContext.Implicits.global)
.recover { case ex =>
println(s"Error: ${ex.getMessage}")
sys.exit(1)
}(scala.concurrent.ExecutionContext.Implicits.global)
Await.result(benchmark, atMost = Duration.Inf)
Await.result(result, atMost = Duration.Inf)
()
case _ =>
logger.error("Invalid configuration arguments.")
}
}

private def runBenchmark(config: Config)(implicit ec: ExecutionContext): Future[Unit] = {
private def run(config: Config)(implicit ec: ExecutionContext): Future[Unit] = {
val printer = pprint.PPrinter(200, 1000)
logger.info(s"Starting benchmark with configuration:\n${printer(config).toString()}")

implicit val resourceContext: ResourceContext = ResourceContext(ec)

val resources = for {
executorService <- threadPoolExecutorOwner(config.concurrency)
channel <- channelOwner(config.ledger, config.tls, executorService)
system <- TypedActorSystemResourceOwner.owner()
registry <- new MetricRegistryOwner(
reporter = config.metricsReporter,
reportingInterval = config.reportingPeriod,
logger = logger,
)
} yield (channel, system, registry)

resources.use { case (channel, system, registry) =>
val ledgerIdentityService: LedgerIdentityService = new LedgerIdentityService(channel)
val ledgerId: String = ledgerIdentityService.fetchLedgerId()
val transactionService = new TransactionService(channel, ledgerId)
val activeContractsService = new ActiveContractsService(channel, ledgerId)
val commandCompletionService = new CommandCompletionService(channel, ledgerId)
Future
.traverse(config.streams) {
case streamConfig: Config.StreamConfig.TransactionsStreamConfig =>
StreamMetrics
.observer(
streamName = streamConfig.name,
logInterval = config.reportingPeriod,
metrics = MetricsSet.transactionMetrics(streamConfig.objectives),
logger = logger,
exposedMetrics = Some(
MetricsSet
.transactionExposedMetrics(streamConfig.name, registry, config.reportingPeriod)
),
)(system, ec)
.flatMap { observer =>
transactionService.transactions(streamConfig, observer)
}
case streamConfig: Config.StreamConfig.TransactionTreesStreamConfig =>
StreamMetrics
.observer(
streamName = streamConfig.name,
logInterval = config.reportingPeriod,
metrics = MetricsSet.transactionTreesMetrics(streamConfig.objectives),
logger = logger,
exposedMetrics = Some(
MetricsSet.transactionTreesExposedMetrics(
streamConfig.name,
registry,
config.reportingPeriod,
)
),
)(system, ec)
.flatMap { observer =>
transactionService.transactionTrees(streamConfig, observer)
}
case streamConfig: Config.StreamConfig.ActiveContractsStreamConfig =>
StreamMetrics
.observer(
streamName = streamConfig.name,
logInterval = config.reportingPeriod,
metrics = MetricsSet.activeContractsMetrics,
logger = logger,
exposedMetrics = Some(
MetricsSet.activeContractsExposedMetrics(
streamConfig.name,
registry,
config.reportingPeriod,
)
),
)(system, ec)
.flatMap { observer =>
activeContractsService.getActiveContracts(streamConfig, observer)
}
case streamConfig: Config.StreamConfig.CompletionsStreamConfig =>
StreamMetrics
.observer(
streamName = streamConfig.name,
logInterval = config.reportingPeriod,
metrics = MetricsSet.completionsMetrics,
logger = logger,
exposedMetrics = Some(
MetricsSet
.completionsExposedMetrics(streamConfig.name, registry, config.reportingPeriod)
),
)(system, ec)
.flatMap { observer =>
commandCompletionService.completions(streamConfig, observer)
}
}
.transform {
case Success(results) =>
if (results.contains(MetricsCollector.Message.MetricsResult.ObjectivesViolated))
Failure(new RuntimeException("Metrics objectives not met."))
else Success(())
case Failure(ex) =>
Failure(ex)
}
apiServicesOwner(config).use { apiServices =>
def testContractsGenerationStep(): Future[Unit] = config.contractSetDescriptorFile match {
case None =>
Future.successful(
logger.info("No contract set descriptor file provided. Skipping contracts generation.")
)
case Some(descriptorFile) => ContractProducer(apiServices).create(descriptorFile)
}

def benchmarkStep(): Future[Unit] = if (config.streams.isEmpty) {
Future.successful(logger.info(s"No streams defined. Skipping the benchmark step."))
} else {
Benchmark.run(config, apiServices)
}

for {
_ <- testContractsGenerationStep()
_ <- benchmarkStep()
} yield ()
}
}

private def apiServicesOwner(
config: Config
)(implicit ec: ExecutionContext): ResourceOwner[LedgerApiServices] =
for {
executorService <- threadPoolExecutorOwner(config.concurrency)
channel <- channelOwner(config.ledger, config.tls, executorService)
services <- ResourceOwner.forFuture(() => LedgerApiServices.forChannel(channel))
} yield services

private def channelOwner(
ledger: Config.Ledger,
tls: TlsConfiguration,
Expand Down
Loading

0 comments on commit b8c4044

Please sign in to comment.