Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support cancellation of timer operations #110

Merged
merged 3 commits into from
Nov 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ TESTS = \
tests/parameters.scm \
tests/preemption.scm \
tests/speedup.scm \
tests/timer-wheel.scm
tests/timer-wheel.scm \
tests/cancel-timer.scm

# The following tests require SOCK_NONBLOCK and SOCK_CLOEXEC. For now we just
# run them on a platform that supports epoll (probably Linux).
Expand Down
39 changes: 35 additions & 4 deletions fibers/operations.scm
Original file line number Diff line number Diff line change
Expand Up @@ -60,21 +60,37 @@
choice-operation
perform-operation

make-base-operation))
make-base-operation
make-base-operation/internal))

;; Three possible values: W (waiting), C (claimed), or S (synched).
;; The meanings are as in the Parallel CML paper.
(define-inlinable (make-op-state) (make-atomic-box 'W))

(define-record-type <base-op>
(make-base-operation wrap-fn try-fn block-fn)
(%make-base-operation wrap-fn try-fn block-fn cancel-fn)
base-op?
;; ((arg ...) -> (result ...)) | #f
(wrap-fn base-op-wrap-fn)
;; () -> (thunk | #f)
(try-fn base-op-try-fn)
;; (op-state sched resume-k) -> ()
(block-fn base-op-block-fn))
(block-fn base-op-block-fn)
;; (sched) -> ()
(cancel-fn base-op-cancel-fn)) ;for internal use so far

(define* (make-base-operation/internal wrap-fn try-fn block-fn
#:optional (cancel-fn (const #f)))
"This internal-use-only variant of @code{make-base-operation} has an extra
@var{cancel-fn} argument: a procedure to cancel this operation when, as part
of a \"choice\" operation, it has not been chosen.

This variant is kept internal while the interface and its consequences are
being discussed. Do NOT use it in external code."
(%make-base-operation wrap-fn try-fn block-fn cancel-fn))

(define (make-base-operation wrap-fn try-fn block-fn)
(%make-base-operation wrap-fn try-fn block-fn (const #f)))

(define-record-type <choice-op>
(make-choice-operation base-ops)
Expand Down Expand Up @@ -121,6 +137,18 @@ succeeds, will succeed with one and only one of the sub-operations
((base-op) base-op)
(base-ops (make-choice-operation (list->vector base-ops)))))

(define (cancel-other-operations op index)
"Assuming @var{op} is a choice operation, cancel every operation but the
one at @var{index}."
(match op
(($ <choice-op> base-ops)
(let loop ((i 0))
(when (< i (vector-length base-ops))
(unless (= i index)
(match (vector-ref base-ops i)
(($ <base-op> wrap-fn try-fn block-fn cancel-fn)
(cancel-fn (current-scheduler))))))))))

(define (perform-operation op)
"Perform the operation @var{op} and return the resulting values. If
the operation cannot complete directly, block until it can complete."
Expand All @@ -141,7 +169,10 @@ the operation cannot complete directly, block until it can complete."
(when (< i (vector-length base-ops))
(match (vector-ref base-ops i)
(($ <base-op> wrap-fn try-fn block-fn)
(block-fn flag sched (wrap-resume resume wrap-fn))))
(let ((resume (lambda (thunk)
(cancel-other-operations op i)
(resume thunk))))
(block-fn flag sched (wrap-resume resume wrap-fn)))))
(lp (1+ i))))))))

(define (suspend)
Expand Down
1 change: 1 addition & 0 deletions fibers/scheduler.scm
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
(scheduler-kernel-thread/public . scheduler-kernel-thread)
scheduler-remote-peers
scheduler-work-pending?
scheduler-timers
choose-parallel-scheduler
run-scheduler
destroy-scheduler
Expand Down
13 changes: 13 additions & 0 deletions fibers/timer-wheel.scm
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#:use-module (ice-9 format)
#:export (make-timer-wheel
timer-wheel-add!
timer-wheel-remove!
timer-wheel-next-entry-time
timer-wheel-next-tick-start
timer-wheel-next-tick-end
Expand Down Expand Up @@ -141,6 +142,18 @@
(else
(timer-wheel-add! (or outer (add-outer-wheel! wheel)) t obj)))))))

