diff --git a/core/src/main/scala/besom/internal/Context.scala b/core/src/main/scala/besom/internal/Context.scala index 23491393..689c95f3 100644 --- a/core/src/main/scala/besom/internal/Context.scala +++ b/core/src/main/scala/besom/internal/Context.scala @@ -21,6 +21,7 @@ trait Context extends TaskTracker: private[besom] def resources: Resources private[besom] def runInfo: RunInfo private[besom] def monitor: Monitor + private[besom] def memo: Memo private[besom] def getParentURN: Result[URN] private[besom] def config: Config private[besom] def isDryRun: Boolean @@ -106,6 +107,7 @@ class ContextImpl( private[besom] val engine: Engine, private[besom] val taskTracker: TaskTracker, private[besom] val resources: Resources, + private[besom] val memo: Memo, private val stackPromise: Promise[StackResource] ) extends Context with TaskTracker: @@ -229,9 +231,10 @@ object Context: engine: Engine, taskTracker: TaskTracker, resources: Resources, + memo: Memo, stackPromise: Promise[StackResource] ): Context = - new ContextImpl(runInfo, featureSupport, config, logger, monitor, engine, taskTracker, resources, stackPromise) + new ContextImpl(runInfo, featureSupport, config, logger, monitor, engine, taskTracker, resources, memo, stackPromise) def apply( runInfo: RunInfo, @@ -244,8 +247,9 @@ object Context: ): Result[Context] = for resources <- Resources() + memo <- Memo() stackPromise <- Promise[StackResource]() - yield Context.create(runInfo, featureSupport, config, logger, monitor, engine, taskTracker, resources, stackPromise) + yield Context.create(runInfo, featureSupport, config, logger, monitor, engine, taskTracker, resources, memo, stackPromise) def apply( runInfo: RunInfo, diff --git a/core/src/main/scala/besom/internal/Memo.scala b/core/src/main/scala/besom/internal/Memo.scala new file mode 100644 index 00000000..3c86296d --- /dev/null +++ b/core/src/main/scala/besom/internal/Memo.scala @@ -0,0 +1,49 @@ +package besom.internal + +import java.util.concurrent.ConcurrentHashMap + +/** A memoization utility that caches the results of a side-effecting computation based on the type and name of the resource for which said + * computation is being performed. This is necessary in order to prevent runtime crashes caused by multiple evaluations of resource + * constructors that perform gRPC calls to the Pulumi engine. Pulumi engine is not idempotent on register resource, register resource + * outputs and read resource calls (at least for these calls we do know it will crash if called multiple times). + * + * We want to memoize as little as possible because: + * + * a) memoizing too much can lead to surprising behavior for end user (e.g. for calls that can return different results on each invocation) + * b) we don't have a sane way to use WeakConcurrentMap here because we can't control the lifecycle of the keys due to the lazy nature of + * Result monad, on each evaluation of the resource constructor referencing effect keys will be new instances of String and therefore not + * stable identifiers for use in WeakConcurrentMap. This in turn means this map WILL grow indefinitely and we can't do anything about it. + * It will get deallocated when Context that holds it is deallocated so effectively it's bounded by the lifetime of infrastructural + * program. + * + * This memoization utility uses ConcurrentHashMap to handle concurrent access to the cache on per-key level (concurrent access *does + * indeed happen* due to the nature of Pulumi SDKs that are parallel by default; quick example - Output referring to a resource constructor + * is passed to args of two other resource constructors, these resource constructors are evaluated in parallel on different threads to + * resolve their dependencies for serialization). + * + * Mechanics of memoization are quite simple: we put a Promise into the map for a given key using atomic putIfAbsent operation. If the key + * is not present in the map, we evaluate the effect and fulfill the promise with the result of the effect. If the key is present in the + * map, we return the existing promise and wait for it to be fulfilled by the effect. This way we ensure that the effect is evaluated only + * once for a given key. + * + * @param chm + * ConcurrentHashMap using tuple of type and name as a key and Promise as a value. + */ +class Memo(private val chm: ConcurrentHashMap[(String, String), Promise[?]]): + def memoize[A](typ: String, name: String, effect: Result[A]): Result[A] = + Promise[A]().flatMap { promise => + val existing = chm.putIfAbsent((typ, name), promise) + if existing == null then + effect + .flatMap(promise.fulfill) + .recover(e => promise.fail(e)) + .flatMap(_ => promise.get) + else existing.get.asInstanceOf[Result[A]] + } + +object Memo: + def apply(): Result[Memo] = Result.defer { + val chm = ConcurrentHashMap[(String, String), Promise[?]]() + + new Memo(chm) + } diff --git a/core/src/main/scala/besom/internal/ResourceDecoder.scala b/core/src/main/scala/besom/internal/ResourceDecoder.scala index bd0c4531..21dd9db6 100644 --- a/core/src/main/scala/besom/internal/ResourceDecoder.scala +++ b/core/src/main/scala/besom/internal/ResourceDecoder.scala @@ -7,10 +7,12 @@ import besom.util.*, Validated.ValidatedResult import besom.internal.logging.* import besom.types.{Label, URN, ResourceId} -trait ResourceDecoder[A <: Resource]: // TODO rename to something more sensible - def makeResolver(using Context, BesomMDC[Label]): Result[(A, ResourceResolver[A])] +trait ResourceDecoder[A <: Resource]: + def makeResourceAndResolver(using Context, BesomMDC[Label]): Result[(A, ResourceResolver[A])] object ResourceDecoder: + inline def forResource[R <: Resource: ResourceDecoder]: ResourceDecoder[R] = summon[ResourceDecoder[R]] + inline def derived[A <: Resource]: ResourceDecoder[A] = ${ derivedImpl[A] } class CustomPropertyExtractor[A](propertyName: String, decoder: Decoder[A]): @@ -60,7 +62,7 @@ object ResourceDecoder: end CustomPropertyExtractor - def makeResolver[A <: Resource]( + private[ResourceDecoder] def makeResourceAndResolver[A <: Resource]( fromProduct: Product => A, customPropertyExtractors: Vector[CustomPropertyExtractor[?]] )(using Context, BesomMDC[Label]): Result[(A, ResourceResolver[A])] = @@ -126,7 +128,7 @@ object ResourceDecoder: (resource, resolver) } - end makeResolver + end makeResourceAndResolver private def derivedImpl[A <: Resource: Type](using q: Quotes): Expr[ResourceDecoder[A]] = Expr.summon[Mirror.Of[A]].get match @@ -156,8 +158,8 @@ object ResourceDecoder: '{ new ResourceDecoder[A]: - def makeResolver(using Context, BesomMDC[Label]): Result[(A, ResourceResolver[A])] = - ResourceDecoder.makeResolver( + def makeResourceAndResolver(using Context, BesomMDC[Label]): Result[(A, ResourceResolver[A])] = + ResourceDecoder.makeResourceAndResolver( fromProduct = ${ m }.fromProduct, customPropertyExtractors = ${ customPropertyExtractorsExpr }.toVector ) diff --git a/core/src/main/scala/besom/internal/ResourceOps.scala b/core/src/main/scala/besom/internal/ResourceOps.scala index d0f22eae..73f2219d 100644 --- a/core/src/main/scala/besom/internal/ResourceOps.scala +++ b/core/src/main/scala/besom/internal/ResourceOps.scala @@ -13,14 +13,27 @@ import scala.annotation.unused type Providers = Map[String, ProviderResource] +enum Mode(val logStr: String): + case GetWithUrn(urn: URN) extends Mode("Getting") + case ReadWithId(id: ResourceId) extends Mode("Reading") + case Register extends Mode("Registering") + + override def toString(): String = logStr + + def suffix: String = this match + case GetWithUrn(urn) => s"from Pulumi state with URN ${urn.asString}" + case ReadWithId(id) => s"from infrastructure with foreign id (import id) ${id}" + case Register => "" + class ResourceOps(using ctx: Context, mdc: BesomMDC[Label]): + // register resource outputs *NEEDS TO* be memoized private[besom] def registerResourceOutputsInternal( urnResult: Result[URN], outputs: Result[Struct] ): Result[Unit] = urnResult.flatMap { urn => - outputs.flatMap { struct => + val runSideEffects = outputs.flatMap { struct => val request = RegisterResourceOutputsRequest( urn = urn.asString, outputs = if struct.fields.isEmpty then None else Some(struct) @@ -28,6 +41,10 @@ class ResourceOps(using ctx: Context, mdc: BesomMDC[Label]): ctx.monitor.registerResourceOutputs(request) } + + /** see docs: [[Memo]] + */ + ctx.memo.memoize("registerResourceOutputs", urn.asString, runSideEffects) } private[besom] def readOrRegisterResourceInternal[R <: Resource: ResourceDecoder, A: ArgsEncoder]( @@ -37,34 +54,35 @@ class ResourceOps(using ctx: Context, mdc: BesomMDC[Label]): options: ResourceOptions, remote: Boolean ): Result[R] = - summon[ResourceDecoder[R]].makeResolver.flatMap { (resource, resolver) => - // this order is then repeated in registerReadOrGetResource - val modeResult = options.hasURN.zip(options.hasImportId).map { case (hasUrn, hasImportId) => - if hasUrn then "Getting" else if hasImportId then "Reading" else "Registering" - } - - modeResult.flatMap { mode => - log.debug(s"$mode resource, trying to add to cache...") *> - ctx.resources.cacheResource(typ, name, args, options, resource).flatMap { addedToCache => - if addedToCache then - for - options <- options.resolve - _ <- log.debug(s"$mode resource, added to cache...") - state <- createResourceState(typ, name, resource, options, remote) - _ <- log.debug(s"Created resource state") - _ <- ctx.resources.add(resource, state) - _ <- log.debug(s"Added resource to resources") - inputs <- prepareResourceInputs(resource, state, args, options) - _ <- log.debug(s"Prepared inputs for resource") - _ <- addChildToParentResource(resource, options.parent) - _ <- log.debug(s"Added child to parent (isDefined: ${options.parent.isDefined})") - _ <- registerReadOrGetResource(resource, state, resolver, inputs, options, remote) - yield resource - else ctx.resources.getCachedResource(typ, name, args, options).map(_.asInstanceOf[R]) - } - } + resolveMode(options).flatMap { mode => + def runSideEffects = + for + (resource, resolver) <- ResourceDecoder.forResource[R].makeResourceAndResolver + _ <- log.debug(s"$mode resource ${mode.suffix}") + options <- options.resolve + _ <- log.debug(s"$mode resource, added to cache...") + state <- createResourceState(typ, name, resource, options, remote) + _ <- log.debug(s"Created resource state") + _ <- ctx.resources.add(resource, state) + _ <- log.debug(s"Added resource to resources") + inputs <- prepareResourceInputs(resource, state, args, options) + _ <- log.debug(s"Prepared inputs for resource") + _ <- addChildToParentResource(resource, options.parent) + _ <- log.debug(s"Added child to parent (isDefined: ${options.parent.isDefined})") + _ <- registerReadOrGetResource(resource, state, resolver, inputs, options, remote) + yield resource + + /** see docs: [[Memo]] + */ + mode match + // DO NOT memoize Get, it's the only grpc call that doesn't need to be memoized + case Mode.GetWithUrn(_) => runSideEffects + // DO memoize Register and Read, if we don't, they crash the engine on second invocation + // and laziness means WE WILL evaluate them more than once usually + case Mode.Register | Mode.ReadWithId(_) => ctx.memo.memoize(typ, name, runSideEffects) } + // invoke is not memoized private[besom] def invokeInternal[A: ArgsEncoder, R: Decoder](tok: FunctionToken, args: A, opts: InvokeOptions): Output[R] = def decodeResponse(resultAsValue: Value, props: Map[String, Set[Resource]]): Result[OutputData[R]] = val resourceLabel = mdc.get(Key.LabelKey) @@ -132,13 +150,25 @@ class ResourceOps(using ctx: Context, mdc: BesomMDC[Label]): for maybeProviderId <- maybeProviderIdResult req <- buildInvokeRequest(invokeArgs.serialized, maybeProviderId, version) - _ <- log.debug(s"Invoke RPC prepared, req=${pprint(req)}") + _ <- log.debug(s"Invoke RPC prepared, req:\n${pprint(req)}") res <- ctx.monitor.invoke(req) - _ <- log.debug(s"Invoke RPC executed, res=${pprint(res)}") + _ <- log.debug(s"Invoke RPC executed, res:\n${pprint(res)}") parsed <- parseInvokeResponse(tok, res) yield parsed end executeInvoke + private[internal] def resolveMode(options: ResourceOptions): Result[Mode] = + // this order is then repeated in registerReadOrGetResource + options.hasURN.zip(options.hasImportId).flatMap { case (hasUrn, hasImportId) => + if hasUrn then options.getURN.map(Mode.GetWithUrn(_)) + else if hasImportId then + options.getImportId.flatMap { + case Some(importId) => Result.pure(Mode.ReadWithId(importId)) + case None => Result.fail(Exception("importId can't be empty here :|")) // sanity check + } + else Result.pure(Mode.Register) + } + private[internal] def registerReadOrGetResource[R <: Resource]( resource: Resource, state: ResourceState, @@ -203,7 +233,7 @@ class ResourceOps(using ctx: Context, mdc: BesomMDC[Label]): for _ <- log.debug(s"Resolving resource ${state.asLabel} with $debugMessageSuffix") - _ <- log.trace(s"Resolving resource ${state.asLabel} with: ${pprint(eitherErrorOrResult)}") + _ <- log.trace(s"Resolving resource ${state.asLabel} with:\n${pprint(eitherErrorOrResult)}") errOrUnit <- resolver.resolve(eitherErrorOrResult).either _ <- errOrUnit.fold(ctx.fail, _ => Result.unit) // fail context if resource resolution fails errOrUnitMsg = errOrUnit.fold(t => s"with an error: ${t.getMessage}", _ => "successfully") @@ -387,14 +417,14 @@ class ResourceOps(using ctx: Context, mdc: BesomMDC[Label]): .flatMap { req => for _ <- log.debug(s"Executing ReadResourceRequest for ${state.asLabel}") - _ <- log.trace(s"ReadResourceRequest for ${state.asLabel}: ${pprint(req)}") + _ <- log.trace(s"ReadResourceRequest for ${state.asLabel}:\n${pprint(req)}") resp <- ctx.monitor.readResource(req) yield resp } .flatMap { response => for _ <- log.debug(s"Received ReadResourceResponse for ${state.asLabel}") - _ <- log.trace(s"ReadResourceResponse for ${state.asLabel}: ${pprint(response)}") + _ <- log.trace(s"ReadResourceResponse for ${state.asLabel}:\n${pprint(response)}") rawResourceResult <- RawResourceResult.fromResponse(response, id) yield rawResourceResult } @@ -488,14 +518,14 @@ class ResourceOps(using ctx: Context, mdc: BesomMDC[Label]): .flatMap { req => for _ <- log.debug(s"Executing RegisterResourceRequest for ${state.asLabel}") - _ <- log.trace(s"RegisterResourceRequest for ${state.asLabel}: ${pprint(req)}") + _ <- log.trace(s"RegisterResourceRequest for ${state.asLabel}:\n${pprint(req)}") resp <- ctx.monitor.registerResource(req) yield resp } .flatMap { response => for _ <- log.debug(s"Received RegisterResourceResponse for ${state.asLabel}") - _ <- log.trace(s"RegisterResourceResponse for ${state.asLabel}: ${pprint(response)}") + _ <- log.trace(s"RegisterResourceResponse for ${state.asLabel}:\n${pprint(response)}") rawResourceResult <- RawResourceResult.fromResponse(response) yield rawResourceResult } diff --git a/core/src/main/scala/besom/internal/ResourceOptions.scala b/core/src/main/scala/besom/internal/ResourceOptions.scala index d3361eb1..d91d7b33 100644 --- a/core/src/main/scala/besom/internal/ResourceOptions.scala +++ b/core/src/main/scala/besom/internal/ResourceOptions.scala @@ -183,6 +183,11 @@ sealed trait ResourceOptions: private[besom] def hasURN: Result[Boolean] = urn.map(_.isDefined).getValueOrElse(false) + private[besom] def getURN: Result[URN] = urn.getValueOrElse(None).flatMap { + case Some(urn) => Result.pure(urn) + case None => Result.fail(Exception("URN is not defined")) + } + private[besom] def hasImportId(using Context): Result[Boolean] = this match case cr: CustomResourceOptions => cr.importId.map(_.isDefined).getValueOrElse(false) case sr: StackReferenceResourceOptions => sr.importId.map(_.isDefined).getValueOrElse(false) diff --git a/core/src/main/scala/besom/internal/Result.scala b/core/src/main/scala/besom/internal/Result.scala index 7a562904..53533c77 100644 --- a/core/src/main/scala/besom/internal/Result.scala +++ b/core/src/main/scala/besom/internal/Result.scala @@ -158,6 +158,8 @@ enum Result[+A]: case Sleep(r: () => Result[A], duration: Long, debug: Debug) case GetFinalizers(debug: Debug) extends Result[Finalizers] + def withFilter(p: A => Boolean)(using Debug): Result[A] = this + def flatMap[B](f: A => Result[B])(using Debug): Result[B] = BiFlatMap( this, { diff --git a/core/src/main/scala/besom/internal/resources.scala b/core/src/main/scala/besom/internal/resources.scala index 8dca83d4..4aac591a 100644 --- a/core/src/main/scala/besom/internal/resources.scala +++ b/core/src/main/scala/besom/internal/resources.scala @@ -1,12 +1,6 @@ package besom.internal -import scala.annotation.unused - -//noinspection ScalaFileName -class Resources private ( - private val resources: Ref[Map[Resource, ResourceState]], - private val cache: Ref[Map[(String, String), Promise[Resource]]] -): +class Resources private (private val resources: Ref[Map[Resource, ResourceState]]): def add(resource: ProviderResource, state: ProviderResourceState): Result[Unit] = resources.update(_ + (resource -> state)) @@ -86,26 +80,9 @@ class Resources private ( def updateStateFor(resource: Resource)(f: ResourceState => ResourceState): Result[Unit] = resources.update(_.updatedWith(resource)(_.map(f))) - def cacheResource(typ: String, name: String, @unused args: Any, @unused opts: ResourceOptions, resource: Resource): Result[Boolean] = - cache.get.flatMap(_.get((typ, name)) match - case Some(_) => Result.pure(false) - case None => - Promise[Resource]().flatMap { promise => - cache - .update(_.updated((typ, name), promise)) - .flatMap(_ => promise.fulfill(resource) *> Result.pure(true)) - } - ) - - def getCachedResource(typ: String, name: String, @unused args: Any, @unused opts: ResourceOptions): Result[Resource] = - cache.get.flatMap(_((typ, name)).get) - end Resources -//noinspection ScalaFileName object Resources: def apply(): Result[Resources] = - for - resources <- Ref(Map.empty[Resource, ResourceState]) - cache <- Ref(Map.empty[(String, String), Promise[Resource]]) - yield new Resources(resources, cache) + for resources <- Ref(Map.empty[Resource, ResourceState]) + yield new Resources(resources) diff --git a/core/src/test/scala/besom/internal/DummyContext.scala b/core/src/test/scala/besom/internal/DummyContext.scala index 714215cd..e85200fd 100644 --- a/core/src/test/scala/besom/internal/DummyContext.scala +++ b/core/src/test/scala/besom/internal/DummyContext.scala @@ -50,7 +50,8 @@ object DummyContext: logger <- BesomLogger.local() config <- Config(runInfo.project, isProjectName = true, configMap = configMap, configSecretKeys = configSecretKeys) resources <- Resources() - given Context = Context.create(runInfo, featureSupport, config, logger, monitor, engine, taskTracker, resources, stackPromise) + memo <- Memo() + given Context = Context.create(runInfo, featureSupport, config, logger, monitor, engine, taskTracker, resources, memo, stackPromise) _ <- stackPromise.fulfill(StackResource()(using ComponentBase(Output(besom.types.URN.empty)))) yield summon[Context] diff --git a/core/src/test/scala/besom/internal/MemoTest.scala b/core/src/test/scala/besom/internal/MemoTest.scala new file mode 100644 index 00000000..a25cfd8c --- /dev/null +++ b/core/src/test/scala/besom/internal/MemoTest.scala @@ -0,0 +1,131 @@ +package besom.internal + +import besom.internal.RunResult.{*, given} + +class MemoTest extends munit.FunSuite: + + test("memoization leads to just one evaluation of the memoized effect") { + val atomicInt = new java.util.concurrent.atomic.AtomicInteger(0) + + val effect = Result(atomicInt.incrementAndGet()) + + val memo = Memo().unsafeRunSync() + + val memoizedEffect = memo.memoize("test", "test", effect) + + val program = + for + _ <- memoizedEffect + _ <- memoizedEffect + _ <- memoizedEffect + yield () + + program.unsafeRunSync() + + assertEquals(atomicInt.get(), 1) + } + + test("memoization is thread safe (multiple threads trying to evaluate memoized effect do not lead to multiple evaluations)") { + val atomicInt = new java.util.concurrent.atomic.AtomicInteger(0) + + val effect = Result(atomicInt.incrementAndGet()) + + val memo = Memo().unsafeRunSync() + + val memoizedEffect = memo.memoize("test", "test", effect) + + val program = + for + fibs <- Result.sequence((1 to 100).map(_ => memoizedEffect.fork).toList) + _ <- Result.sequence(fibs.map(_.join)) + yield () + + program.unsafeRunSync() + + assertEquals(atomicInt.get(), 1) + } + + test("memoization works for errors too") { + val effect = Result.unit.flatMap { _ => Result.fail(new RuntimeException("oh no")) } + + val memo = Memo().unsafeRunSync() + + val memoizedEffect = memo.memoize("test", "test", effect) + + val setOfErrors = scala.collection.mutable.Set.empty[Throwable] + + val program = + for + _ <- memoizedEffect.either.map(_.left.foreach(setOfErrors.add)) + _ <- memoizedEffect.either.map(_.left.foreach(setOfErrors.add)) + _ <- memoizedEffect.either.map(_.left.foreach(setOfErrors.add)) + yield () + + program.unsafeRunSync() + + assertEquals(setOfErrors.size, 1) + } + + test("memoization works in nested flatMap calls") { + val atomicInt = new java.util.concurrent.atomic.AtomicInteger(0) + + val memo = Memo().unsafeRunSync() + + // if only keys are the same, memoization should work always + val memoizedEffect = Result(23).flatMap(_ => memo.memoize("test", "test", Result(atomicInt.incrementAndGet()))) + + val program = + for + _ <- memoizedEffect + _ <- memoizedEffect + _ <- memoizedEffect + yield () + + program.unsafeRunSync() + + assertEquals(atomicInt.get(), 1) + } + + test("memoization works in nested flatMap calls executed on different fibers") { + val atomicInt = new java.util.concurrent.atomic.AtomicInteger(0) + + val memo = Memo().unsafeRunSync() + + // if only keys are the same, memoization should work always + val memoizedEffect = Result(23).flatMap(_ => memo.memoize("test", "test", Result(atomicInt.incrementAndGet()))) + + val program = + for + fib1 <- memoizedEffect.fork + fib2 <- memoizedEffect.fork + fib3 <- memoizedEffect.fork + _ <- Result.sequence(List(fib1, fib2, fib3).map(_.join)) + yield () + + program.unsafeRunSync() + + assertEquals(atomicInt.get(), 1) + } + + test("memoization should work for different keys in nested flatMap calls") { + val atomicInt = new java.util.concurrent.atomic.AtomicInteger(0) + + val memo = Memo().unsafeRunSync() + + // if only keys are the same, memoization should work always + val memoizedEffect = Result(1 to 3).flatMap { range => + Result.sequence(range.map(i => memo.memoize(s"test-$i", "test", Result(atomicInt.incrementAndGet())))) + } + + val program = + for + _ <- memoizedEffect + _ <- memoizedEffect + _ <- memoizedEffect + yield () + + program.unsafeRunSync() + + assertEquals(atomicInt.get(), 3) + } +end MemoTest diff --git a/core/src/test/scala/besom/internal/ResourceDecoderTest.scala b/core/src/test/scala/besom/internal/ResourceDecoderTest.scala index 8af2c837..881e0bc1 100644 --- a/core/src/test/scala/besom/internal/ResourceDecoderTest.scala +++ b/core/src/test/scala/besom/internal/ResourceDecoderTest.scala @@ -68,7 +68,7 @@ class ResourceDecoderTest extends munit.FunSuite: ) ) - val (resource, resourceResolver) = resourceDecoder.makeResolver.unsafeRunSync() + val (resource, resourceResolver) = resourceDecoder.makeResourceAndResolver.unsafeRunSync() resourceResolver.resolve(errorOrResourceResult).unsafeRunSync() @@ -110,7 +110,7 @@ class ResourceDecoderTest extends munit.FunSuite: ) ) - val (resource, resourceResolver) = resourceDecoder.makeResolver.unsafeRunSync() + val (resource, resourceResolver) = resourceDecoder.makeResourceAndResolver.unsafeRunSync() resourceResolver.resolve(errorOrResourceResult).unsafeRunSync() @@ -151,7 +151,7 @@ class ResourceDecoderTest extends munit.FunSuite: ) ) - val (resource, resourceResolver) = resourceDecoder.makeResolver.unsafeRunSync() + val (resource, resourceResolver) = resourceDecoder.makeResourceAndResolver.unsafeRunSync() resourceResolver.resolve(errorOrResourceResult).unsafeRunSync() diff --git a/experimental/src/main/scala/besom/liftoff.scala b/experimental/src/main/scala/besom/liftoff.scala index 1b14b350..187f221c 100644 --- a/experimental/src/main/scala/besom/liftoff.scala +++ b/experimental/src/main/scala/besom/liftoff.scala @@ -294,10 +294,9 @@ def redisCluster(name: NonEmptyString, nodes: Int :| Positive)(using Context): O def orElse[B >: A](alternative: Output[Option[B]]): Output[Option[B]] = output.flatMap(o => alternative.map(a => o.orElse(a))) - extension [A](output: Output[Option[List[A]]]) - def headOption: Output[Option[A]] = output.map(_.flatMap(_.headOption)) + extension [A](output: Output[Option[List[A]]]) def headOption: Output[Option[A]] = output.map(_.flatMap(_.headOption)) - val maybeIngress = nginxService.status.loadBalancer.ingress.headOption + val maybeIngress = nginxService.status.loadBalancer.ingress.headOption val hostnameOrIp: Output[Option[String]] = maybeIngress.hostname.orElse(maybeIngress.ip) Stack.exports( diff --git a/integration-tests/CoreTests.test.scala b/integration-tests/CoreTests.test.scala index 4e561ca1..ef7f1d50 100644 --- a/integration-tests/CoreTests.test.scala +++ b/integration-tests/CoreTests.test.scala @@ -108,6 +108,47 @@ class CoreTests extends munit.FunSuite { pulumi.up(ctx.stackName).call(cwd = ctx.programDir, env = ctx.env) } + FunFixture[pulumi.FixtureMultiContext]( + setup = { + val schemaName = "tls" + val result = codegen.generatePackage(PackageMetadata(schemaName, providerTlsSchemaVersion)) + scalaCli.publishLocal(result.outputDir).call() + pulumi.fixture.setup( + FixtureOpts(), + FixtureArgs( + wd / "resources" / "memoization" / "source-stack", + projectFiles = Map( + "project.scala" -> + (defaultProjectFile + CodeGen.packageDependency(schemaName, providerTlsSchemaVersion)) + ) + ), + FixtureArgs( + wd / "resources" / "memoization" / "target-stack" + ) + ) + }, + teardown = pulumi.fixture.teardown + ).test("resource memoization should work for all kinds of resources") { + case pulumi.FixtureMultiContext(ctx, Vector(ctx1, ctx2)) => + println(s"Source stack name: ${ctx1.stackName}, pulumi home: ${ctx.home}") + pulumi.up(ctx1.stackName).call(cwd = ctx1.programDir, env = ctx1.env) + val outputs1 = upickle.default.read[Map[String, ujson.Value]]( + pulumi.outputs(ctx1.stackName, "--show-secrets").call(cwd = ctx1.programDir, env = ctx1.env).out.text() + ) + + println(s"Target stack name: ${ctx2.stackName}, pulumi home: ${ctx.home}") + pulumi + .up(ctx2.stackName, "--config", s"sourceStack=organization/source-stack-test/${ctx1.stackName}") + .call(cwd = ctx2.programDir, env = ctx2.env) + val outputs2 = upickle.default.read[Map[String, ujson.Value]]( + pulumi.outputs(ctx2.stackName, "--show-secrets").call(cwd = ctx2.programDir, env = ctx2.env).out.text() + ) + + assertEquals(outputs1, outputs2) + + case _ => throw new Exception("Invalid number of contexts") + } + FunFixture[pulumi.FixtureMultiContext]( setup = { val schemaName = "tls" diff --git a/integration-tests/resources/memoization/source-stack/Main.scala b/integration-tests/resources/memoization/source-stack/Main.scala new file mode 100644 index 00000000..dba9b582 --- /dev/null +++ b/integration-tests/resources/memoization/source-stack/Main.scala @@ -0,0 +1,82 @@ +import besom.* +import besom.api.tls + +case class SomeTlsResources(x: Output[String], y: Output[String])(using ComponentBase) extends ComponentResource derives RegistersOutputs + +object SomeTlsResources: + def apply(name: NonEmptyString, sshKey: Output[tls.PrivateKey])(using Context): Output[SomeTlsResources] = + component(name, "user:component:SomeTlsResources", ComponentResourceOptions()) { + val selfSignedCert = tls.SelfSignedCert( + s"${name}-selfSignedCert", + tls.SelfSignedCertArgs( + allowedUses = List("server_auth"), + validityPeriodHours = 12, + privateKeyPem = sshKey.privateKeyPem + ) + ) + + SomeTlsResources(selfSignedCert.privateKeyPem, sshKey.publicKeyPem) + } + +//noinspection UnitMethodIsParameterless,TypeAnnotation +@main def main = Pulumi.run { + // this seems like it would create this resource on every call of this function + // ps this runs register resource call under the hood + def sshKey = tls.PrivateKey( + "sshKey", + tls.PrivateKeyArgs( + algorithm = "RSA", + rsaBits = 4096 + ) + ) + + // but it shouldn't because resource constructors should be idempotent + val sshKeyRsaBits = sshKey.rsaBits + val sshKeyAlgo = sshKey.algorithm + + // intermission for component idempotence testing + def tlsResources = SomeTlsResources("tlsResources", sshKey) + + val manyEvalsOfComponent = for + _ <- tlsResources + _ <- tlsResources + last <- tlsResources + yield last + // end intermission + + // intermission for idempotence of get methods (which execute ReadResource calls (sic!)) + def readResource = tls.PrivateKey.get("sshKey", sshKey.id) + + val manyEvalsOfReadResource = for + _ <- readResource + _ <- readResource + last <- readResource + yield last + // end intermission + + // intermission for idempotence of get resource calls + def getResource = tls.PrivateKey("read-sshKey", tls.PrivateKeyArgs(algorithm = "RSA", rsaBits = 4096), opts(urn = sshKey.urn)) + val manyEvalsOfGetResource = for + _ <- getResource + _ <- getResource + last <- getResource + yield last + // end intermission + + // intermission for idempotence of invoke methods + def getGoogleCert = tls.getCertificate(tls.GetCertificateArgs(url = "https://www.google.com")) + + val manyEvalsOfInvoke = for + _ <- getGoogleCert + _ <- getGoogleCert + last <- getGoogleCert + yield last + // end intermission + + // so this should work without any issues + Stack(manyEvalsOfComponent, manyEvalsOfReadResource, manyEvalsOfInvoke, manyEvalsOfGetResource).exports( + sshKeyRsaBits = sshKeyRsaBits, + sshKeyUrn = sshKey.urn, + sshKeyAlgo = sshKeyAlgo + ) +} diff --git a/integration-tests/resources/memoization/source-stack/Pulumi.yaml b/integration-tests/resources/memoization/source-stack/Pulumi.yaml new file mode 100644 index 00000000..e7b2b3d9 --- /dev/null +++ b/integration-tests/resources/memoization/source-stack/Pulumi.yaml @@ -0,0 +1,5 @@ +name: source-stack-test +description: Besom cross stack references source stack +runtime: + name: scala + diff --git a/integration-tests/resources/memoization/source-stack/project.scala b/integration-tests/resources/memoization/source-stack/project.scala new file mode 100644 index 00000000..eaec00c0 --- /dev/null +++ b/integration-tests/resources/memoization/source-stack/project.scala @@ -0,0 +1,7 @@ +//> using scala 3.3.1 +//> using options -java-output-version:11 -Werror -Wunused:all -Wvalue-discard -Wnonunit-statement +//> using plugin org.virtuslab::besom-compiler-plugin:0.2.3-SNAPSHOT +//> using dep org.virtuslab::besom-core:0.2.3-SNAPSHOT + +//> using repository sonatype:snapshots +//> using dep "org.virtuslab::besom-tls:5.0.0-core.0.2-SNAPSHOT" diff --git a/integration-tests/resources/memoization/target-stack/Main.scala b/integration-tests/resources/memoization/target-stack/Main.scala new file mode 100644 index 00000000..66f5ea7d --- /dev/null +++ b/integration-tests/resources/memoization/target-stack/Main.scala @@ -0,0 +1,45 @@ +package besom.internal + +import besom.* +import besom.json.* + +//noinspection UnitMethodIsParameterless,TypeAnnotation +@main def main = Pulumi.run { + val sourceStackName = config.requireString("sourceStack").map(NonEmptyString(_).get) + // StackReference is a resource too so we want to verify that calls to it are idempotent + def sourceStack = besom.StackReference( + "stackRef", + StackReferenceArgs(sourceStackName), + StackReferenceResourceOptions() + ) + + val justForTesting = for + _ <- sourceStack + _ <- sourceStack + yield () + + val sshKeyUrn = sourceStack.flatMap( + _.getOutput("sshKeyUrn") + .map { + case Some(JsString(s)) => s + case other => throw RuntimeException(s"Expected string, got $other") + } + .flatMap(URN.parse) + ) + + val rsaBits = sourceStack.flatMap(_.getOutput("sshKeyRsaBits").map { + case Some(JsNumber(d)) => d.toInt + case other => throw RuntimeException(s"Expected string, got $other") + }) + + val algo = sourceStack.flatMap(_.getOutput("sshKeyAlgo").map { + case Some(JsString(s)) => s + case other => throw RuntimeException(s"Expected string, got $other") + }) + + Stack(justForTesting).exports( + sshKeyUrn = sshKeyUrn, + sshKeyRsaBits = rsaBits, + sshKeyAlgo = algo + ) +} diff --git a/integration-tests/resources/memoization/target-stack/Pulumi.yaml b/integration-tests/resources/memoization/target-stack/Pulumi.yaml new file mode 100644 index 00000000..42dd13e8 --- /dev/null +++ b/integration-tests/resources/memoization/target-stack/Pulumi.yaml @@ -0,0 +1,5 @@ +name: target-stack-test +description: Besom cross stack references source stack +runtime: + name: scala + diff --git a/integration-tests/resources/memoization/target-stack/project.scala b/integration-tests/resources/memoization/target-stack/project.scala new file mode 100644 index 00000000..32cf3931 --- /dev/null +++ b/integration-tests/resources/memoization/target-stack/project.scala @@ -0,0 +1,6 @@ +//> using scala 3.3.1 +//> using options -java-output-version:11 -Werror -Wunused:all -Wvalue-discard -Wnonunit-statement +//> using plugin org.virtuslab::besom-compiler-plugin:0.2.3-SNAPSHOT +//> using dep org.virtuslab::besom-core:0.2.3-SNAPSHOT + +//> using repository sonatype:snapshots