Commit 5e317cd

Merge branch 'develop' into WM-2291_cbas_pact_testing

zykonda committed Dec 1, 2023
2 parents: 07438b4 + 84b4480

Showing 22 changed files with 554 additions and 89 deletions.
@@ -1,7 +1,6 @@
package cromwell.backend.standard

import java.io.IOException

import akka.actor.{Actor, ActorLogging, ActorRef}
import akka.event.LoggingReceive
import cats.implicits._
@@ -62,6 +61,10 @@ case class DefaultStandardAsyncExecutionActorParams
override val minimumRuntimeSettings: MinimumRuntimeSettings
) extends StandardAsyncExecutionActorParams

// Typically we want to "executeInSubshell" for encapsulation of bash code.
// Override to `false` when we need the script to set an environment variable in the parent shell.
case class ScriptPreambleData(bashString: String, executeInSubshell: Boolean = true)
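
For illustration, a hedged sketch of a backend override that opts out of the subshell so an exported variable survives into the rest of the script (the variable and directory names are invented, not part of this change):

    override def scriptPreamble: ErrorOr[ScriptPreambleData] =
      ScriptPreambleData(
        s"""|export MONITORING_DIR=${cwd.pathAsString}/monitoring
            |mkdir -p $$MONITORING_DIR""".stripMargin,
        executeInSubshell = false
      ).valid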

/**
* An extension of the generic AsyncBackendJobExecutionActor providing a standard abstract implementation of an
* asynchronous polling backend.
@@ -329,7 +332,7 @@ trait StandardAsyncExecutionActor
}

/** Any custom code that should be run within commandScriptContents before the instantiated command. */
-def scriptPreamble: String = ""
+def scriptPreamble: ErrorOr[ScriptPreambleData] = ScriptPreambleData("").valid

def cwd: Path = commandDirectory
def rcPath: Path = cwd./(jobPaths.returnCodeFilename)
@@ -427,10 +430,27 @@ trait StandardAsyncExecutionActor
|find . -type d -exec sh -c '[ -z "$$(ls -A '"'"'{}'"'"')" ] && touch '"'"'{}'"'"'/.file' \\;
|)""".stripMargin)

val errorOrPreamble: ErrorOr[String] = scriptPreamble.map{ preambleData =>
preambleData.executeInSubshell match {
case true =>
s"""
|(
|cd ${cwd.pathAsString}
|${preambleData.bashString}
|)
|""".stripMargin
case false =>
s"""
|cd ${cwd.pathAsString}
|${preambleData.bashString}
|""".stripMargin
}
}
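
To make the two renderings concrete, a sketch assuming cwd is /cromwell_root and a one-line preamble (values invented):

    // ScriptPreambleData("touch monitoring.started") renders as:
    //   (
    //   cd /cromwell_root
    //   touch monitoring.started
    //   )
    // With executeInSubshell = false the same lines appear without the wrapping
    // parentheses, so a cd or export in the preamble affects the rest of the script.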

// The `tee` trickery below is to be able to redirect to known filenames for CWL while also streaming
// stdout and stderr for PAPI to periodically upload to cloud storage.
// https://stackoverflow.com/questions/692000/how-do-i-write-stderr-to-a-file-while-using-tee-with-a-pipe
-(errorOrDirectoryOutputs, errorOrGlobFiles).mapN((directoryOutputs, globFiles) =>
+(errorOrDirectoryOutputs, errorOrGlobFiles, errorOrPreamble).mapN((directoryOutputs, globFiles, preamble) =>
s"""|#!$jobShell
|DOCKER_OUTPUT_DIR_LINK
|cd ${cwd.pathAsString}
@@ -439,10 +459,9 @@ trait StandardAsyncExecutionActor
|export _JAVA_OPTIONS=-Djava.io.tmpdir="$$tmpDir"
|export TMPDIR="$$tmpDir"
|export HOME="$home"
-|(
-|cd ${cwd.pathAsString}
+|
 |SCRIPT_PREAMBLE
-|)
+|
|$out="$${tmpDir}/out.$$$$" $err="$${tmpDir}/err.$$$$"
|mkfifo "$$$out" "$$$err"
|trap 'rm "$$$out" "$$$err"' EXIT
@@ -464,7 +483,7 @@ trait StandardAsyncExecutionActor
|)
|mv $rcTmpPath $rcPath
|""".stripMargin
.replace("SCRIPT_PREAMBLE", scriptPreamble)
.replace("SCRIPT_PREAMBLE", preamble)
.replace("ENVIRONMENT_VARIABLES", environmentVariables)
.replace("INSTANTIATED_COMMAND", commandString)
.replace("SCRIPT_EPILOGUE", scriptEpilogue)
@@ -8,9 +8,10 @@ files {

metadata {
status: Succeeded
"outputs.cpus.cascadeLake.cpuPlatform": "Intel Cascade Lake"
"outputs.cpus.broadwell.cpuPlatform": "Intel Broadwell"
"outputs.cpus.haswell.cpuPlatform": "Intel Haswell"
"outputs.cpus.cascadeLake.cpuPlatform": "Intel Cascade Lake"
"outputs.cpus.iceLake.cpuPlatform": "Intel Ice Lake"
"outputs.cpus.rome.cpuPlatform": "AMD Rome"
}

@@ -24,8 +24,9 @@ task cpu_platform {
}

workflow cpus {
-call cpu_platform as haswell { input: cpu_platform = "Intel Haswell" }
-call cpu_platform as broadwell { input: cpu_platform = "Intel Broadwell" }
-call cpu_platform as cascadeLake { input: cpu_platform = "Intel Cascade Lake" }
-call cpu_platform as rome {input: cpu_platform = "AMD Rome" }
+call cpu_platform as haswell { input: cpu_platform = "Intel Haswell" }
+call cpu_platform as broadwell { input: cpu_platform = "Intel Broadwell" }
+call cpu_platform as cascadeLake { input: cpu_platform = "Intel Cascade Lake" }
+call cpu_platform as iceLake { input: cpu_platform = "Intel Ice Lake" }
+call cpu_platform as rome { input: cpu_platform = "AMD Rome" }
}
1 change: 1 addition & 0 deletions docs/RuntimeAttributes.md
@@ -429,6 +429,7 @@ runtime {
Note that when this option is specified, make sure the requested CPU platform is [available](https://cloud.google.com/compute/docs/regions-zones/#available) in the `zones` you selected.

The following CPU platforms are currently supported by the Google Cloud backend:
- `Intel Ice Lake`
- `Intel Cascade Lake`
- `Intel Skylake`
- `Intel Broadwell`
1 change: 1 addition & 0 deletions engine/src/main/scala/cromwell/engine/io/IoActor.scala
@@ -136,6 +136,7 @@ final class IoActor(ioConfig: IoConfig,

override def onBackpressure(scale: Option[Double] = None): Unit = {
incrementBackpressure()
log.warning("IoActor notifying HighLoad")
serviceRegistryActor ! LoadMetric("IO", HighLoad)

val uncappedDelay = scale.getOrElse(1.0d) * LoadConfig.IoNormalWindowMinimum
@@ -15,6 +15,7 @@ import java.nio.file.spi.FileSystemProvider
import java.time.temporal.ChronoUnit
import java.time.{Duration, OffsetDateTime}
import java.util.UUID
import scala.collection.mutable
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success, Try}

@@ -160,12 +161,14 @@ object BlobSasTokenGenerator {
*/
def createBlobTokenGenerator(workspaceManagerClient: WorkspaceManagerApiClientProvider,
overrideWsmAuthToken: Option[String]): BlobSasTokenGenerator = {
-    WSMBlobSasTokenGenerator(workspaceManagerClient, overrideWsmAuthToken)
+    new WSMBlobSasTokenGenerator(workspaceManagerClient, overrideWsmAuthToken)
}

}

-case class WSMBlobSasTokenGenerator(wsmClientProvider: WorkspaceManagerApiClientProvider,
+case class WSMTerraCoordinates(wsmEndpoint: String, workspaceId: UUID, containerResourceId: UUID)
+
+class WSMBlobSasTokenGenerator(wsmClientProvider: WorkspaceManagerApiClientProvider,
overrideWsmAuthToken: Option[String]) extends BlobSasTokenGenerator {

/**
@@ -178,17 +181,14 @@ case class WSMBlobSasTokenGenerator(wsmClientProvider: WorkspaceManagerApiClient
* @return an AzureSasCredential for accessing a blob container
*/
def generateBlobSasToken(endpoint: EndpointURL, container: BlobContainerName): Try[AzureSasCredential] = {
-    val wsmAuthToken: Try[String] = overrideWsmAuthToken match {
-      case Some(t) => Success(t)
-      case None => AzureCredentials.getAccessToken(None).toTry
-    }
+    val wsmAuthToken: Try[String] = getWsmAuth
container.workspaceId match {
// If this is a Terra workspace, request a token from WSM
case Success(workspaceId) => {
(for {
wsmAuth <- wsmAuthToken
wsmAzureResourceClient = wsmClientProvider.getControlledAzureResourceApi(wsmAuth)
-        resourceId <- getContainerResourceId(workspaceId, container, wsmAuth)
+        resourceId <- getContainerResourceId(workspaceId, container, Option(wsmAuth))
sasToken <- wsmAzureResourceClient.createAzureStorageContainerSasToken(workspaceId, resourceId)
} yield sasToken).recoverWith {
// If the storage account was still not found in WSM, this may be a public filesystem
@@ -201,9 +201,59 @@ case class WSMBlobSasTokenGenerator(wsmClientProvider: WorkspaceManagerApiClient
}
}

-  def getContainerResourceId(workspaceId: UUID, container: BlobContainerName, wsmAuth : String): Try[UUID] = {
-    val wsmResourceClient = wsmClientProvider.getResourceApi(wsmAuth)
-    wsmResourceClient.findContainerResourceId(workspaceId, container)
private val cachedContainerResourceIds = new mutable.HashMap[BlobContainerName, UUID]()

// Optionally provide wsmAuth to avoid acquiring it twice in generateBlobSasToken.
// In the case that the resourceId is not cached and no auth is provided, this function will acquire a new auth as necessary.
private def getContainerResourceId(workspaceId: UUID, container: BlobContainerName, precomputedWsmAuth: Option[String]): Try[UUID] = {
cachedContainerResourceIds.get(container) match {
case Some(id) => Try(id) //cache hit
case _ => { //cache miss
val auth: Try[String] = precomputedWsmAuth.map(auth => Try(auth)).getOrElse(getWsmAuth)
val resourceId = for {
wsmAuth <- auth
wsmResourceApi = wsmClientProvider.getResourceApi(wsmAuth)
resourceId <- wsmResourceApi.findContainerResourceId(workspaceId, container)
} yield resourceId
resourceId.map(id => cachedContainerResourceIds.put(container, id)) //NB: Modifying cache state here.
cachedContainerResourceIds.get(container) match {
case Some(uuid) => Try(uuid)
case _ => Failure(new NoSuchElementException("Could not retrieve container resource ID from WSM"))
}
}
}
}
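
A note on the cache semantics above: only successful lookups are stored, so a failed WSM lookup is retried on the next call, while a hit skips both the auth acquisition and the WSM round trip. Sketched sequence (hypothetical values):

    // 1st call: cache miss -> acquire auth -> wsmResourceApi.findContainerResourceId -> id cached
    getContainerResourceId(workspaceId, container, None) // Success(uuid)
    // 2nd call: cache hit -> cached id returned; no auth acquired, no WSM request made
    getContainerResourceId(workspaceId, container, None) // Success(uuid) from cache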

private def getWsmAuth: Try[String] = {
overrideWsmAuthToken match {
case Some(t) => Success(t)
case None => AzureCredentials.getAccessToken(None).toTry
}
}

private def parseTerraWorkspaceIdFromPath(blobPath: BlobPath): Try[UUID] = {
if (blobPath.container.value.startsWith("sc-")) Try(UUID.fromString(blobPath.container.value.substring(3)))
else Failure(new Exception("Could not parse workspace ID from storage container. Are you sure this is a file in a Terra Workspace?"))
}
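
A worked example of the naming convention this relies on (container names invented): Terra workspace containers are named "sc-" followed by the workspace UUID, so:

    // container "sc-11111111-2222-3333-4444-555555555555"
    //   -> Success(11111111-2222-3333-4444-555555555555: UUID)
    // container "my-public-data"
    //   -> Failure(... "Are you sure this is a file in a Terra Workspace?")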

/**
* Return a REST endpoint that will reply with a sas token for the blob storage container associated with the provided blob path.
* @param blobPath A blob path of a file living in a blob container that WSM knows about (likely a workspace container).
* @param tokenDuration How long the token will last after being generated. Default is 8 hours; SAS tokens won't last longer than 24h.
* NOTE: If a blobPath is provided for a file in a container other than what this token generator was constructed for,
* this function will make two REST requests. Otherwise, the relevant data is already cached locally.
*/
def getWSMSasFetchEndpoint(blobPath: BlobPath, tokenDuration: Option[Duration] = None): Try[String] = {
val wsmEndpoint = wsmClientProvider.getBaseWorkspaceManagerUrl
val lifetimeQueryParameters: String = tokenDuration.map(d => s"?sasExpirationDuration=${d.toSeconds.intValue}").getOrElse("")
val terraInfo: Try[WSMTerraCoordinates] = for {
workspaceId <- parseTerraWorkspaceIdFromPath(blobPath)
containerResourceId <- getContainerResourceId(workspaceId, blobPath.container, None)
coordinates = WSMTerraCoordinates(wsmEndpoint, workspaceId, containerResourceId)
} yield coordinates
terraInfo.map{terraCoordinates =>
s"${terraCoordinates.wsmEndpoint}/api/workspaces/v1/${terraCoordinates.workspaceId.toString}/resources/controlled/azure/storageContainer/${terraCoordinates.containerResourceId.toString}/getSasToken${lifetimeQueryParameters}"
}
}
}
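
Illustrative use of the endpoint builder (the generator, blobPath, and WSM host are invented for the example; a duration of 8 hours becomes sasExpirationDuration=28800):

    val fetchUrl: Try[String] =
      generator.getWSMSasFetchEndpoint(blobPath, Some(Duration.ofHours(8)))
    // e.g. Success("https://workspace.example.org/api/workspaces/v1/<workspaceId>/
    //   resources/controlled/azure/storageContainer/<containerResourceId>/getSasToken?sasExpirationDuration=28800")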

@@ -185,6 +185,9 @@ case class BlobPath private[blob](pathString: String, endpoint: EndpointURL, con
* @return Path string relative to the container root.
*/
def pathWithoutContainer : String = pathString


def getFilesystemManager: BlobFileSystemManager = fsm

override def getSymlinkSafePath(options: LinkOption*): Path = toAbsolutePath

}
@@ -20,6 +20,7 @@ import scala.util.Try
trait WorkspaceManagerApiClientProvider {
def getControlledAzureResourceApi(token: String): WsmControlledAzureResourceApi
def getResourceApi(token: String): WsmResourceApi
def getBaseWorkspaceManagerUrl: String
}
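
As a sketch of the expanded trait surface, a hypothetical stub (class name and URL invented, e.g. for tests) now has three members to provide:

    class StubWsmClientProvider extends WorkspaceManagerApiClientProvider {
      override def getControlledAzureResourceApi(token: String): WsmControlledAzureResourceApi = ???
      override def getResourceApi(token: String): WsmResourceApi = ???
      override def getBaseWorkspaceManagerUrl: String = "https://workspace.example.org"
    }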

class HttpWorkspaceManagerClientProvider(baseWorkspaceManagerUrl: WorkspaceManagerURL) extends WorkspaceManagerApiClientProvider {
@@ -40,6 +41,7 @@ class HttpWorkspaceManagerClientProvider(baseWorkspaceManag
apiClient.setAccessToken(token)
WsmControlledAzureResourceApi(new ControlledAzureResourceApi(apiClient))
}
def getBaseWorkspaceManagerUrl: String = baseWorkspaceManagerUrl.value
}

case class WsmResourceApi(resourcesApi : ResourceApi) {
@@ -6,6 +6,7 @@ import akka.routing.Listen
import cats.data.NonEmptyList
import com.typesafe.config.{Config, ConfigFactory, ConfigObject}
import cromwell.core.Dispatcher.ServiceDispatcher
import cromwell.services.loadcontroller.LoadControllerService.LoadMetric
import cromwell.util.GracefulShutdownHelper
import cromwell.util.GracefulShutdownHelper.ShutdownCommand
import net.ceedubs.ficus.Ficus._
@@ -82,7 +83,9 @@ class ServiceRegistryActor(globalConfig: Config) extends Actor with ActorLogging
def receive = {
case msg: ServiceRegistryMessage =>
services.get(msg.serviceName) match {
-        case Some(ref) => ref.tell(transform(msg, sender()), sender())
+        case Some(ref) =>
+          debugLogLoadMessages(msg, sender())
+          ref.tell(transform(msg, sender()), sender())
case None =>
log.error("Received ServiceRegistryMessage requesting service '{}' for which no service is configured. Message: {}", msg.serviceName, msg)
sender() ! ServiceRegistryFailure(msg.serviceName)
@@ -107,6 +110,15 @@ class ServiceRegistryActor(globalConfig: Config) extends Actor with ActorLogging
sender() ! ServiceRegistryFailure("Message is not a ServiceRegistryMessage: " + fool)
}

private def debugLogLoadMessages(msg: ServiceRegistryMessage, sender: ActorRef): Unit = {
msg match {
case msg: LoadMetric =>
log.debug(s"Service Registry Actor receiving $msg message from $sender")
case _ =>
()
}
}
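
With the IoActor change above, the debug line this emits on backpressure looks something like the following (actor path hypothetical):

    // Service Registry Actor receiving LoadMetric(IO,HighLoad) message from
    // Actor[akka://cromwell-system/user/IoActor#1234567890]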

/**
* Set the supervision strategy such that any of the individual service actors fails to initialize that we'll pass
* the error up the chain
@@ -23,8 +23,9 @@ import cromwell.backend.google.batch.runnable.WorkflowOptionKeys
import cromwell.backend.google.batch.util.{GcpBatchReferenceFilesMappingOperations, RuntimeOutputMapping}
import cromwell.filesystems.gcs.GcsPathBuilder
import cromwell.filesystems.gcs.GcsPathBuilder.ValidFullGcsPath

import java.io.FileNotFoundException
-import cromwell.backend.standard.{StandardAdHocValue, StandardAsyncExecutionActor, StandardAsyncExecutionActorParams, StandardAsyncJob}
+import cromwell.backend.standard.{ScriptPreambleData, StandardAdHocValue, StandardAsyncExecutionActor, StandardAsyncExecutionActorParams, StandardAsyncJob}
import cromwell.core._
import cromwell.core.io.IoCommandBuilder
import cromwell.core.path.{DefaultPathBuilder, Path}
@@ -49,6 +50,7 @@ import wom.core.FullyQualifiedName
import wom.expression.{FileEvaluation, NoIoFunctionSet}
import wom.format.MemorySize
import wom.values._

import java.io.OutputStreamWriter
import java.nio.charset.Charset
import java.util.Base64
@@ -663,12 +665,13 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
private val DockerMonitoringLogPath: Path = GcpBatchWorkingDisk.MountPoint.resolve(gcpBatchCallPaths.batchMonitoringLogFilename)
private val DockerMonitoringScriptPath: Path = GcpBatchWorkingDisk.MountPoint.resolve(gcpBatchCallPaths.batchMonitoringScriptFilename)

-  override def scriptPreamble: String = {
-    if (monitoringOutput.isDefined) {
+  override def scriptPreamble: ErrorOr[ScriptPreambleData] = {
+    if (monitoringOutput.isDefined)
+      ScriptPreambleData(
       s"""|touch $DockerMonitoringLogPath
           |chmod u+x $DockerMonitoringScriptPath
-          |$DockerMonitoringScriptPath > $DockerMonitoringLogPath &""".stripMargin
-    } else ""
+          |$DockerMonitoringScriptPath > $DockerMonitoringLogPath &""".stripMargin).valid
+    else ScriptPreambleData("").valid
}

private[actors] def generateInputs(jobDescriptor: BackendJobDescriptor): Set[GcpBatchInput] = {
@@ -16,10 +16,15 @@ object MachineConstraints {
if (googleLegacyMachineSelection) {
s"predefined-$cpu-${memory.to(MemoryUnit.MB).amount.intValue()}"
} else {
-      // If someone requests Intel Cascade Lake as their CPU platform then switch the machine type to n2.
-      // Similarly, CPU platform of AMD Rome corresponds to the machine type n2d.
+      // Users specify a CPU platform in their WDL, but GCP also needs to know which machine type to use.
+      // The below logic infers the machine type from the requested CPU.
+      // The heuristic we're using is: find the newest 'General Purpose' type that supports the given CPU.
+      // https://cloud.google.com/compute/docs/machine-resource
+      // For example, if someone requests Intel Cascade Lake as their CPU platform, then infer the n2 machine type.
+      // Infer n2d from AMD Rome, etc.
val customMachineType =
cpuPlatformOption match {
+        case Some(PipelinesApiRuntimeAttributes.CpuPlatformIntelIceLakeValue) => N2CustomMachineType
case Some(PipelinesApiRuntimeAttributes.CpuPlatformIntelCascadeLakeValue) => N2CustomMachineType
case Some(PipelinesApiRuntimeAttributes.CpuPlatformAMDRomeValue) => N2DCustomMachineType
case _ => N1CustomMachineType
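
Worked examples of the inference (sketch; the end of the match is truncated above):

    // Some("Intel Ice Lake")     -> N2CustomMachineType
    // Some("Intel Cascade Lake") -> N2CustomMachineType
    // Some("AMD Rome")           -> N2DCustomMachineType
    // Some("Intel Broadwell")    -> N1CustomMachineType (default)
    // None                       -> N1CustomMachineType (default)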
@@ -1,7 +1,6 @@
package cromwell.backend.google.pipelines.common

import java.net.SocketTimeoutException

import _root_.io.grpc.Status
import akka.actor.ActorRef
import akka.http.scaladsl.model.{ContentType, ContentTypes}
@@ -27,7 +26,7 @@ import cromwell.backend.google.pipelines.common.errors.FailedToDelocalizeFailure
import cromwell.backend.google.pipelines.common.io._
import cromwell.backend.google.pipelines.common.monitoring.{CheckpointingConfiguration, MonitoringImage}
import cromwell.backend.io.DirectoryFunctions
-import cromwell.backend.standard.{StandardAdHocValue, StandardAsyncExecutionActor, StandardAsyncExecutionActorParams, StandardAsyncJob}
+import cromwell.backend.standard.{ScriptPreambleData, StandardAdHocValue, StandardAsyncExecutionActor, StandardAsyncExecutionActorParams, StandardAsyncJob}
import cromwell.core._
import cromwell.core.io.IoCommandBuilder
import cromwell.core.path.{DefaultPathBuilder, Path}
@@ -380,12 +379,13 @@ class PipelinesApiAsyncBackendJobExecutionActor(override val standardParams: Sta

private lazy val isDockerImageCacheUsageRequested = runtimeAttributes.useDockerImageCache.getOrElse(useDockerImageCache(jobDescriptor.workflowDescriptor))

-  override def scriptPreamble: String = {
-    if (monitoringOutput.isDefined) {
+  override def scriptPreamble: ErrorOr[ScriptPreambleData] = {
+    if (monitoringOutput.isDefined)
+      ScriptPreambleData(
       s"""|touch $DockerMonitoringLogPath
           |chmod u+x $DockerMonitoringScriptPath
-          |$DockerMonitoringScriptPath > $DockerMonitoringLogPath &""".stripMargin
-    } else ""
+          |$DockerMonitoringScriptPath > $DockerMonitoringLogPath &""".stripMargin).valid
+    else ScriptPreambleData("").valid
}

override def globParentDirectory(womGlobFile: WomGlobFile): Path = {
@@ -78,6 +78,7 @@ object PipelinesApiRuntimeAttributes {
// via `gcloud compute zones describe us-central1-a`
val CpuPlatformIntelCascadeLakeValue = "Intel Cascade Lake"
val CpuPlatformAMDRomeValue = "AMD Rome"
val CpuPlatformIntelIceLakeValue = "Intel Ice Lake"

val UseDockerImageCacheKey = "useDockerImageCache"
private val useDockerImageCacheValidationInstance = new BooleanRuntimeAttributesValidation(UseDockerImageCacheKey).optional