Improve performance of Files.walk on the JVM #3383

Merged: mpilquist merged 12 commits into typelevel:main from topic/walk-performance on Feb 16, 2024

mpilquist
Member

Fixes #3329.

For small walks, the overhead of the fs2/cats-effect machinery dominates. For large walks, fs2's performance is within roughly 25% of the JVM's. For example, using @djspiewak's scenario with `MaxDepth = 7`, I get:

fs2 took: 6600 ms
nio took: 5291 ms
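
For anyone who wants to reproduce this kind of comparison, here is a minimal sketch of a timing helper and the overhead arithmetic. It only measures the plain `java.nio` side; the names `WalkTiming`, `timed`, `overheadPercent` and `countWithNio` are illustrative and are not part of fs2 or of the benchmark scenario referenced above.

```scala
import java.nio.file.{Files => JFiles, Path => JPath}

object WalkTiming {
  // Runs `thunk` once and returns (result, elapsed milliseconds).
  def timed[A](thunk: => A): (A, Long) = {
    val start = System.nanoTime()
    val result = thunk
    (result, (System.nanoTime() - start) / 1000000L)
  }

  // Relative overhead of `candidateMs` over `baselineMs`, in percent.
  def overheadPercent(candidateMs: Long, baselineMs: Long): Double =
    (candidateMs - baselineMs).toDouble / baselineMs * 100.0

  // Counts every entry under `start` (including `start`) with the JDK walker.
  def countWithNio(start: JPath, maxDepth: Int): Long = {
    val stream = JFiles.walk(start, maxDepth)
    try stream.count()
    finally stream.close()
  }

  def main(args: Array[String]): Unit = {
    // Tiny throwaway tree; a real benchmark needs a much larger one.
    val root = JFiles.createTempDirectory("walk-timing")
    JFiles.createFile(root.resolve("a.txt"))
    JFiles.createDirectories(root.resolve("sub").resolve("deeper"))
    val (count, ms) = timed(countWithNio(root, 7))
    println(s"nio took: $ms ms for $count entries")
    // Figures quoted above: fs2 6600 ms vs nio 5291 ms.
    println(f"overhead: ${overheadPercent(6600, 5291)}%.1f%%")
  }
}
```

With the two figures quoted above, `overheadPercent(6600, 5291)` comes out just under 25%, which matches the "within roughly 25%" estimate.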

Member

@djspiewak left a comment

I'm assuming the overhead begins to converge in larger traversals because there's a single big eval.

bldr += Path.fromNioPath(path)
size += 1
if (size >= limit) {
val result = dispatcher.unsafeRunSync(channel.send(Chunk.from(bldr.result())))
Member

I really wish there were a way to suspend the visitation and continue it later. That would allow us to avoid the unsafeRunSync here and use unsafeRunAndForget instead, likely bouncing out of the interruptible once every n enqueues and passing through a Stream#append in order to preserve backpressure.

Is walkFileTree meaningfully faster than just doing the traversal by hand?
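
To make the backpressure point concrete, here is a minimal sketch of the same shape without fs2: a `FileVisitor` that batches paths and hands each full batch over with a blocking call. `ArrayBlockingQueue.put` stands in for `dispatcher.unsafeRunSync(channel.send(...))`. The names `BatchingWalk` and `walkInto`, and the empty-vector end marker, are hypothetical and are not taken from the PR.

```scala
import java.io.IOException
import java.nio.file.{FileVisitResult, Files => JFiles, Path => JPath, SimpleFileVisitor}
import java.nio.file.attribute.BasicFileAttributes
import java.util.concurrent.ArrayBlockingQueue

object BatchingWalk {
  // Walks `start` on the calling thread, handing batches of up to `limit` paths to `out`.
  // `out.put` blocks while the queue is full, which is what throttles the walk.
  def walkInto(start: JPath, limit: Int, out: ArrayBlockingQueue[Vector[JPath]]): Unit = {
    var bldr = Vector.newBuilder[JPath]
    var size = 0

    // Sends the current batch (if any) and starts a fresh one.
    def flush(): Unit = if (size > 0) {
      out.put(bldr.result())
      bldr = Vector.newBuilder[JPath]
      size = 0
    }

    def enqueue(path: JPath): FileVisitResult = {
      bldr += path
      size += 1
      if (size >= limit) flush()
      FileVisitResult.CONTINUE
    }

    JFiles.walkFileTree(
      start,
      new SimpleFileVisitor[JPath] {
        override def preVisitDirectory(dir: JPath, attrs: BasicFileAttributes): FileVisitResult =
          enqueue(dir)
        override def visitFile(file: JPath, attrs: BasicFileAttributes): FileVisitResult =
          enqueue(file)
        override def visitFileFailed(file: JPath, e: IOException): FileVisitResult =
          FileVisitResult.CONTINUE
      }
    )
    flush() // emit the final partial batch
    out.put(Vector.empty) // hypothetical end-of-walk marker for the consumer
  }
}
```

Because the visitor cannot be suspended, the only way to slow the walk down in this shape is to block inside the callback, which is the constraint the comment above is describing.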

Member Author

Even eagerly collecting everything is only 5% faster than the channel-based solution (using the 4096 limit):

def walkEager(start: Path, maxDepth: Int, followLinks: Boolean): Stream[F, Path] = {
    val doWalk = Sync[F].interruptibleMany {
      val bldr = Vector.newBuilder[Path]
      JFiles.walkFileTree(
        start.toNioPath,
        if (followLinks) Set(FileVisitOption.FOLLOW_LINKS).asJava else Set.empty.asJava,
        maxDepth,
        new SimpleFileVisitor[JPath] {
          private def enqueue(path: JPath): FileVisitResult = {
            bldr += Path.fromNioPath(path)
            FileVisitResult.CONTINUE
          }

          override def visitFile(file: JPath, attrs: JBasicFileAttributes): FileVisitResult =
            enqueue(file)

          override def visitFileFailed(file: JPath, t: IOException): FileVisitResult =
            FileVisitResult.CONTINUE

          override def preVisitDirectory(dir: JPath, attrs: JBasicFileAttributes): FileVisitResult =
            enqueue(dir)

          override def postVisitDirectory(dir: JPath, t: IOException): FileVisitResult =
            FileVisitResult.CONTINUE
        }
      )
      Chunk.from(bldr.result())
    }
    Stream.eval(doWalk).flatMap(Stream.chunk)
  }

Member

Wow, that's wild honestly. Have to ponder that. It's nice that we can just be lazy about our thread blocking though, since it simplifies this stuff.

Member Author

@djspiewak BTW, there's a bunch of performance hackery in the JDK's file walking that's not (directly) available to us if we implement our own walk. For example, Path can cache file attributes avoiding some filesystem calls.
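
As one small illustration of that point, `walkFileTree` passes each callback the `BasicFileAttributes` the walker has already read, so a visitor can classify and size entries without issuing its own `Files.isDirectory` or `Files.size` calls. The sketch below is only an example of using those attributes; `AttrWalk`, `Summary` and `summarize` are made-up names, and it does not measure how many filesystem calls are actually saved.

```scala
import java.nio.file.{FileVisitResult, Files => JFiles, Path => JPath, SimpleFileVisitor}
import java.nio.file.attribute.BasicFileAttributes

object AttrWalk {
  final case class Summary(dirs: Int, files: Int, bytes: Long)

  // Summarizes the tree under `start` using only the attributes the walker supplies.
  def summarize(start: JPath): Summary = {
    var dirs = 0
    var files = 0
    var bytes = 0L
    JFiles.walkFileTree(
      start,
      new SimpleFileVisitor[JPath] {
        override def preVisitDirectory(dir: JPath, attrs: BasicFileAttributes): FileVisitResult = {
          dirs += 1
          FileVisitResult.CONTINUE
        }
        override def visitFile(file: JPath, attrs: BasicFileAttributes): FileVisitResult = {
          // `attrs` arrives with the callback; no separate readAttributes call here.
          if (attrs.isRegularFile) {
            files += 1
            bytes += attrs.size
          }
          FileVisitResult.CONTINUE
        }
      }
    )
    Summary(dirs, files, bytes)
  }
}
```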

Member Author

Just to close this out, I tried this prototype:

  override def walk(
      start: Path,
      maxDepth: Int,
      followLinks: Boolean,
      chunkSize: Int
  ): Stream[F, Path] =
    walkJustInTime(start, maxDepth, followLinks, chunkSize)
    // if (chunkSize == Int.MaxValue) walkEager(start, maxDepth, followLinks)
    // else walkLazy(start, maxDepth, followLinks, chunkSize)

  private def walkJustInTime(
      start: Path,
      maxDepth: Int,
      followLinks: Boolean,
      chunkSize: Int
  ): Stream[F, Path] = {

    def loop(acc: Vector[Path], toWalk: Vector[Path]): Stream[F, Path] = {
      if (toWalk.isEmpty) {
        Stream.chunk(Chunk.from(acc))
      } else {
        val path = toWalk.head

        val (toEmit, newAcc) =
          if (acc.size + 1 >= chunkSize)
            (Chunk.from(acc :+ path), Vector.empty)
          else (Chunk.empty, acc :+ path)

        val list = Sync[F].interruptibleMany {
          val npath = path.toNioPath
          if (JFiles.isDirectory(npath)) {
            val listing = JFiles.list(npath)
            try listing.iterator.asScala.map(Path.fromNioPath).toVector
            finally listing.close()
          }
          else Vector.empty
        }

        Stream.chunk(toEmit) ++ Stream.eval(list).flatMap(descendants => loop(newAcc, toWalk.drop(1) ++ descendants))
      }
    }

    loop(Vector.empty, Vector(start))
  }

Using MaxDepth = 7, I got these results:

fs2 took: 16070 ms
fs2 eager took: 13935 ms
nio took: 6356 ms

Whereas the implementation in this PR results in:

fs2 took: 8000 ms
fs2 eager took: 5975 ms
nio took: 6858 ms

Member Author

@mpilquist Feb 10, 2024

Here's a better prototype that does file attribute reading at the time of directory listing.

  private def walkJustInTime(
      start: Path,
      maxDepth: Int,
      followLinks: Boolean,
      chunkSize: Int
  ): Stream[F, Path] = {

    def loop(acc: Vector[Path], toWalk: Vector[(Path, JBasicFileAttributes)]): Stream[F, Path] = {
      if (toWalk.isEmpty) {
        Stream.chunk(Chunk.from(acc))
      } else {
        val (path, attr) = toWalk.head

        val (toEmit, newAcc) =
          if (acc.size + 1 >= chunkSize)
            (Chunk.from(acc :+ path), Vector.empty)
          else (Chunk.empty, acc :+ path)

        if (attr.isDirectory) {
          val list = Sync[F].interruptibleMany {
            val listing = JFiles.list(path.toNioPath)
            try listing.iterator.asScala.map(p => 
              (Path.fromNioPath(p), JFiles.readAttributes(p, classOf[JBasicFileAttributes]))).toVector
            finally listing.close()
          }
          Stream.chunk(toEmit) ++ Stream.eval(list).flatMap(descendants => loop(newAcc, toWalk.drop(1) ++ descendants))
        } else Stream.chunk(toEmit) ++ loop(newAcc, toWalk.drop(1))
      }
    }

    Stream.eval(Sync[F].interruptibleMany {
      start -> JFiles.readAttributes(start.toNioPath, classOf[JBasicFileAttributes])
    }).flatMap { s => loop(Vector.empty, Vector(s)) }
  }

Performs better but still doesn't beat the walkFileTree solution:

fs2 took: 10399 ms
fs2 eager took: 8843 ms
nio took: 7202 ms

Member Author

Alright, maybe we should switch to a version based on this:

  private def walkJustInTime(
      start: Path,
      maxDepth: Int,
      followLinks: Boolean,
      chunkSize: Int
  ): Stream[F, Path] = {
    import scala.collection.immutable.Queue

    def loop(toWalk0: Queue[(Path, JBasicFileAttributes)]): Stream[F, Path] = {
      val partialWalk = Sync[F].interruptibleMany {
        var acc = Vector.empty[Path]
        var toWalk = toWalk0

        while (acc.size < chunkSize && toWalk.nonEmpty) {
          val (path, attr) = toWalk.head
          toWalk = toWalk.drop(1)
          acc = acc :+ path
          if (attr.isDirectory) {
            val listing = JFiles.list(path.toNioPath)
            try {
              val descendants = listing.iterator.asScala.map(p => 
                (Path.fromNioPath(p), JFiles.readAttributes(p, classOf[JBasicFileAttributes]))).toVector
              toWalk = toWalk ++ descendants
            }
            finally listing.close()
          }
        }

        Stream.chunk(Chunk.from(acc)) ++ (if (toWalk.isEmpty) Stream.empty else loop(toWalk))
      }
      Stream.eval(partialWalk).flatten
    }

    Stream.eval(Sync[F].interruptibleMany {
      start -> JFiles.readAttributes(start.toNioPath, classOf[JBasicFileAttributes])
    }).flatMap(s => loop(Queue(s)))
  }

fs2 took: 9312 ms
fs2 eager took: 8538 ms
nio took: 7769 ms
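
The core of this version is a resumable step: visit entries until a chunk is full, then return the chunk together with the queue of entries still to visit. Here is a minimal sketch of just that step with the fs2 and cats-effect parts removed, so it can be run on its own. `PartialWalk`, `entry`, `step` and `walkAll` are illustrative names, and like the prototype above it ignores `maxDepth` and `followLinks`.

```scala
import java.nio.file.{Files => JFiles, Path => JPath}
import java.nio.file.attribute.BasicFileAttributes
import scala.collection.immutable.Queue
import scala.jdk.CollectionConverters._

object PartialWalk {
  type Entry = (JPath, BasicFileAttributes)

  // Pairs a path with its attributes, read once up front.
  def entry(p: JPath): Entry =
    (p, JFiles.readAttributes(p, classOf[BasicFileAttributes]))

  // Visits entries until `chunkSize` paths are collected or the queue is empty.
  // Returns the collected chunk plus the queue to resume from.
  def step(toWalk0: Queue[Entry], chunkSize: Int): (Vector[JPath], Queue[Entry]) = {
    val acc = Vector.newBuilder[JPath]
    var size = 0
    var toWalk = toWalk0
    while (size < chunkSize && toWalk.nonEmpty) {
      val ((path, attr), rest) = toWalk.dequeue
      toWalk = rest
      acc += path
      size += 1
      if (attr.isDirectory) {
        val listing = JFiles.list(path)
        try {
          toWalk = toWalk ++ listing.iterator.asScala.map(entry).toVector
        } finally listing.close()
      }
    }
    (acc.result(), toWalk)
  }

  // Drives `step` to completion; in the streaming version each step is one effect.
  def walkAll(start: JPath, chunkSize: Int): Vector[Vector[JPath]] = {
    var queue = Queue(entry(start))
    val chunks = Vector.newBuilder[Vector[JPath]]
    while (queue.nonEmpty) {
      val (chunk, next) = step(queue, chunkSize)
      chunks += chunk
      queue = next
    }
    chunks.result()
  }
}
```

Since the queue is first-in, first-out, this visits the tree breadth-first, and nothing blocks between steps, which is the property being traded against raw speed in the numbers above.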

Member

So basically what we're trying to figure out is whether it's worth eating 9% overhead to avoid blocking a thread which is already getting blocked by filesystem I/O? My guess is that it's not worth it but I shall ponder a bit.

Member Author

Pushed a new version:

fs2 took: 8131 ms
fs2 eager took: 5950 ms
nio took: 7346 ms

I'd like to add some tests for symbolic link following & max depth limits (we don't have any now). Then this PR should be good.
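
One possible starting point for such tests is a reference result computed with `java.nio.file.Files.walk`, which the fs2 output could then be compared against. The sketch below is an assumption about how that might look, not the tests that were added in this PR; `WalkReference`, `nioWalk` and `fixture` are hypothetical names, and the fixture assumes a filesystem that supports symbolic links.

```scala
import java.nio.file.{FileVisitOption, Files => JFiles, Path => JPath}
import scala.jdk.CollectionConverters._

object WalkReference {
  // Collects what java.nio's Files.walk reports, as a reference result.
  def nioWalk(start: JPath, maxDepth: Int, followLinks: Boolean): Set[JPath] = {
    val opts: Seq[FileVisitOption] =
      if (followLinks) Seq(FileVisitOption.FOLLOW_LINKS) else Seq.empty
    val stream = JFiles.walk(start, maxDepth, opts: _*)
    try stream.iterator.asScala.toSet
    finally stream.close()
  }

  // Hypothetical fixture: root/dir/file plus a symlink root/link -> root/dir.
  def fixture(): JPath = {
    val root = JFiles.createTempDirectory("walk-fixture")
    val dir = JFiles.createDirectory(root.resolve("dir"))
    JFiles.createFile(dir.resolve("file"))
    JFiles.createSymbolicLink(root.resolve("link"), dir)
    root
  }
}
```

With this fixture, a depth of 0 yields only the root, a depth of 1 adds `dir` and `link`, and `link/file` only appears when links are followed.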

@@ -389,6 +391,54 @@ private[file] trait FilesCompanionPlatform {
.resource(Resource.fromAutoCloseable(javaCollection))
.flatMap(ds => Stream.fromBlockingIterator[F](collectionIterator(ds), pathStreamChunkSize))

override def walk(start: Path, maxDepth: Int, followLinks: Boolean): Stream[F, Path] =
Stream.resource(Dispatcher.sequential[F]).flatMap { dispatcher =>
Stream.eval(Channel.bounded[F, Chunk[Path]](10)).flatMap { channel =>
Member

Btw @armanbilge one thing that occurs to me is that our fancy new unsafe queue thing isn't going to help very much if someone's using Channel.

Member Author

mpilquist commented Feb 10, 2024

It appears Files.walkFileTree on Scala Native doesn't throw FileSystemLoopException. We could just skip the walkEager optimization on Scala Native for now I guess but then we're back to more platform specific traits.

Opened scala-native/scala-native#3744 for tracking upstream.
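
For reference, this is a minimal sketch of the JVM behaviour in question: with `FOLLOW_LINKS` set, `walkFileTree` reports a symlink cycle by calling `visitFileFailed` with a `FileSystemLoopException`. `LoopDetection` and `loopsUnder` are illustrative names, and the sketch says nothing about what Scala Native does.

```scala
import java.io.IOException
import java.nio.file.{FileSystemLoopException, FileVisitOption, FileVisitResult, Files => JFiles, Path => JPath, SimpleFileVisitor}
import scala.jdk.CollectionConverters._

object LoopDetection {
  // Returns the paths at which the JDK walker reported a symlink cycle.
  def loopsUnder(start: JPath): Vector[JPath] = {
    val loops = Vector.newBuilder[JPath]
    JFiles.walkFileTree(
      start,
      Set(FileVisitOption.FOLLOW_LINKS).asJava,
      Int.MaxValue,
      new SimpleFileVisitor[JPath] {
        override def visitFileFailed(file: JPath, e: IOException): FileVisitResult =
          e match {
            case _: FileSystemLoopException =>
              loops += file // record the entry that would re-enter an ancestor
              FileVisitResult.CONTINUE
            case other =>
              throw other // any other failure is propagated unchanged
          }
      }
    )
    loops.result()
  }
}
```

For a tree containing `root/a/back` as a symlink to `root`, this returns that single path on the JVM.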

mpilquist
Member Author

Okay, this is ready for final review. Here's how we netted out performance-wise:

fs2 took: 7574 ms
fs2 eager took: 5809 ms
nio took: 6956 ms

@@ -389,6 +391,140 @@ private[file] trait FilesCompanionPlatform {
.resource(Resource.fromAutoCloseable(javaCollection))
.flatMap(ds => Stream.fromBlockingIterator[F](collectionIterator(ds), pathStreamChunkSize))

protected def walkEager(start: Path, options: WalkOptions): Stream[F, Path] = {
val doWalk = Sync[F].interruptibleMany {
Member

Does it really need the Many?

var acc = Vector.empty[Path]
var toWalk = toWalk0

while (acc.size < options.chunkSize && toWalk.nonEmpty) {
Member

May be worth checking Thread.interrupted()
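
A minimal sketch of what such a check could look like inside a chunking loop, using a plain queue instead of the real walk state. Throwing `InterruptedException` is an assumption made for this example and is not necessarily what the merged code does; `InterruptAwareLoop` and `drain` are hypothetical names.

```scala
import scala.collection.immutable.Queue

object InterruptAwareLoop {
  // Takes up to `chunkSize` items from the queue, checking the interrupt flag
  // before each one. Thread.interrupted() returns the flag and clears it.
  def drain[A](queue0: Queue[A], chunkSize: Int): (Vector[A], Queue[A]) = {
    val acc = Vector.newBuilder[A]
    var size = 0
    var queue = queue0
    while (size < chunkSize && queue.nonEmpty) {
      if (Thread.interrupted())
        throw new InterruptedException("walk interrupted") // assumed reaction, see lead-in
      val (a, rest) = queue.dequeue
      acc += a
      size += 1
      queue = rest
    }
    (acc.result(), queue)
  }
}
```

The check matters because a loop made of short, non-blocking filesystem calls may never reach a call that itself reacts to the interrupt flag.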

@mpilquist merged commit 768039d into typelevel:main on Feb 16, 2024
15 checks passed
@mpilquist deleted the topic/walk-performance branch on February 16, 2024 15:22

Successfully merging this pull request may close these issues.

Bad performance of fs2.io.file.Files[IO].walk()
2 participants