diff --git a/Makefile.am b/Makefile.am index f9e0b2d..2b9fd9c 100644 --- a/Makefile.am +++ b/Makefile.am @@ -119,7 +119,8 @@ TESTS = \ tests/parameters.scm \ tests/preemption.scm \ tests/speedup.scm \ - tests/timer-wheel.scm + tests/timer-wheel.scm \ + tests/cancel-timer.scm # The following tests require SOCK_NONBLOCK and SOCK_CLOEXEC. For now we just # run them on a platform that supports epoll (probably Linux). diff --git a/fibers/operations.scm b/fibers/operations.scm index b5e385f..6253d60 100644 --- a/fibers/operations.scm +++ b/fibers/operations.scm @@ -60,21 +60,37 @@ choice-operation perform-operation - make-base-operation)) + make-base-operation + make-base-operation/internal)) ;; Three possible values: W (waiting), C (claimed), or S (synched). ;; The meanings are as in the Parallel CML paper. (define-inlinable (make-op-state) (make-atomic-box 'W)) (define-record-type - (make-base-operation wrap-fn try-fn block-fn) + (%make-base-operation wrap-fn try-fn block-fn cancel-fn) base-op? ;; ((arg ...) -> (result ...)) | #f (wrap-fn base-op-wrap-fn) ;; () -> (thunk | #f) (try-fn base-op-try-fn) ;; (op-state sched resume-k) -> () - (block-fn base-op-block-fn)) + (block-fn base-op-block-fn) + ;; (sched) -> () + (cancel-fn base-op-cancel-fn)) ;for internal use so far + +(define* (make-base-operation/internal wrap-fn try-fn block-fn + #:optional (cancel-fn (const #f))) + "This internal-use-only variant of @code{make-base-operation} has an extra +@var{cancel-fn} argument: a procedure to cancel this operation when, as part +of a \"choice\" operation, it has not been chosen. + +This variant is kept internal while the interface and its consequences are +being discussed. Do NOT use it in external code." + (%make-base-operation wrap-fn try-fn block-fn cancel-fn)) + +(define (make-base-operation wrap-fn try-fn block-fn) + (%make-base-operation wrap-fn try-fn block-fn (const #f))) (define-record-type (make-choice-operation base-ops) @@ -121,6 +137,18 @@ succeeds, will succeed with one and only one of the sub-operations ((base-op) base-op) (base-ops (make-choice-operation (list->vector base-ops))))) +(define (cancel-other-operations op index) + "Assuming @var{op} is a choice operation, cancel every operation but the +one at @var{index}." + (match op + (($ base-ops) + (let loop ((i 0)) + (when (< i (vector-length base-ops)) + (unless (= i index) + (match (vector-ref base-ops i) + (($ wrap-fn try-fn block-fn cancel-fn) + (cancel-fn (current-scheduler)))))))))) + (define (perform-operation op) "Perform the operation @var{op} and return the resulting values. If the operation cannot complete directly, block until it can complete." @@ -141,7 +169,10 @@ the operation cannot complete directly, block until it can complete." (when (< i (vector-length base-ops)) (match (vector-ref base-ops i) (($ wrap-fn try-fn block-fn) - (block-fn flag sched (wrap-resume resume wrap-fn)))) + (let ((resume (lambda (thunk) + (cancel-other-operations op i) + (resume thunk)))) + (block-fn flag sched (wrap-resume resume wrap-fn))))) (lp (1+ i)))))))) (define (suspend) diff --git a/fibers/scheduler.scm b/fibers/scheduler.scm index 8cb63cd..fd490db 100644 --- a/fibers/scheduler.scm +++ b/fibers/scheduler.scm @@ -35,6 +35,7 @@ (scheduler-kernel-thread/public . scheduler-kernel-thread) scheduler-remote-peers scheduler-work-pending? + scheduler-timers choose-parallel-scheduler run-scheduler destroy-scheduler diff --git a/fibers/timer-wheel.scm b/fibers/timer-wheel.scm index 5ca37ea..de98918 100644 --- a/fibers/timer-wheel.scm +++ b/fibers/timer-wheel.scm @@ -31,6 +31,7 @@ #:use-module (ice-9 format) #:export (make-timer-wheel timer-wheel-add! + timer-wheel-remove! timer-wheel-next-entry-time timer-wheel-next-tick-start timer-wheel-next-tick-end @@ -141,6 +142,18 @@ (else (timer-wheel-add! (or outer (add-outer-wheel! wheel)) t obj))))))) +(define (timer-wheel-remove! wheel entry) + "Remove @var{entry}, a timer entry as returned by @code{timer-wheel-add!}, +from @var{wheel}." + (match entry + (($ prev next) + (when prev + (set-timer-entry-next! prev next) + (set-timer-entry-prev! entry #f)) + (when next + (set-timer-entry-prev! next prev) + (set-timer-entry-next! entry #f))))) + (define (timer-wheel-next-entry-time wheel) (define (slot-min-time head) (let lp ((entry (timer-entry-next head)) (min #f)) diff --git a/fibers/timers.scm b/fibers/timers.scm index 051919e..14133a1 100644 --- a/fibers/timers.scm +++ b/fibers/timers.scm @@ -1,6 +1,6 @@ ;; Fibers: cooperative, event-driven user-space threads. -;;;; Copyright (C) 2016 Free Software Foundation, Inc. +;;;; Copyright (C) 2016, 2024 Free Software Foundation, Inc. ;;;; ;;;; This library is free software; you can redistribute it and/or ;;;; modify it under the terms of the GNU Lesser General Public @@ -19,6 +19,7 @@ (define-module (fibers timers) #:use-module (fibers scheduler) #:use-module (fibers operations) + #:autoload (fibers timer-wheel) (timer-wheel-remove!) #:use-module (ice-9 atomic) #:use-module (ice-9 match) #:use-module (ice-9 threads) @@ -45,23 +46,37 @@ "Make an operation that will succeed when the current time is greater than or equal to @var{expiry}, expressed in internal time units. The operation will succeed with no values." - (make-base-operation #f - (lambda () - (and (< expiry (get-internal-real-time)) - values)) - (lambda (flag sched resume) - (define (timer) - (match (atomic-box-compare-and-swap! flag 'W 'S) - ('W (resume values)) - ('C (timer)) - ('S #f))) - (if sched - (schedule-task-at-time sched expiry timer) - (schedule-task - (timer-sched) - (lambda () - (perform-operation (timer-operation expiry)) - (timer))))))) + (define wheel-entry + ;; If true, this is the currently active timer entry for this operation. + #f) + + (make-base-operation/internal + #f ;wrap + (lambda () ;try + (and (< expiry (get-internal-real-time)) + values)) + (lambda (flag sched resume) ;block + (define (timer) + (match (atomic-box-compare-and-swap! flag 'W 'S) + ('W (resume values)) + ('C (timer)) + ('S #f))) + (if sched + (set! wheel-entry + (schedule-task-at-time sched expiry timer)) + (schedule-task + (timer-sched) + (lambda () + (perform-operation (timer-operation expiry)) + (timer))))) + (lambda (sched) ;cancel + ;; This operation is being canceled. + (when (and sched wheel-entry) + ;; Remove WHEEL-ENTRY from the timer wheel right away to avoid + ;; accumulating entries in the wheel. See + ;; . + (timer-wheel-remove! (scheduler-timers sched) wheel-entry) + (set! wheel-entry #f))))) (define (sleep-operation seconds) "Make an operation that will succeed with no values when diff --git a/tests/cancel-timer.scm b/tests/cancel-timer.scm new file mode 100644 index 0000000..c0a4e12 --- /dev/null +++ b/tests/cancel-timer.scm @@ -0,0 +1,68 @@ +;; Fibers: cooperative, event-driven user-space threads. + +;;;; Copyright (C) 2024 Ludovic Courtès +;;;; +;;;; This library is free software; you can redistribute it and/or +;;;; modify it under the terms of the GNU Lesser General Public +;;;; License as published by the Free Software Foundation; either +;;;; version 3 of the License, or (at your option) any later version. +;;;; +;;;; This library is distributed in the hope that it will be useful, +;;;; but WITHOUT ANY WARRANTY; without even the implied warranty of +;;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +;;;; Lesser General Public License for more details. +;;;; +;;;; You should have received a copy of the GNU Lesser General Public License +;;;; along with this program. If not, see . +;;;; + +(define-module (tests cancel-timer) + #:use-module (fibers) + #:use-module (fibers channels) + #:use-module (fibers operations) + #:use-module (fibers timers) + #:use-module (ice-9 format)) + +(define (heap-size) + (assoc-ref (gc-stats) 'heap-size)) + +(define iterations 200000) + +;;; Check the heap growth caused by repeated choice operations where one of +;;; the base operations is a timer that always "loses" the choice. +;;; +;;; This situation used to cause timer continuations to accumulate, thereby +;;; leading to unbounded heap growth. The cancel function of +;;; 'timer-operation' fixes that by immediately canceling timers that lost in +;;; a choice operation. See . + +(run-fibers + (lambda () + (define channel + (make-channel)) + + (spawn-fiber + (lambda () + (let loop ((i 0)) + (when (< i iterations) + (put-message channel 'hello) + (loop (+ i 1)))))) + + (let ((initial-heap-size (heap-size))) + (let loop ((i 0)) + (when (< i iterations) + (perform-operation + (choice-operation (sleep-operation 500) + (get-operation channel))) + (loop (+ 1 i)))) + + (let ((final-heap-size (heap-size)) + (MiB (lambda (size) + (/ size (expt 2 20.))))) + (if (<= final-heap-size (* 2 initial-heap-size)) + (format #t "final heap size: ~,2f MiB; initial heap size: ~,2f MiB~%" + (MiB final-heap-size) (MiB initial-heap-size)) + (begin + (format #t "heap grew too much: ~,2f MiB vs. ~,2f MiB~%" + (MiB final-heap-size) (MiB initial-heap-size)) + (primitive-exit 1)))))))