Skip to content

Commit

Permalink
WX-735 Fix incorrect and/or nondeterministic filesystem ordering (#6930)
Browse files Browse the repository at this point in the history
  • Loading branch information
aednichols authored Oct 14, 2022
1 parent b0a2541 commit 622c8e6
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ package cromwell.core.path

import akka.actor.ActorSystem
import cromwell.core.WorkflowOptions
import cromwell.core.path.PathBuilderFactory.PriorityDefault

import scala.concurrent.{ExecutionContext, Future}

case object DefaultPathBuilderFactory extends PathBuilderFactory {
override def withOptions(options: WorkflowOptions)(implicit actorSystem: ActorSystem, ec: ExecutionContext) = Future.successful(DefaultPathBuilder)
val name = "local"
val tuple = name -> this

override def priority: Int = PriorityDefault
}
21 changes: 13 additions & 8 deletions core/src/main/scala/cromwell/core/path/PathBuilderFactory.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,33 @@ import cromwell.core.{Dispatcher, WorkflowOptions}
import cats.syntax.traverse._
import cats.instances.list._
import cats.instances.future._
import cromwell.core.path.PathBuilderFactory.PriorityStandard

import scala.concurrent.{ExecutionContext, Future}

object PathBuilderFactory {
// Given a list of factories, instantiates the corresponding path builders
def instantiatePathBuilders(factories: List[PathBuilderFactory], workflowOptions: WorkflowOptions)(implicit as: ActorSystem): Future[List[PathBuilder]] = {
implicit val ec: ExecutionContext = as.dispatchers.lookup(Dispatcher.IoDispatcher)
// The DefaultPathBuilderFactory always needs to be last.
// The reason is path builders are tried in order, and the default one is very generous in terms of paths it "thinks" it supports
// For instance, it will return a Path for a gcs url even though it doesn't really support it
val sortedFactories = factories.sortWith({
case (_, DefaultPathBuilderFactory) => true
case (DefaultPathBuilderFactory, _) => false
case (a, b) => factories.indexOf(a) < factories.indexOf(b)
})
val sortedFactories = factories.sortBy(_.priority)
sortedFactories.traverse(_.withOptions(workflowOptions))
}

val PriorityBlob = 100 // High priority to evaluate first, because blob files may inadvertently match other filesystems
val PriorityStandard = 1000
val PriorityDefault = 10000 // "Default" is a fallback, evaluate last
}

/**
* Provide a method that can instantiate a path builder with the specified workflow options.
*/
trait PathBuilderFactory {
def withOptions(options: WorkflowOptions)(implicit as: ActorSystem, ec: ExecutionContext): Future[PathBuilder]

/**
* Candidate filesystems are considered in a stable order, as some requests may match multiple filesystems.
* To customize this order, the priority of a filesystem may be adjusted. Lower number == higher priority.
* @return This filesystem's priority
*/
def priority: Int = PriorityStandard
}
14 changes: 9 additions & 5 deletions engine/src/main/scala/cromwell/engine/EngineFilesystems.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,23 @@ import cromwell.core.filesystem.CromwellFileSystems
import cromwell.core.path.{DefaultPathBuilderFactory, PathBuilder, PathBuilderFactory}
import net.ceedubs.ficus.Ficus._

import scala.collection.immutable.SortedMap
import scala.concurrent.Future

object EngineFilesystems {
private val config: Config = ConfigFactory.load

private val defaultFileSystemFactory: Map[String, PathBuilderFactory] =
private val defaultFileSystemFactory: SortedMap[String, PathBuilderFactory] =
Option(DefaultPathBuilderFactory.tuple)
.filter(_ => config.as[Boolean]("engine.filesystems.local.enabled"))
.toMap
.to(collection.immutable.SortedMap)

private val pathBuilderFactories: Map[String, PathBuilderFactory] = {
CromwellFileSystems.instance.factoriesFromConfig(config.as[Config]("engine"))
.unsafe("Failed to instantiate engine filesystem") ++ defaultFileSystemFactory
private val pathBuilderFactories: SortedMap[String, PathBuilderFactory] = {
// Unordered maps are a classical source of randomness injection into a system
(
CromwellFileSystems.instance.factoriesFromConfig(config.as[Config]("engine"))
.unsafe("Failed to instantiate engine filesystem") ++ defaultFileSystemFactory
).to(collection.immutable.SortedMap)
}

def configuredPathBuilderFactories: List[PathBuilderFactory] = pathBuilderFactories.values.toList
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import akka.actor.ActorSystem
import com.typesafe.config.Config
import cromwell.core.WorkflowOptions
import cromwell.core.path.PathBuilderFactory
import cromwell.core.path.PathBuilderFactory.PriorityBlob
import net.ceedubs.ficus.Ficus._

import scala.concurrent.{ExecutionContext, Future}
Expand Down Expand Up @@ -33,4 +34,6 @@ final case class BlobPathBuilderFactory(globalConfig: Config, instanceConfig: Co
new BlobPathBuilder(container, endpoint)(fsm)
}
}

override def priority: Int = PriorityBlob
}

0 comments on commit 622c8e6

Please sign in to comment.