Skip to content

Commit

Permalink
Merge branch 'develop' into WM-2291_cbas_pact_testing
Browse files Browse the repository at this point in the history
  • Loading branch information
zykonda committed Dec 7, 2023
2 parents 6961e67 + fede859 commit d8145e9
Show file tree
Hide file tree
Showing 1,576 changed files with 49,513 additions and 33,048 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/integration_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ jobs:
- build_type: centaurPapiV2beta
build_mysql: 5.7
friendly_name: Centaur Papi V2 Beta with MySQL 5.7
- build_type: centaurPapiV2betaRestart
build_mysql: 5.7
friendly_name: Centaur Papi V2 Beta (restart)
- build_type: dbms
friendly_name: DBMS
- build_type: centaurTes
Expand Down
18 changes: 18 additions & 0 deletions .scalafmt.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
version = 3.7.17
align.preset = none
align.openParenCallSite = true
align.openParenDefnSite = true
maxColumn = 120
continuationIndent.defnSite = 2
assumeStandardLibraryStripMargin = true
align.stripMargin = true
danglingParentheses.preset = true
rewrite.rules = [Imports, RedundantBraces, RedundantParens, SortModifiers]
rewrite.imports.sort = scalastyle
docstrings.style = keep
project.excludeFilters = [
Dependencies.scala,
Settings.scala,
build.sbt
]
runner.dialect = scala213
23 changes: 13 additions & 10 deletions CromIAM/src/main/scala/cromiam/auth/Collection.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import scala.util.{Success, Try}
final case class Collection(name: String) extends AnyVal

