Skip to content

Commit

Permalink
Fix File staging race condition + beta AWS parallel downloader
Browse files Browse the repository at this point in the history
This commit resolves a race condition in the FilePorter class which
takes care of stading files from a remote file system to the workflow
work directory.

Along with this implements an experimental multi-part parallel downloader.
To enable this feature use the env variable:

  export NXF_S3_DOWNLOAD_PARALLEL=true
  • Loading branch information
pditommaso committed Dec 20, 2021
1 parent 3a7bf8c commit b7c262d
Show file tree
Hide file tree
Showing 34 changed files with 2,196 additions and 38 deletions.
1 change: 1 addition & 0 deletions modules/nextflow/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ dependencies {
api ('org.jsoup:jsoup:1.11.2')
api 'jline:jline:2.9'
api 'org.pf4j:pf4j:3.4.1'
api 'dev.failsafe:failsafe:3.1.0'

testImplementation 'org.subethamail:subethasmtp:3.1.7'

Expand Down
36 changes: 31 additions & 5 deletions modules/nextflow/src/main/groovy/nextflow/Session.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,11 @@ import nextflow.trace.TraceObserverFactory
import nextflow.trace.TraceRecord
import nextflow.trace.WorkflowStatsObserver
import nextflow.util.Barrier
import nextflow.util.BlockingThreadExecutorFactory
import nextflow.util.ConfigHelper
import nextflow.util.Duration
import nextflow.util.HistoryFile
import nextflow.util.NameGenerator
import nextflow.util.ThreadPoolBuilder
import nextflow.util.VersionNumber
import sun.misc.Signal
import sun.misc.SignalHandler
Expand Down Expand Up @@ -1311,15 +1311,41 @@ class Session implements ISession {
@Memoized // <-- this guarantees that the same executor is used across different publish dir in the same session
@CompileStatic
synchronized ExecutorService getFileTransferThreadPool() {
final factory = new BlockingThreadExecutorFactory()
final DEFAULT_MIN_THREAD = Math.min(Runtime.runtime.availableProcessors(), 4)
final DEFAULT_MAX_THREAD = DEFAULT_MIN_THREAD
final DEFAULT_QUEUE = 10_000
final DEFAULT_KEEP_ALIVE = Duration.of('60sec')
final DEFAULT_MAX_AWAIT = Duration.of('12 hour')

def minThreads = config.navigate("threadPool.FileTransfer.minThreads", DEFAULT_MIN_THREAD) as Integer
def maxThreads = config.navigate("threadPool.FileTransfer.maxThreads", DEFAULT_MAX_THREAD) as Integer
def maxQueueSize = config.navigate("threadPool.FileTransfer.maxQueueSize", DEFAULT_QUEUE) as Integer
def keepAlive = config.navigate("threadPool.FileTransfer.keepAlive", DEFAULT_KEEP_ALIVE) as Duration
def maxAwait = config.navigate("threadPool.FileTransfer.maxAwait", DEFAULT_MAX_AWAIT) as Duration
def allowThreadTimeout = config.navigate("threadPool.FileTransfer.allowThreadTimeout", false) as Boolean

if( minThreads>maxThreads ) {
log.debug("FileTransfer minThreads ($minThreads) cannot be greater than maxThreads ($maxThreads) - Setting minThreads to $maxThreads")
minThreads = maxThreads
}

final pool = new ThreadPoolBuilder()
.withName('FileTransfer')
.withMaxThreads( Runtime.runtime.availableProcessors()*3 )
final pool = factory.create()
.withMinSize(minThreads)
.withMaxSize(maxThreads)
.withQueueSize(maxQueueSize)
.withKeepAliveTime(keepAlive)
.withAllowCoreThreadTimeout(allowThreadTimeout)
.build()

this.onShutdown {
final max = factory.maxAwait.millis
final max = maxAwait.millis
final t0 = System.currentTimeMillis()
// start shutdown process
if( aborted ) {
pool.shutdownNow()
return
}
pool.shutdown()
// wait for ongoing file transfer to complete
int count=0
Expand Down
26 changes: 14 additions & 12 deletions modules/nextflow/src/main/groovy/nextflow/file/FilePorter.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import java.util.concurrent.ExecutorService
import java.util.concurrent.Future
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicInteger

import groovy.transform.CompileStatic
import groovy.transform.PackageScope
Expand Down Expand Up @@ -94,24 +95,20 @@ class FilePorter {
FileTransfer transfer = stagingTransfers.get(source)
if( transfer == null ) {
transfer = createFileTransfer(source, stageDir)
transfer.refCount = 1
transfer.result = submitForExecution(transfer)
stagingTransfers.put(source, transfer)
}
else
transfer.refCount ++
// increment the ref count
transfer.refCount.incrementAndGet()

return transfer
}
}

protected void decOrRemove(FileTransfer action) {
synchronized (stagingTransfers) {
final key = action.source
assert stagingTransfers.containsKey(key)
if( --action.refCount == 0 ) {
stagingTransfers.remove(key)
}
//assert stagingTransfers.containsKey(key)
if( action.refCount.decrementAndGet() == 0 ) {
stagingTransfers.remove(action.source)
}
}

Expand Down Expand Up @@ -231,15 +228,18 @@ class FilePorter {
*/
final int maxRetries

volatile int refCount
final AtomicInteger refCount
volatile Future result
private String message
private int debugDelay

FileTransfer(Path foreignPath, Path stagePath, int maxRetries=0) {
this.source = foreignPath
this.target = stagePath
this.maxRetries = maxRetries
this.message = "Staging foreign file: ${source.toUriString()}"
this.refCount = new AtomicInteger(0)
this.debugDelay = System.getProperty('filePorter.debugDelay') as Integer ?: 0
}

@Override
Expand All @@ -261,7 +261,7 @@ class FilePorter {
return stageForeignFile0(filePath, stagePath)
}
catch( IOException e ) {
if( count++ < maxRetries && !(e instanceof NoSuchFileException )) {
if( count++ < maxRetries && e !instanceof NoSuchFileException && e !instanceof InterruptedIOException && !Thread.currentThread().isInterrupted() ) {
def message = "Unable to stage foreign file: ${filePath.toUriString()} (try ${count}) -- Cause: $e.message"
log.isDebugEnabled() ? log.warn(message, e) : log.warn(message)

Expand Down Expand Up @@ -289,6 +289,8 @@ class FilePorter {
return target
}
log.debug "Copying foreign file ${source.toUriString()} to work dir: ${target.toUriString()}"
if( debugDelay )
sleep ( new Random().nextInt(debugDelay) )
return FileHelper.copyPath(source, target)
}

Expand All @@ -309,7 +311,7 @@ class FilePorter {
finally {
// if the files sizes don't match, delete it
if( !same ) {
log.debug "Invalid cahed stage path - deleting: $target"
log.debug "Invalid cached stage path - deleting: $target"
safeDelete(target)
}
return same
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1698,7 +1698,7 @@ class TaskProcessor {
if( obj instanceof Path )
return obj

if( obj == null )
if( !obj == null )
throw new ProcessUnrecoverableException("Path value cannot be null")

if( !(obj instanceof CharSequence) )
Expand All @@ -1711,8 +1711,10 @@ class TaskProcessor {
return FileHelper.asPath(str)
if( FileHelper.getUrlProtocol(str) )
return FileHelper.asPath(str)

throw new ProcessUnrecoverableException("Not a valid path value: $str")
if( !str )
throw new ProcessUnrecoverableException("Path value cannot be empty")

throw new ProcessUnrecoverableException("Not a valid path value: '$str'")
}

protected List<FileHolder> normalizeInputToFiles( Object obj, int count, boolean coerceToPath, FilePorter.Batch batch ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import nextflow.Session
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
*/
@Slf4j
@Deprecated
@CompileStatic
class BlockingThreadExecutorFactory {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
/*
* Copyright 2020-2021, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package nextflow.util


import java.util.concurrent.BlockingQueue
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.RejectedExecutionHandler
import java.util.concurrent.ThreadFactory
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
/**
* Builder class to create instance of {@link ThreadPoolExecutor}
*
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
*/
@Slf4j
@CompileStatic
class ThreadPoolBuilder {

static AtomicInteger poolCount = new AtomicInteger()

private String name

private int minSize

private int maxSize

private BlockingQueue<Runnable> workQueue

private int queueSize = -1

private Long keepAliveTime

private RejectedExecutionHandler rejectionPolicy

private ThreadFactory threadFactory

private boolean allowCoreThreadTimeout

String getName() { name }

int getMinSize() { minSize }

int getMaxSize() { maxSize }

int getQueueSize() { queueSize }

BlockingQueue<Runnable> getWorkQueue() { workQueue }

Long getKeepAliveTime() { keepAliveTime }

RejectedExecutionHandler getRejectionPolicy() { rejectionPolicy }

ThreadFactory getThreadFactory() { threadFactory }

boolean getAllowCoreThreadTimeout() { allowCoreThreadTimeout }

ThreadPoolBuilder withName(String name) {
if( name ) {
this.name = name
this.threadFactory = new CustomThreadFactory(name)
}
return this
}

ThreadPoolBuilder withThreadFactory(ThreadFactory threadFactory) {
assert !name || !threadFactory, "Property 'threadFactory' or 'name' was already set"
this.threadFactory = threadFactory
return this
}

ThreadPoolBuilder withRejectionPolicy(RejectedExecutionHandler rejectionPolicy) {
this.rejectionPolicy = rejectionPolicy
return this
}

ThreadPoolBuilder withMinSize(int min) {
this.minSize = min
return this
}

ThreadPoolBuilder withMaxSize(int max) {
this.maxSize = max
return this
}

ThreadPoolBuilder withQueueSize(int size) {
this.queueSize = size
this.workQueue = new LinkedBlockingQueue<Runnable>(size)
return this
}

ThreadPoolBuilder withQueue(BlockingQueue<Runnable> workQueue) {
this.workQueue = workQueue
return this
}

ThreadPoolBuilder withKeepAliveTime( long millis ) {
keepAliveTime = millis
return this
}

ThreadPoolBuilder withKeepAliveTime(Duration duration ) {
keepAliveTime = duration.toMillis()
return this
}

ThreadPoolBuilder withAllowCoreThreadTimeout(boolean flag) {
this.allowCoreThreadTimeout = flag
return this
}

ThreadPoolExecutor build() {
assert minSize <= maxSize

if( !name )
name = "nf-thread-pool-${poolCount.getAndIncrement()}"

if(keepAliveTime==null)
keepAliveTime = 60_000
if( workQueue==null )
workQueue = new LinkedBlockingQueue<>()
if( rejectionPolicy==null )
rejectionPolicy = new ThreadPoolExecutor.CallerRunsPolicy()
if( threadFactory==null )
threadFactory = new CustomThreadFactory(name)

log.debug "Creating thread pool '$name' minSize=$minSize; maxSize=$maxSize; workQueue=${workQueue.getClass().getSimpleName()}[${queueSize}]; allowCoreThreadTimeout=$allowCoreThreadTimeout"

final result = new ThreadPoolExecutor(
minSize,
maxSize,
keepAliveTime, TimeUnit.MILLISECONDS,
workQueue,
threadFactory,
rejectionPolicy)

result.allowCoreThreadTimeOut(allowCoreThreadTimeout)

return result
}


static ThreadPoolExecutor io(String name=null) {
io(10, 100, 10_000, name)
}


static ThreadPoolExecutor io(int min, int max, int queue, String name=null) {
new ThreadPoolBuilder()
.withMinSize(min)
.withMaxSize(max)
.withQueueSize(queue)
.withName(name)
.build()
}

}
Loading

0 comments on commit b7c262d

Please sign in to comment.