Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Memoization of pulumi side effects #429

Merged
merged 6 commits into from
Apr 2, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions core/src/main/scala/besom/internal/Context.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ trait Context extends TaskTracker:
private[besom] def resources: Resources
private[besom] def runInfo: RunInfo
private[besom] def monitor: Monitor
private[besom] def memo: Memo
private[besom] def getParentURN: Result[URN]
private[besom] def config: Config
private[besom] def isDryRun: Boolean
Expand Down Expand Up @@ -106,6 +107,7 @@ class ContextImpl(
private[besom] val engine: Engine,
private[besom] val taskTracker: TaskTracker,
private[besom] val resources: Resources,
private[besom] val memo: Memo,
private val stackPromise: Promise[StackResource]
) extends Context
with TaskTracker:
Expand Down Expand Up @@ -229,9 +231,10 @@ object Context:
engine: Engine,
taskTracker: TaskTracker,
resources: Resources,
memo: Memo,
stackPromise: Promise[StackResource]
): Context =
new ContextImpl(runInfo, featureSupport, config, logger, monitor, engine, taskTracker, resources, stackPromise)
new ContextImpl(runInfo, featureSupport, config, logger, monitor, engine, taskTracker, resources, memo, stackPromise)

def apply(
runInfo: RunInfo,
Expand All @@ -244,8 +247,9 @@ object Context:
): Result[Context] =
for
resources <- Resources()
memo <- Memo()
stackPromise <- Promise[StackResource]()
yield Context.create(runInfo, featureSupport, config, logger, monitor, engine, taskTracker, resources, stackPromise)
yield Context.create(runInfo, featureSupport, config, logger, monitor, engine, taskTracker, resources, memo, stackPromise)