object Collection {

/**
* Parses a raw JSON string to make sure it fits the standard pattern (see below) for labels,
* performs some CromIAM-specific checking to ensure the user isn't attempting to manipulate the
Expand All @@ -19,13 +20,14 @@ object Collection {
*/
def validateLabels(labelsJson: Option[String]): Directive1[Option[Map[String, JsValue]]] = {

val labels = labelsJson map { l =>
Try(l.parseJson) match {
case Success(JsObject(json)) if json.keySet.contains(CollectionLabelName) => throw new LabelContainsCollectionException
case Success(JsObject(json)) => json
case _ => throw InvalidLabelsException(l)
}
val labels = labelsJson map { l =>
Try(l.parseJson) match {
case Success(JsObject(json)) if json.keySet.contains(CollectionLabelName) =>
throw new LabelContainsCollectionException
case Success(JsObject(json)) => json
case _ => throw InvalidLabelsException(l)
}
}

provide(labels)
}
Expand All @@ -34,15 +36,16 @@ object Collection {
val LabelsKey = "labels"

// LabelContainsCollectionException is a class because of ScalaTest, some of the constructs don't play well w/ case objects
final class LabelContainsCollectionException extends Exception(s"Submitted labels contain the key $CollectionLabelName, which is not allowed\n")
final case class InvalidLabelsException(labels: String) extends Exception(s"Labels must be a valid JSON object, received: $labels\n")
final class LabelContainsCollectionException
extends Exception(s"Submitted labels contain the key $CollectionLabelName, which is not allowed\n")
final case class InvalidLabelsException(labels: String)
extends Exception(s"Labels must be a valid JSON object, received: $labels\n")

/**
* Returns the default collection for a user.
*/
def forUser(user: User): Collection = {
def forUser(user: User): Collection =
Collection(user.userId.value)
}

implicit val collectionJsonReader = new JsonReader[Collection] {
import spray.json.DefaultJsonProtocol._
Expand Down
1 change: 0 additions & 1 deletion CromIAM/src/main/scala/cromiam/auth/User.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,3 @@ import org.broadinstitute.dsde.workbench.model.WorkbenchUserId
* Wraps the concept of an authenticated workbench user including their numeric ID as well as their bearer token
*/
final case class User(userId: WorkbenchUserId, authorization: Authorization)

63 changes: 37 additions & 26 deletions CromIAM/src/main/scala/cromiam/cromwell/CromwellClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,16 @@ import scala.concurrent.{ExecutionContextExecutor, Future}
*
* FIXME: Look for ways to synch this up with the mothership
*/
class CromwellClient(scheme: String, interface: String, port: Int, log: LoggingAdapter, serviceRegistryActorRef: ActorRef)(implicit system: ActorSystem,
ece: ExecutionContextExecutor,
materializer: ActorMaterializer)
extends SprayJsonSupport with DefaultJsonProtocol with StatusCheckedSubsystem with CromIamInstrumentation{
class CromwellClient(scheme: String,
interface: String,
port: Int,
log: LoggingAdapter,
serviceRegistryActorRef: ActorRef
)(implicit system: ActorSystem, ece: ExecutionContextExecutor, materializer: ActorMaterializer)
extends SprayJsonSupport
with DefaultJsonProtocol
with StatusCheckedSubsystem
with CromIamInstrumentation {

val cromwellUrl = new URL(s"$scheme://$interface:$port")
val cromwellApiVersion = "v1"
Expand All @@ -41,35 +47,38 @@ class CromwellClient(scheme: String, interface: String, port: Int, log: LoggingA

def collectionForWorkflow(workflowId: String,
user: User,
cromIamRequest: HttpRequest): FailureResponseOrT[Collection] = {
cromIamRequest: HttpRequest
): FailureResponseOrT[Collection] = {
import CromwellClient.EnhancedWorkflowLabels

log.info("Requesting collection for " + workflowId + " for user " + user.userId + " from metadata")

// Look up in Cromwell what the collection is for this workflow. If it doesn't exist, fail the Future
val cromwellApiLabelFunc = () => cromwellApiClient.labels(WorkflowId.fromString(workflowId), headers = List(user.authorization)) flatMap {
_.caasCollection match {
case Some(c) => FailureResponseOrT.pure[IO, HttpResponse](c)
case None =>
val exception = new IllegalArgumentException(s"Workflow $workflowId has no associated collection")
val failure = IO.raiseError[Collection](exception)
FailureResponseOrT.right[HttpResponse](failure)
val cromwellApiLabelFunc = () =>
cromwellApiClient.labels(WorkflowId.fromString(workflowId), headers = List(user.authorization)) flatMap {
_.caasCollection match {
case Some(c) => FailureResponseOrT.pure[IO, HttpResponse](c)
case None =>
val exception = new IllegalArgumentException(s"Workflow $workflowId has no associated collection")
val failure = IO.raiseError[Collection](exception)
FailureResponseOrT.right[HttpResponse](failure)
}
}
}

instrumentRequest(cromwellApiLabelFunc, cromIamRequest, wfCollectionPrefix)
}

def forwardToCromwell(httpRequest: HttpRequest): FailureResponseOrT[HttpResponse] = {
val future = {
// See CromwellClient's companion object for info on these header modifications
val headers = httpRequest.headers.filterNot(header => header.name == TimeoutAccessHeader || header.name == HostHeader)
val headers =
httpRequest.headers.filterNot(header => header.name == TimeoutAccessHeader || header.name == HostHeader)
val cromwellRequest = httpRequest
.copy(uri = httpRequest.uri.withAuthority(interface, port).withScheme(scheme))
.withHeaders(headers)
Http().singleRequest(cromwellRequest)
} recoverWith {
case e => Future.failed(CromwellConnectionFailure(e))
} recoverWith { case e =>
Future.failed(CromwellConnectionFailure(e))
}
future.asFailureResponseOrT
}
Expand All @@ -86,7 +95,7 @@ class CromwellClient(scheme: String, interface: String, port: Int, log: LoggingA
use the current workflow id.
This is all called from inside the context of a Future, so exceptions will be properly caught.
*/
*/
metadata.value.parseJson.asJsObject.fields.get("rootWorkflowId").map(_.convertTo[String]).getOrElse(workflowId)
}

Expand All @@ -96,11 +105,13 @@ class CromwellClient(scheme: String, interface: String, port: Int, log: LoggingA
Grab the metadata from Cromwell filtered down to the rootWorkflowId. Then transform the response to get just the
root workflow ID itself
*/
val cromwellApiMetadataFunc = () => cromwellApiClient.metadata(
WorkflowId.fromString(workflowId),
args = Option(Map("includeKey" -> List("rootWorkflowId"))),
headers = List(user.authorization)).map(metadataToRootWorkflowId
)
val cromwellApiMetadataFunc = () =>
cromwellApiClient
.metadata(WorkflowId.fromString(workflowId),
args = Option(Map("includeKey" -> List("rootWorkflowId"))),
headers = List(user.authorization)
)
.map(metadataToRootWorkflowId)

instrumentRequest(cromwellApiMetadataFunc, cromIamRequest, rootWfIdPrefix)
}
Expand All @@ -120,14 +131,14 @@ object CromwellClient {
// See: https://broadworkbench.atlassian.net/browse/DDO-2190
val HostHeader = "Host"

final case class CromwellConnectionFailure(f: Throwable) extends Exception(s"Unable to connect to Cromwell (${f.getMessage})", f)
final case class CromwellConnectionFailure(f: Throwable)
extends Exception(s"Unable to connect to Cromwell (${f.getMessage})", f)

implicit class EnhancedWorkflowLabels(val wl: WorkflowLabels) extends AnyVal {

import Collection.{CollectionLabelName, collectionJsonReader}
import Collection.{collectionJsonReader, CollectionLabelName}

def caasCollection: Option[Collection] = {
def caasCollection: Option[Collection] =
wl.labels.fields.get(CollectionLabelName).map(_.convertTo[Collection])
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@ trait CromIamInstrumentation extends CromwellInstrumentation {
val rootWfIdPrefix = NonEmptyList.one("root-workflow-id")
val wfCollectionPrefix = NonEmptyList.one("workflow-collection")


def convertRequestToPath(httpRequest: HttpRequest): NonEmptyList[String] = NonEmptyList.of(
// Returns the path of the URI only, without query parameters (e.g: api/engine/workflows/metadata)
httpRequest.uri.path.toString().stripPrefix("/")
httpRequest.uri.path
.toString()
.stripPrefix("/")
// Replace UUIDs with [id] to keep paths same regardless of the workflow
.replaceAll(CromIamInstrumentation.UUIDRegex, "[id]"),
// Name of the method (e.g: GET)
Expand All @@ -43,15 +44,19 @@ trait CromIamInstrumentation extends CromwellInstrumentation {
def makePathFromRequestAndResponse(httpRequest: HttpRequest, httpResponse: HttpResponse): InstrumentationPath =
convertRequestToPath(httpRequest).concatNel(NonEmptyList.of(httpResponse.status.intValue.toString))

def sendTimingApi(statsDPath: InstrumentationPath, timing: FiniteDuration, prefixToStatsd: NonEmptyList[String]): Unit = {
def sendTimingApi(statsDPath: InstrumentationPath,
timing: FiniteDuration,
prefixToStatsd: NonEmptyList[String]
): Unit =
sendTiming(prefixToStatsd.concatNel(statsDPath), timing, CromIamPrefix)
}

def instrumentationPrefixForSam(methodPrefix: NonEmptyList[String]): NonEmptyList[String] = samPrefix.concatNel(methodPrefix)
def instrumentationPrefixForSam(methodPrefix: NonEmptyList[String]): NonEmptyList[String] =
samPrefix.concatNel(methodPrefix)

def instrumentRequest[A](func: () => FailureResponseOrT[A],
httpRequest: HttpRequest,
prefix: NonEmptyList[String]): FailureResponseOrT[A] = {
prefix: NonEmptyList[String]
): FailureResponseOrT[A] = {
def now(): Deadline = Deadline.now

val startTimestamp = now()
Expand Down
Loading

0 comments on commit d8145e9

Please sign in to comment.