Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WX-735 Fix incorrect and/or nondeterministic filesystem ordering #6930

Merged
merged 9 commits into from
Oct 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ package cromwell.core.path

import akka.actor.ActorSystem
import cromwell.core.WorkflowOptions
import cromwell.core.path.PathBuilderFactory.PriorityDefault

import scala.concurrent.{ExecutionContext, Future}

/**
 * Path builder factory that always hands back `DefaultPathBuilder`.
 * It is identified by the name "local" and uses the fallback priority.
 */
case object DefaultPathBuilderFactory extends PathBuilderFactory {

  /** Name under which this factory is keyed (see `tuple`). */
  val name = "local"

  /** Ready-made `(name, factory)` pair for this factory. */
  val tuple = (name, this)

  /** `PriorityDefault` is the largest of the declared priorities, so this factory sorts after the others. */
  override def priority: Int = PriorityDefault

  /**
   * Returns an already-completed future holding `DefaultPathBuilder`.
   * None of the arguments are consulted.
   */
  override def withOptions(options: WorkflowOptions)(implicit actorSystem: ActorSystem, ec: ExecutionContext) =
    Future.successful(DefaultPathBuilder)
}
21 changes: 13 additions & 8 deletions core/src/main/scala/cromwell/core/path/PathBuilderFactory.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,33 @@ import cromwell.core.{Dispatcher, WorkflowOptions}
import cats.syntax.traverse._
import cats.instances.list._
import cats.instances.future._
import cromwell.core.path.PathBuilderFactory.PriorityStandard

import scala.concurrent.{ExecutionContext, Future}

object PathBuilderFactory {
// Given a list of factories, instantiates the corresponding path builders
def instantiatePathBuilders(factories: List[PathBuilderFactory], workflowOptions: WorkflowOptions)(implicit as: ActorSystem): Future[List[PathBuilder]] = {
implicit val ec: ExecutionContext = as.dispatchers.lookup(Dispatcher.IoDispatcher)
// The DefaultPathBuilderFactory always needs to be last.
// Path builders are tried in order, and the default one is very generous about which paths it "thinks" it supports.
// For instance, it will return a Path for a GCS URL even though it doesn't really support it.
val sortedFactories = factories.sortWith({
case (_, DefaultPathBuilderFactory) => true
case (DefaultPathBuilderFactory, _) => false
case (a, b) => factories.indexOf(a) < factories.indexOf(b)
})
val sortedFactories = factories.sortBy(_.priority)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is really much easier to think about - Nice! Does this still guarantee that the default is last?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops sorry I missed it!

sortedFactories.traverse(_.withOptions(workflowOptions))
}

// Priorities used to order path builder factories: a lower number is evaluated earlier.

// Blob is evaluated first, because blob files may inadvertently match other filesystems.
val PriorityBlob: Int = 100

// Priority of an ordinary filesystem.
val PriorityStandard: Int = 1000

// "Default" is a fallback and is evaluated last.
val PriorityDefault: Int = 10000
}

/**
 * Something that can create a [[PathBuilder]] for a given set of workflow options.
 */
trait PathBuilderFactory {

  /**
   * Position of this filesystem among the candidates.
   *
   * A request may match more than one filesystem, so candidates are considered in a stable order.
   * Override this to move a filesystem earlier or later: the lower the number, the higher the priority.
   *
   * @return this filesystem's priority; `PriorityStandard` unless overridden
   */
  def priority: Int = PathBuilderFactory.PriorityStandard

  /**
   * Instantiates a path builder using the specified workflow options.
   *
   * @param options workflow options to build with
   * @return a future holding the resulting path builder
   */
  def withOptions(options: WorkflowOptions)(implicit as: ActorSystem, ec: ExecutionContext): Future[PathBuilder]
}
14 changes: 9 additions & 5 deletions engine/src/main/scala/cromwell/engine/EngineFilesystems.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,23 @@ import cromwell.core.filesystem.CromwellFileSystems
import cromwell.core.path.{DefaultPathBuilderFactory, PathBuilder, PathBuilderFactory}
import net.ceedubs.ficus.Ficus._

import scala.collection.immutable.SortedMap
import scala.concurrent.Future

object EngineFilesystems {
private val config: Config = ConfigFactory.load

private val defaultFileSystemFactory: Map[String, PathBuilderFactory] =
private val defaultFileSystemFactory: SortedMap[String, PathBuilderFactory] =
Option(DefaultPathBuilderFactory.tuple)
.filter(_ => config.as[Boolean]("engine.filesystems.local.enabled"))
.toMap
.to(collection.immutable.SortedMap)

private val pathBuilderFactories: Map[String, PathBuilderFactory] = {
CromwellFileSystems.instance.factoriesFromConfig(config.as[Config]("engine"))
.unsafe("Failed to instantiate engine filesystem") ++ defaultFileSystemFactory
private val pathBuilderFactories: SortedMap[String, PathBuilderFactory] = {
// Unordered maps are a classic source of nondeterminism; convert to a SortedMap so the factories come out in a stable, key-sorted order
(
CromwellFileSystems.instance.factoriesFromConfig(config.as[Config]("engine"))
.unsafe("Failed to instantiate engine filesystem") ++ defaultFileSystemFactory
).to(collection.immutable.SortedMap)
}

def configuredPathBuilderFactories: List[PathBuilderFactory] = pathBuilderFactories.values.toList
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import akka.actor.ActorSystem
import com.typesafe.config.Config
import cromwell.core.WorkflowOptions
import cromwell.core.path.PathBuilderFactory
import cromwell.core.path.PathBuilderFactory.PriorityBlob
import net.ceedubs.ficus.Ficus._

import scala.concurrent.{ExecutionContext, Future}
Expand Down Expand Up @@ -33,4 +34,6 @@ final case class BlobPathBuilderFactory(globalConfig: Config, instanceConfig: Co
new BlobPathBuilder(container, endpoint)(fsm)
}
}

// Use the blob priority (PriorityBlob) instead of the trait's default for this factory.
override def priority: Int = PriorityBlob
}