def apply(
runInfo: RunInfo,
Expand Down
22 changes: 22 additions & 0 deletions core/src/main/scala/besom/internal/Memo.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package besom.internal

import java.util.concurrent.ConcurrentHashMap

pawelprazak marked this conversation as resolved.
Show resolved Hide resolved
class Memo(private val chm: ConcurrentHashMap[(String, String), Promise[?]]):
def memoize[A](typ: String, name: String, effect: Result[A]): Result[A] =
Promise[A]().flatMap { promise =>
val existing = chm.putIfAbsent((typ, name), promise)
if existing == null then
effect
.flatMap(promise.fulfill)
.recover(e => promise.fail(e))
.flatMap(_ => promise.get)
else existing.get.asInstanceOf[Result[A]]
}

object Memo:
def apply(): Result[Memo] = Result.defer {
val chm = ConcurrentHashMap[(String, String), Promise[?]]()

new Memo(chm)
}
12 changes: 7 additions & 5 deletions core/src/main/scala/besom/internal/ResourceDecoder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ import besom.internal.logging.*
import besom.types.{Label, URN, ResourceId}

trait ResourceDecoder[A <: Resource]: // TODO rename to something more sensible
def makeResolver(using Context, BesomMDC[Label]): Result[(A, ResourceResolver[A])]
def makeResourceAndResolver(using Context, BesomMDC[Label]): Result[(A, ResourceResolver[A])]

object ResourceDecoder:
inline def forResource[R <: Resource: ResourceDecoder]: ResourceDecoder[R] = summon[ResourceDecoder[R]]

inline def derived[A <: Resource]: ResourceDecoder[A] = ${ derivedImpl[A] }

class CustomPropertyExtractor[A](propertyName: String, decoder: Decoder[A]):
Expand Down Expand Up @@ -60,7 +62,7 @@ object ResourceDecoder:

end CustomPropertyExtractor

def makeResolver[A <: Resource](
private[ResourceDecoder] def makeResourceAndResolver[A <: Resource](
fromProduct: Product => A,
customPropertyExtractors: Vector[CustomPropertyExtractor[?]]
)(using Context, BesomMDC[Label]): Result[(A, ResourceResolver[A])] =
Expand Down Expand Up @@ -126,7 +128,7 @@ object ResourceDecoder:

(resource, resolver)
}
end makeResolver
end makeResourceAndResolver

private def derivedImpl[A <: Resource: Type](using q: Quotes): Expr[ResourceDecoder[A]] =
Expr.summon[Mirror.Of[A]].get match
Expand Down Expand Up @@ -156,8 +158,8 @@ object ResourceDecoder:

'{
new ResourceDecoder[A]:
def makeResolver(using Context, BesomMDC[Label]): Result[(A, ResourceResolver[A])] =
ResourceDecoder.makeResolver(
def makeResourceAndResolver(using Context, BesomMDC[Label]): Result[(A, ResourceResolver[A])] =
ResourceDecoder.makeResourceAndResolver(
fromProduct = ${ m }.fromProduct,
customPropertyExtractors = ${ customPropertyExtractorsExpr }.toVector
)
Expand Down
91 changes: 57 additions & 34 deletions core/src/main/scala/besom/internal/ResourceOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,36 @@ import scala.annotation.unused

type Providers = Map[String, ProviderResource]

enum Mode(val logStr: String):
case GetWithUrn(urn: URN) extends Mode("Getting")
case ReadWithId(id: ResourceId) extends Mode("Reading")
case Register extends Mode("Registering")

override def toString(): String = logStr

def suffix: String = this match
case GetWithUrn(urn) => s"from Pulumi state with URN ${urn.asString}"
case ReadWithId(id) => s"from infrastructure with foreign id (import id) ${id}"
case Register => ""

class ResourceOps(using ctx: Context, mdc: BesomMDC[Label]):

// register resource outputs *NEEDS TO* be memoized
private[besom] def registerResourceOutputsInternal(
urnResult: Result[URN],
outputs: Result[Struct]
): Result[Unit] =
urnResult.flatMap { urn =>
outputs.flatMap { struct =>
val runSideEffects = outputs.flatMap { struct =>
val request = RegisterResourceOutputsRequest(
urn = urn.asString,
outputs = if struct.fields.isEmpty then None else Some(struct)
)

ctx.monitor.registerResourceOutputs(request)
}

ctx.memo.memoize("registerResourceOutputs", urn.asString, runSideEffects)
}

private[besom] def readOrRegisterResourceInternal[R <: Resource: ResourceDecoder, A: ArgsEncoder](
Expand All @@ -37,34 +52,30 @@ class ResourceOps(using ctx: Context, mdc: BesomMDC[Label]):
options: ResourceOptions,
remote: Boolean
): Result[R] =
summon[ResourceDecoder[R]].makeResolver.flatMap { (resource, resolver) =>
// this order is then repeated in registerReadOrGetResource
val modeResult = options.hasURN.zip(options.hasImportId).map { case (hasUrn, hasImportId) =>
if hasUrn then "Getting" else if hasImportId then "Reading" else "Registering"
}

modeResult.flatMap { mode =>
log.debug(s"$mode resource, trying to add to cache...") *>
ctx.resources.cacheResource(typ, name, args, options, resource).flatMap { addedToCache =>
if addedToCache then
for
options <- options.resolve
_ <- log.debug(s"$mode resource, added to cache...")
state <- createResourceState(typ, name, resource, options, remote)
_ <- log.debug(s"Created resource state")
_ <- ctx.resources.add(resource, state)
_ <- log.debug(s"Added resource to resources")
inputs <- prepareResourceInputs(resource, state, args, options)
_ <- log.debug(s"Prepared inputs for resource")
_ <- addChildToParentResource(resource, options.parent)
_ <- log.debug(s"Added child to parent (isDefined: ${options.parent.isDefined})")
_ <- registerReadOrGetResource(resource, state, resolver, inputs, options, remote)
yield resource
else ctx.resources.getCachedResource(typ, name, args, options).map(_.asInstanceOf[R])
}
}
resolveMode(options).flatMap { mode =>
def runSideEffects =
for
(resource, resolver) <- ResourceDecoder.forResource[R].makeResourceAndResolver
_ <- log.debug(s"$mode resource ${mode.suffix}")
options <- options.resolve
_ <- log.debug(s"$mode resource, added to cache...")
state <- createResourceState(typ, name, resource, options, remote)
_ <- log.debug(s"Created resource state")
_ <- ctx.resources.add(resource, state)
_ <- log.debug(s"Added resource to resources")
inputs <- prepareResourceInputs(resource, state, args, options)
_ <- log.debug(s"Prepared inputs for resource")
_ <- addChildToParentResource(resource, options.parent)
_ <- log.debug(s"Added child to parent (isDefined: ${options.parent.isDefined})")
_ <- registerReadOrGetResource(resource, state, resolver, inputs, options, remote)
yield resource

mode match
pawelprazak marked this conversation as resolved.
Show resolved Hide resolved
case Mode.GetWithUrn(_) => runSideEffects // DO NOT memoize Get
case Mode.Register | Mode.ReadWithId(_) => ctx.memo.memoize(typ, name, runSideEffects) // DO memoize Register and Read
}

// invoke is not memoized
private[besom] def invokeInternal[A: ArgsEncoder, R: Decoder](tok: FunctionToken, args: A, opts: InvokeOptions): Output[R] =
def decodeResponse(resultAsValue: Value, props: Map[String, Set[Resource]]): Result[OutputData[R]] =
val resourceLabel = mdc.get(Key.LabelKey)
Expand Down Expand Up @@ -132,13 +143,25 @@ class ResourceOps(using ctx: Context, mdc: BesomMDC[Label]):
for
maybeProviderId <- maybeProviderIdResult
req <- buildInvokeRequest(invokeArgs.serialized, maybeProviderId, version)
_ <- log.debug(s"Invoke RPC prepared, req=${pprint(req)}")
_ <- log.debug(s"Invoke RPC prepared, req:\n${pprint(req)}")
res <- ctx.monitor.invoke(req)
_ <- log.debug(s"Invoke RPC executed, res=${pprint(res)}")
_ <- log.debug(s"Invoke RPC executed, res:\n${pprint(res)}")
parsed <- parseInvokeResponse(tok, res)
yield parsed
end executeInvoke

private[internal] def resolveMode(options: ResourceOptions): Result[Mode] =
// this order is then repeated in registerReadOrGetResource
options.hasURN.zip(options.hasImportId).flatMap { case (hasUrn, hasImportId) =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I'd move this internal logic to the companion object of the Mode and use pat-mat instead of if for readability because it allow for a sort of "table of allowed states"

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can't be patmat, this is async

if hasUrn then options.getURN.map(Mode.GetWithUrn(_))
else if hasImportId then
options.getImportId.flatMap {
case Some(importId) => Result.pure(Mode.ReadWithId(importId))
case None => Result.fail(Exception("importId can't be empty here :|")) // sanity check
}
else Result.pure(Mode.Register)
}

private[internal] def registerReadOrGetResource[R <: Resource](
resource: Resource,
state: ResourceState,
Expand Down Expand Up @@ -203,7 +226,7 @@ class ResourceOps(using ctx: Context, mdc: BesomMDC[Label]):

for
_ <- log.debug(s"Resolving resource ${state.asLabel} with $debugMessageSuffix")
_ <- log.trace(s"Resolving resource ${state.asLabel} with: ${pprint(eitherErrorOrResult)}")
_ <- log.trace(s"Resolving resource ${state.asLabel} with:\n${pprint(eitherErrorOrResult)}")
errOrUnit <- resolver.resolve(eitherErrorOrResult).either
_ <- errOrUnit.fold(ctx.fail, _ => Result.unit) // fail context if resource resolution fails
errOrUnitMsg = errOrUnit.fold(t => s"with an error: ${t.getMessage}", _ => "successfully")
Expand Down Expand Up @@ -387,14 +410,14 @@ class ResourceOps(using ctx: Context, mdc: BesomMDC[Label]):
.flatMap { req =>
for
_ <- log.debug(s"Executing ReadResourceRequest for ${state.asLabel}")
_ <- log.trace(s"ReadResourceRequest for ${state.asLabel}: ${pprint(req)}")
_ <- log.trace(s"ReadResourceRequest for ${state.asLabel}:\n${pprint(req)}")
resp <- ctx.monitor.readResource(req)
yield resp
}
.flatMap { response =>
for
_ <- log.debug(s"Received ReadResourceResponse for ${state.asLabel}")
_ <- log.trace(s"ReadResourceResponse for ${state.asLabel}: ${pprint(response)}")
_ <- log.trace(s"ReadResourceResponse for ${state.asLabel}:\n${pprint(response)}")
rawResourceResult <- RawResourceResult.fromResponse(response, id)
yield rawResourceResult
}
Expand Down Expand Up @@ -488,14 +511,14 @@ class ResourceOps(using ctx: Context, mdc: BesomMDC[Label]):
.flatMap { req =>
for
_ <- log.debug(s"Executing RegisterResourceRequest for ${state.asLabel}")
_ <- log.trace(s"RegisterResourceRequest for ${state.asLabel}: ${pprint(req)}")
_ <- log.trace(s"RegisterResourceRequest for ${state.asLabel}:\n${pprint(req)}")
resp <- ctx.monitor.registerResource(req)
yield resp
}
.flatMap { response =>
for
_ <- log.debug(s"Received RegisterResourceResponse for ${state.asLabel}")
_ <- log.trace(s"RegisterResourceResponse for ${state.asLabel}: ${pprint(response)}")
_ <- log.trace(s"RegisterResourceResponse for ${state.asLabel}:\n${pprint(response)}")
rawResourceResult <- RawResourceResult.fromResponse(response)
yield rawResourceResult
}
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/scala/besom/internal/ResourceOptions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,11 @@ sealed trait ResourceOptions:

private[besom] def hasURN: Result[Boolean] = urn.map(_.isDefined).getValueOrElse(false)

private[besom] def getURN: Result[URN] = urn.getValueOrElse(None).flatMap {
case Some(urn) => Result.pure(urn)
case None => Result.fail(Exception("URN is not defined"))
}

private[besom] def hasImportId(using Context): Result[Boolean] = this match
case cr: CustomResourceOptions => cr.importId.map(_.isDefined).getValueOrElse(false)
case sr: StackReferenceResourceOptions => sr.importId.map(_.isDefined).getValueOrElse(false)
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/scala/besom/internal/Result.scala
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ enum Result[+A]:
case Sleep(r: () => Result[A], duration: Long, debug: Debug)
case GetFinalizers(debug: Debug) extends Result[Finalizers]

def withFilter(p: A => Boolean)(using Debug): Result[A] = this

def flatMap[B](f: A => Result[B])(using Debug): Result[B] = BiFlatMap(
this,
{
Expand Down
28 changes: 3 additions & 25 deletions core/src/main/scala/besom/internal/resources.scala
Original file line number Diff line number Diff line change
@@ -1,12 +1,6 @@
package besom.internal

import scala.annotation.unused

//noinspection ScalaFileName
class Resources private (
private val resources: Ref[Map[Resource, ResourceState]],
private val cache: Ref[Map[(String, String), Promise[Resource]]]
):
class Resources private (private val resources: Ref[Map[Resource, ResourceState]]):
def add(resource: ProviderResource, state: ProviderResourceState): Result[Unit] =
resources.update(_ + (resource -> state))

Expand Down Expand Up @@ -86,26 +80,10 @@ class Resources private (
def updateStateFor(resource: Resource)(f: ResourceState => ResourceState): Result[Unit] =
resources.update(_.updatedWith(resource)(_.map(f)))

def cacheResource(typ: String, name: String, @unused args: Any, @unused opts: ResourceOptions, resource: Resource): Result[Boolean] =
cache.get.flatMap(_.get((typ, name)) match
case Some(_) => Result.pure(false)
case None =>
Promise[Resource]().flatMap { promise =>
cache
.update(_.updated((typ, name), promise))
.flatMap(_ => promise.fulfill(resource) *> Result.pure(true))
}
)

def getCachedResource(typ: String, name: String, @unused args: Any, @unused opts: ResourceOptions): Result[Resource] =
cache.get.flatMap(_((typ, name)).get)

end Resources

//noinspection ScalaFileName
object Resources:
def apply(): Result[Resources] =
for
resources <- Ref(Map.empty[Resource, ResourceState])
cache <- Ref(Map.empty[(String, String), Promise[Resource]])
yield new Resources(resources, cache)
for resources <- Ref(Map.empty[Resource, ResourceState])
yield new Resources(resources)
3 changes: 2 additions & 1 deletion core/src/test/scala/besom/internal/DummyContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ object DummyContext:
logger <- BesomLogger.local()
config <- Config(runInfo.project, isProjectName = true, configMap = configMap, configSecretKeys = configSecretKeys)
resources <- Resources()
given Context = Context.create(runInfo, featureSupport, config, logger, monitor, engine, taskTracker, resources, stackPromise)
memo <- Memo()
given Context = Context.create(runInfo, featureSupport, config, logger, monitor, engine, taskTracker, resources, memo, stackPromise)
_ <- stackPromise.fulfill(StackResource()(using ComponentBase(Output(besom.types.URN.empty))))
yield summon[Context]

Expand Down
Loading
Loading