Skip to content

Commit

Permalink
Add rate limiter to wave requests (#5608)
Browse files Browse the repository at this point in the history

Signed-off-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com>
Co-authored-by: Ben Sherman <bentshermann@gmail.com>
  • Loading branch information
pditommaso and bentsherman authored Jan 15, 2025
1 parent aa258b4 commit ecf6829
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 1 deletion.
14 changes: 14 additions & 0 deletions modules/nextflow/src/main/groovy/nextflow/cli/CmdInspect.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package nextflow.cli
import com.beust.jcommander.DynamicParameter
import com.beust.jcommander.Parameter
import com.beust.jcommander.Parameters
import groovy.transform.CompileDynamic
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.Session
Expand Down Expand Up @@ -91,11 +92,24 @@ class CmdInspect extends CmdBase {
}

protected void applyInspect(Session session) {
// slow down max rate when concretize is specified
if( concretize ) {
configureMaxRate(session.config)
}
// run the inspector
new ContainersInspector(concretize)
.withFormat(format)
.withIgnoreErrors(ignoreErrors)
.printContainers()
}

@CompileDynamic
protected void configureMaxRate(Map config) {
if( config.wave == null )
config.wave = new HashMap()
if( config.wave.httpClient == null )
config.wave.httpClient = new HashMap()
config.wave.httpClient.maxRate = '5/30sec'
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2013-2024, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package nextflow.cli

import spock.lang.Specification

/**
*
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
*/
class CmdInspectTest extends Specification {

def 'should configure max rate' () {
given:
def cmd = new CmdInspect()

when:
def cfg1 = [:]
cmd.configureMaxRate(cfg1)
then:
cfg1 == [wave:[httpClient:[maxRate:'5/30sec']]]

when:
def cfg2 = [wave:[enabled:true, httpClient: [something:true, maxRate: '1/s']]]
cmd.configureMaxRate(cfg2)
then:
cfg2 == [wave:[enabled:true, httpClient: [something:true, maxRate: '5/30sec']]]
}

}
17 changes: 17 additions & 0 deletions plugins/nf-wave/src/main/io/seqera/wave/plugin/WaveClient.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import java.util.function.Predicate

import com.google.common.cache.Cache
import com.google.common.cache.CacheBuilder
import com.google.common.util.concurrent.RateLimiter
import com.google.common.util.concurrent.UncheckedExecutionException
import com.google.gson.Gson
import com.google.gson.reflect.TypeToken
Expand Down Expand Up @@ -125,6 +126,8 @@ class WaveClient {

final private URL s5cmdConfigUrl

final private RateLimiter limiter

WaveClient(Session session) {
this.session = session
this.config = new WaveConfig(session.config.wave as Map ?: Collections.emptyMap(), SysEnv.get())
Expand All @@ -137,6 +140,7 @@ class WaveClient {
log.debug "Wave config: $config"
this.packer = new Packer().withPreserveTimestamp(config.preserveFileTimestamp())
this.waveRegistry = new URI(endpoint).getAuthority()
this.limiter = RateLimiter.create( config.httpOpts().maxRate().rate )
// create cache
this.cache = CacheBuilder<String, Handle>
.newBuilder()
Expand Down Expand Up @@ -257,7 +261,20 @@ class WaveClient {
return sendRequest(request)
}

private void checkLimiter() {
final ts = System.currentTimeMillis()
try {
limiter.acquire()
} finally {
final delta = System.currentTimeMillis()-ts
if( delta>0 )
log.debug "Request limiter blocked ${Duration.ofMillis(delta)}"
}
}


SubmitContainerTokenResponse sendRequest(SubmitContainerTokenRequest request) {
checkLimiter()
return sendRequest0(request, 1)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ package io.seqera.wave.plugin.config
import groovy.transform.CompileStatic
import groovy.transform.ToString
import nextflow.util.Duration
import nextflow.util.RateUnit

/**
* Model the HTTP client settings to connect the Wave service
*
Expand All @@ -31,12 +33,18 @@ class HttpOpts {

final private Duration connectTimeout

final private RateUnit maxRate

HttpOpts(Map opts) {
connectTimeout = opts.connectTimeout as Duration ?: Duration.of('30s')
maxRate = opts.maxRate as RateUnit ?: RateUnit.of('1/sec')
}

java.time.Duration connectTimeout() {
return java.time.Duration.ofMillis(connectTimeout.toMillis())
}

RateUnit maxRate() {
return maxRate
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package io.seqera.wave.plugin.config

import java.time.Duration

import nextflow.util.RateUnit
import spock.lang.Specification
/**
*
Expand All @@ -31,11 +32,13 @@ class HttpOptsTest extends Specification {
def opts = new HttpOpts([:])
then:
opts.connectTimeout() == Duration.ofSeconds(30)
opts.maxRate() == RateUnit.of('1/sec')

when:
opts = new HttpOpts([connectTimeout:'50s'])
opts = new HttpOpts([connectTimeout:'50s', maxRate: '10/s'])
then:
opts.connectTimeout() == Duration.ofSeconds(50)
opts.maxRate() == RateUnit.of('10/s')

}
}

0 comments on commit ecf6829

Please sign in to comment.