Skip to content

Commit

Permalink
Process timers before and after select. Fixes flaky #7758 test.
Browse files Browse the repository at this point in the history
  • Loading branch information
dom96 committed Aug 23, 2018
1 parent 7532b37 commit 55463a8
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 23 deletions.
43 changes: 24 additions & 19 deletions lib/pure/asyncdispatch.nim
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
include "system/inclrtl"

import os, tables, strutils, times, heapqueue, lists, options, asyncstreams
import options, math
import asyncfutures except callSoon

import nativesockets, net, deques
Expand Down Expand Up @@ -157,9 +158,6 @@ export asyncfutures, asyncstreams
## ----------------
##
## * The effect system (``raises: []``) does not work with async procedures.
## * Can't await in a ``except`` body
## * Forward declarations for async procs are broken,
## link includes workaround: https://github.com/nim-lang/Nim/issues/3182.

# TODO: Check if yielded future is nil and throw a more meaningful exception

Expand All @@ -168,31 +166,36 @@ type
timers*: HeapQueue[tuple[finishAt: float, fut: Future[void]]]
callbacks*: Deque[proc ()]

proc processTimers(p: PDispatcherBase; didSomeWork: var bool) {.inline.} =
#Process just part if timers at a step
proc processTimers(
p: PDispatcherBase, didSomeWork: var bool
): Option[int] {.inline.} =
# Pop the timers in the order in which they will expire (smaller `finishAt`).
var count = p.timers.len
let t = epochTime()
while count > 0 and t >= p.timers[0].finishAt:
p.timers.pop().fut.complete()
dec count
didSomeWork = true

# Return the number of miliseconds in which the next timer will expire.
if p.timers.len == 0: return

let milisecs = (p.timers[0].finishAt - epochTime()) * 1000
return some(ceil(milisecs).int)

proc processPendingCallbacks(p: PDispatcherBase; didSomeWork: var bool) =
while p.callbacks.len > 0:
var cb = p.callbacks.popFirst()
cb()
didSomeWork = true

proc adjustedTimeout(p: PDispatcherBase, timeout: int): int {.inline.} =
# If dispatcher has active timers this proc returns the timeout
# of the nearest timer. Returns `timeout` otherwise.
result = timeout
if p.timers.len > 0:
let timerTimeout = p.timers[0].finishAt
let curTime = epochTime()
if timeout == -1 or (curTime + (timeout / 1000)) > timerTimeout:
result = int((timerTimeout - curTime) * 1000)
if result < 0: result = 0
proc adjustTimeout(pollTimeout: int, nextTimer: Option[int]): int {.inline.} =
if nextTimer.isNone():
return pollTimeout

result = nextTimer.get()
if pollTimeout == -1: return
result = min(pollTimeout, result)

proc callSoon(cbproc: proc ()) {.gcsafe.}

Expand Down Expand Up @@ -299,7 +302,8 @@ when defined(windows) or defined(nimdoc):
"No handles or timers registered in dispatcher.")

result = false
let at = p.adjustedTimeout(timeout)
let nextTimer = processTimers(p, result)
let at = adjustTimeout(timeout, nextTimer)
var llTimeout =
if at == -1: winlean.INFINITE
else: at.int32
Expand Down Expand Up @@ -344,7 +348,7 @@ when defined(windows) or defined(nimdoc):
else: raiseOSError(errCode)

# Timer processing.
processTimers(p, result)
discard processTimers(p, result)
# Callback queue processing
processPendingCallbacks(p, result)

Expand Down Expand Up @@ -1231,7 +1235,8 @@ else:

result = false
var keys: array[64, ReadyKey]
var count = p.selector.selectInto(p.adjustedTimeout(timeout), keys)
let nextTimer = processTimers(p, result)
var count = p.selector.selectInto(adjustTimeout(timeout, nextTimer), keys)
for i in 0..<count:
var custom = false
let fd = keys[i].fd
Expand Down Expand Up @@ -1270,7 +1275,7 @@ else:
p.selector.updateHandle(SocketHandle(fd), newEvents)

# Timer processing.
processTimers(p, result)
discard processTimers(p, result)
# Callback queue processing
processPendingCallbacks(p, result)

Expand Down
10 changes: 6 additions & 4 deletions tests/async/t7758.nim
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ discard """
import asyncdispatch

proc task() {.async.} =
await sleepAsync(1000)
await sleepAsync(40)

when isMainModule:
proc main() =
var counter = 0
var f = task()
while not f.finished:
inc(counter)
poll()
poll(10)

doAssert counter == 2
doAssert counter <= 4

for i in 0 .. 10: main()

0 comments on commit 55463a8

Please sign in to comment.