(define (timer-wheel-remove! wheel entry)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The argument wheel is unused

"Remove @var{entry}, a timer entry as returned by @code{timer-wheel-add!},
from @var{wheel}."
(match entry
(($ <timer-entry> prev next)
(when prev
(set-timer-entry-next! prev next)
(set-timer-entry-prev! entry #f))
(when next
(set-timer-entry-prev! next prev)
(set-timer-entry-next! entry #f)))))

(define (timer-wheel-next-entry-time wheel)
(define (slot-min-time head)
(let lp ((entry (timer-entry-next head)) (min #f))
Expand Down
51 changes: 33 additions & 18 deletions fibers/timers.scm
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
;; Fibers: cooperative, event-driven user-space threads.

;;;; Copyright (C) 2016 Free Software Foundation, Inc.
;;;; Copyright (C) 2016, 2024 Free Software Foundation, Inc.
;;;;
;;;; This library is free software; you can redistribute it and/or
;;;; modify it under the terms of the GNU Lesser General Public
Expand All @@ -19,6 +19,7 @@
(define-module (fibers timers)
#:use-module (fibers scheduler)
#:use-module (fibers operations)
#:autoload (fibers timer-wheel) (timer-wheel-remove!)
#:use-module (ice-9 atomic)
#:use-module (ice-9 match)
#:use-module (ice-9 threads)
Expand All @@ -45,23 +46,37 @@
"Make an operation that will succeed when the current time is
greater than or equal to @var{expiry}, expressed in internal time
units. The operation will succeed with no values."
(make-base-operation #f
(lambda ()
(and (< expiry (get-internal-real-time))
values))
(lambda (flag sched resume)
(define (timer)
(match (atomic-box-compare-and-swap! flag 'W 'S)
('W (resume values))
('C (timer))
('S #f)))
(if sched
(schedule-task-at-time sched expiry timer)
(schedule-task
(timer-sched)
(lambda ()
(perform-operation (timer-operation expiry))
(timer)))))))
(define wheel-entry
;; If true, this is the currently active timer entry for this operation.
#f)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like a problem. Previously, operation objects are reusable, and nothing suggested you need to re-create the operation object (convenient for loops!). Now they aren't, and worse it's undocumented.

If state is needed, it needs to be moved into arguments of one of the 'lambdas' below (is API change, but you could define separate 'make-base-operation' and 'make-base-operation/stateful'). Another option is to rename 'flag' to state', make it a pair of (atomic box with flag . wheel-entry) and allow overriding the default construction of the state ((fibers operations) initialises to an atomic box, but it doesn't actually use its contents -- doing something with it is left entirely to the individual implementations).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that passing state around would be more elegant. In practice though, the current approach is okay IMO because the variable is closed over by the closures of the operation.

In this case, I fixed the problem you mentioned (being able to reuse a timer operation after it's been "canceled") simply by resetting the timer-wheel variable upon cancellation.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In practice though, the current approach is okay IMO because the variable is closed over by the closures of the operation.

AFAICT, there is still a problem if a single timer operation value is used concurrently from multiple fibers. (Having guard-operation would make this pattern work.)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAICT, there is still a problem if a single timer operation value is used concurrently from multiple fibers. (Having guard-operation would make this pattern work.)

Apologies @LiberalArtist, I missed this comment of yours.

Would using an atomic box (as I suggested in the first message) solve the problem?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose it might be possible to adjust the timer wheel implementation to use atomic boxes everywhere + compare and swap, but that seems inefficient, and because two pointers need to be replaced, difficult to do correctly and verify for correctness. It seems simpler & less error-prone to me to simply add a state argument.

In case of 'replace #f' by 'atomic box containing #f', no. We need to assign things to the right thread (in particular, the right scheduler, because of work stealing(?)) and atomic boxes don't do such things, they impose ordering constraints.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Note: IIUC, if/when guard is created, an explicit 'state' argument could be eliminated, although a 'state' operation-construction API could still be provided for convenience.)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, re atomicity, I was thinking about the (set! timer-wheel ...) bit, but you're saying the timer wheel implementation itself should be made atomic?

I must say I'm unclear on that, though my understanding is that there's one timer wheel per scheduler and one scheduler per thread, no?

@wingo?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not saying that it should be made atomic. The state version is also an option.

though my understanding is that there's one timer wheel per scheduler and one scheduler per thread, no?

And that's why, if you don't go for the state version, it needs to be atomic -- you are saving a timer wheel entry of the current scheduler in the closure, and due to work stealing the fiber might migrate to another scheduler (IIUC), so the cancellation can be run from another scheduler, and fiddling with another threads data structures leads to trouble unless designed for that.

(make-base-operation/internal
#f ;wrap
(lambda () ;try
(and (< expiry (get-internal-real-time))
values))
(lambda (flag sched resume) ;block
(define (timer)
(match (atomic-box-compare-and-swap! flag 'W 'S)
('W (resume values))
('C (timer))
('S #f)))
(if sched
(set! wheel-entry
(schedule-task-at-time sched expiry timer))
(schedule-task
(timer-sched)
(lambda ()
(perform-operation (timer-operation expiry))
(timer)))))
(lambda (sched) ;cancel
;; This operation is being canceled.
(when (and sched wheel-entry)
;; Remove WHEEL-ENTRY from the timer wheel right away to avoid
;; accumulating entries in the wheel. See
;; <https://github.com/wingo/fibers/issues/109>.
(timer-wheel-remove! (scheduler-timers sched) wheel-entry)
(set! wheel-entry #f)))))

(define (sleep-operation seconds)
"Make an operation that will succeed with no values when
Expand Down
68 changes: 68 additions & 0 deletions tests/cancel-timer.scm
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
;; Fibers: cooperative, event-driven user-space threads.

;;;; Copyright (C) 2024 Ludovic Courtès <ludo@gnu.org>
;;;;
;;;; This library is free software; you can redistribute it and/or
;;;; modify it under the terms of the GNU Lesser General Public
;;;; License as published by the Free Software Foundation; either
;;;; version 3 of the License, or (at your option) any later version.
;;;;
;;;; This library is distributed in the hope that it will be useful,
;;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
;;;; Lesser General Public License for more details.
;;;;
;;;; You should have received a copy of the GNU Lesser General Public License
;;;; along with this program. If not, see <http://www.gnu.org/licenses/>.
;;;;

(define-module (tests cancel-timer)
#:use-module (fibers)
#:use-module (fibers channels)
#:use-module (fibers operations)
#:use-module (fibers timers)
#:use-module (ice-9 format))

(define (heap-size)
(assoc-ref (gc-stats) 'heap-size))

(define iterations 200000)

;;; Check the heap growth caused by repeated choice operations where one of
;;; the base operations is a timer that always "loses" the choice.
;;;
;;; This situation used to cause timer continuations to accumulate, thereby
;;; leading to unbounded heap growth. The cancel function of
;;; 'timer-operation' fixes that by immediately canceling timers that lost in
;;; a choice operation. See <https://github.com/wingo/fibers/issues/109>.

(run-fibers
(lambda ()
(define channel
(make-channel))

(spawn-fiber
(lambda ()
(let loop ((i 0))
(when (< i iterations)
(put-message channel 'hello)
(loop (+ i 1))))))

(let ((initial-heap-size (heap-size)))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you're checking heap sizes, best do a gc beforehand, otherwise some heavy activity could lead to false negatives (as in 'no bug detected even though it exists').

Would it be possible (and sufficiently informative & meaningful) to instead check the length of the timer wheel? Seems less finicky to me (e.g. what if in the future tests are run in parallel in a single process).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, if something finicky like heap sizes is avoided, I imagine the number of iterations could be reduced a lot (good for test performance).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found that when #109 is present, the heap would grow way beyond the 2x limit that's tested here; it would not go unnoticed. (Maybe we could make the test faster but it was already reasonably fast in my experience.)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How fast is 'reasonably fast'?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found that when #109 is present, the heap would grow way beyond the 2x limit that's tested here; it would not go unnoticed. (Maybe we could make the test faster but it was already reasonably fast in my experience.)

That's going to lead to false positives (*) in case of concurrent tests, or a hypothetical future 'Guile OS' where all Guile is run in a single process (with appropriate isolation, but also with a shared heap and GC).

(*) where positive = "there is a bug"

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't answered yet

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I missed this comment as well. Again, I really don't think there's going to be false positives here; please judge for yourself by commenting out the "cancel" function of timers to see where it goes.

That said we can always add a comment in the test to clarify that.

Copy link
Collaborator

@emixa-d emixa-d Nov 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I refuse to [rewrite the foundations of operating systems, Guile processes or Fiber's test suite just to test this elementary logical conclusion]. (brackets added for clarity]. And why clarify things when you can just fix things? Surely a length check of the timer wheel would be straightforward, and less noisy than heap size information - it should even be feasible to check the exact length (two iterations should be sufficient, could be increased a little 'just in case').

Also, the question on speed remains unanswered.

(let loop ((i 0))
(when (< i iterations)
(perform-operation
(choice-operation (sleep-operation 500)
(get-operation channel)))
(loop (+ 1 i))))

(let ((final-heap-size (heap-size))
(MiB (lambda (size)
(/ size (expt 2 20.)))))
(if (<= final-heap-size (* 2 initial-heap-size))
(format #t "final heap size: ~,2f MiB; initial heap size: ~,2f MiB~%"
(MiB final-heap-size) (MiB initial-heap-size))
(begin
(format #t "heap grew too much: ~,2f MiB vs. ~,2f MiB~%"
(MiB final-heap-size) (MiB initial-heap-size))
(primitive-exit 1)))))))
Loading