LispKit Thread Shared‐Queue

Matthias Zenger edited this page Nov 19, 2024 · 1 revision

Library (lispkit thread shared-queue) implements thread-safe queues for which all operations are done atomically. Such shared queues optionally have a maximum length. They start out in state "open", which means they allow elements to be added. Once a shared queue has been closed, new elements cannot be added anymore, but existing ones can still be dequeued.

With this functionality, shared queues provide a good basis for synchronizing threads. For example, you can let a consumer thread block if a shared queue is empty, or a producer thread block if the number of items in the shared queue reaches a specified limit. Thus, shared queues constitute a more conventional alternative to channels as provided by library (lispkit thread channel).

Inspired by the mtqueue abstraction of the Scheme implementation Gauche, this library supports not only shared-queue-enqueue! for adding items to the end of the queue and shared-queue-dequeue! for taking items from the front of the queue, but also shared-queue-push! for putting items at the front of the queue. Shared queues can therefore also be used where stack (LIFO) semantics are needed.

shared-queue-type-tag [constant]

Symbol representing the shared-queue type. The type-for procedure of library (lispkit type) returns this symbol for all shared queue objects.

(shared-queue? obj)     [procedure]

Returns #t if obj is a shared queue object; #f otherwise.

(make-shared-queue)     [procedure]
(make-shared-queue maxlen)
(make-shared-queue maxlen capacity)

Returns a new empty shared queue with maximum length maxlen and a given initial capacity. If maxlen is not provided, there is no maximum length and inserting an element never blocks.

(shared-queue-copy sq)     [procedure]

Returns a copy of shared queue sq. The copy has the same maximum length as sq, and all elements currently in the queue are copied over.

(list->shared-queue xs)     [procedure]
(list->shared-queue xs maxlen)
(list->shared-queue xs maxlen capacity)

Returns a new shared queue with maximum length maxlen and a given initial capacity. The new queue contains the elements from list xs. If maxlen is not provided, there is no maximum length and inserting an element never blocks.

(shared-queue->list sq)     [procedure]

Returns the elements currently in the shared queue sq as a list. The elements remain in sq.
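The following minimal sketch illustrates the round trip between lists and shared queues. It assumes that the library can be imported under the name given at the top of this page and that the first list element becomes the front of the queue; the expected values in the comments follow from the descriptions above rather than from a recorded session.

```scheme
(import (lispkit base)
        (lispkit thread shared-queue))

;; Build an unbounded queue from a list (assumption: list order is kept,
;; so 1 is at the front).
(define sq (list->shared-queue '(1 2 3)))

;; Taking a snapshot leaves the elements in the queue.
(define snapshot (shared-queue->list sq))      ; expected: (1 2 3)
(define remaining (shared-queue-length sq))    ; expected: 3
```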

(shared-queue-empty? sq)     [procedure]

Returns #t if shared queue sq does not have any items queued up; returns #f otherwise.

(shared-queue-closed? sq)     [procedure]

Returns #t if shared queue sq has been closed, i.e. it does not allow more items to be enqueued; returns #f otherwise.

(shared-queue-max-length sq)     [procedure]

Returns the maximum number of items the shared queue sq can hold.

(shared-queue-length sq)     [procedure]

Returns the number of items in the shared queue sq.

(shared-queue-room sq)     [procedure]

Returns the number of items the shared queue sq can accept at this moment to reach its maximum length. For example, if the shared queue has the maximum number of items already, 0 is returned.
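As a rough illustration of how maximum length, length, and room relate for a bounded queue, consider the sketch below. The queue size of 3 and the symbols are arbitrary illustration values, and the expected results are derived from the descriptions above.

```scheme
(import (lispkit base)
        (lispkit thread shared-queue))

;; A bounded queue that holds at most 3 items.
(define sq (make-shared-queue 3))
(shared-queue-enqueue! sq 'a 'b)

(define max-len (shared-queue-max-length sq))  ; expected: 3
(define len (shared-queue-length sq))          ; expected: 2
(define room (shared-queue-room sq))           ; expected: 1, i.e. 3 - 2
```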

(shared-queue-num-waiting sq)     [procedure]

Returns two values: the first value is the number of threads waiting on the shared queue sq to read at this moment; the second value is the number of threads waiting on sq to write at this moment.
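Since shared-queue-num-waiting returns two values, a caller needs a multiple-value receiver such as call-with-values. The sketch below shows one way to capture both counts in a list; with no thread blocked on the queue, both counts would be expected to be 0.

```scheme
(import (lispkit base)
        (lispkit thread shared-queue))

(define sq (make-shared-queue))

;; Collect the number of waiting readers and writers into a list.
(define waiting
  (call-with-values
    (lambda () (shared-queue-num-waiting sq))
    (lambda (readers writers) (list readers writers))))
;; expected: (0 0), as no thread is waiting on sq
```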

(shared-queue-front sq)     [procedure]
(shared-queue-front sq default)

Returns the first item in the shared queue sq. This is the item that shared-queue-dequeue! would remove if called. shared-queue-front does not modify sq itself. If sq is empty, default is returned if it is given, otherwise an error is signaled.

(shared-queue-rear sq)     [procedure]
(shared-queue-rear sq default)

Returns the last item in the shared queue sq. shared-queue-rear does not modify sq itself. If sq is empty, default is returned if it is given, otherwise an error is signaled.
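A short sketch of inspecting both ends of a queue without modifying it; the symbol none is an arbitrary default chosen for illustration.

```scheme
(import (lispkit base)
        (lispkit thread shared-queue))

(define sq (make-shared-queue))

;; On an empty queue, the default is returned instead of signaling an error.
(define empty-front (shared-queue-front sq 'none))  ; expected: none

(shared-queue-enqueue! sq 1 2 3)
(define front (shared-queue-front sq))        ; expected: 1
(define rear (shared-queue-rear sq))          ; expected: 3
(define len (shared-queue-length sq))         ; expected: 3, nothing was removed
```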

(shared-queue-enqueue! sq x ...)     [procedure]

Inserts the elements x ... at the end of the shared queue sq in the order they are given.

(define sq (make-shared-queue))
(shared-queue-front sq #f)  ⇒  #f
(shared-queue-enqueue! sq 1)
(shared-queue-front sq #f)  ⇒  1
(shared-queue-enqueue! sq 2 3 4)
(shared-queue-front sq #f)  ⇒  1
(shared-queue->list sq)  ⇒  (1 2 3 4)

(shared-queue-push! sq x ...)     [procedure]

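Inserts the elements x ... at the start of the shared queue sq. The elements are pushed one after another in the order they are given, so the last element given ends up at the front of the queue.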

(define sq (make-shared-queue))
(shared-queue-push! sq 1)
(shared-queue-front sq #f)  ⇒  1
(shared-queue-push! sq 2 3 4)
(shared-queue-front sq #f)  ⇒  4
(shared-queue->list sq)  ⇒  (4 3 2 1)

(shared-queue-dequeue! sq)     [procedure]
(shared-queue-dequeue! sq default)

Returns and removes the first element of the shared queue sq. If the queue is empty, an error is signaled unless the default parameter is provided. In this case, default is returned.

(shared-queue-pop! sq)     [procedure]
(shared-queue-pop! sq default)

Returns and removes the first element of the shared queue sq. If the queue is empty, an error is signaled unless the default parameter is provided. In this case, default is returned. This procedure is functionally equivalent to shared-queue-dequeue!. It is provided to emphasize that the queue is used like a stack together with shared-queue-push!.
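Used together, shared-queue-push! and shared-queue-pop! give stack behavior. The following is a minimal sketch with arbitrary symbols; the expected results follow from the push and pop descriptions above.

```scheme
(import (lispkit base)
        (lispkit thread shared-queue))

(define stack (make-shared-queue))

;; Each push places the new element at the front.
(shared-queue-push! stack 'a)
(shared-queue-push! stack 'b)
(shared-queue-push! stack 'c)

;; Popping returns the most recently pushed element first (LIFO).
(define first-pop (shared-queue-pop! stack))          ; expected: c
(define second-pop (shared-queue-pop! stack))         ; expected: b
(define third-pop (shared-queue-pop! stack))          ; expected: a
(define fourth-pop (shared-queue-pop! stack 'empty))  ; expected: empty
```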

(shared-queue-dequeue-all! sq)     [procedure]
(shared-queue-dequeue-all! sq close?)

Removes all items from the shared queue sq and returns them as a list. If argument close? is provided and set to true, the shared queue sq will be closed. The whole operation is done atomically.
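Draining and closing in one step is useful for shutting down a queue without losing pending items. The sketch below assumes that the returned list is in queue order (front first), which the description above does not state explicitly.

```scheme
(import (lispkit base)
        (lispkit thread shared-queue))

(define sq (make-shared-queue))
(shared-queue-enqueue! sq 1 2 3)

;; Remove everything and close the queue in one atomic operation.
(define items (shared-queue-dequeue-all! sq #t))  ; assumed order: (1 2 3)

(define now-empty (shared-queue-empty? sq))       ; expected: #t
(define now-closed (shared-queue-closed? sq))     ; expected: #t
```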

(shared-queue-enqueue/wait! sq x)     [procedure]
(shared-queue-enqueue/wait! sq x timeout)
(shared-queue-enqueue/wait! sq x timeout default)
(shared-queue-enqueue/wait! sq x timeout default close?)

Inserts the element x at the end of the shared queue sq. If sq has reached its maximum length, this procedure blocks until x has been inserted. The optional timeout argument specifies the maximum time to wait in seconds. If it is set to #f, shared-queue-enqueue/wait! waits indefinitely. If the call times out, the value of default is returned (#f is the default). If the operation succeeds without timing out, #t is returned. If sq is already closed, shared-queue-enqueue/wait! raises an error without modifying sq. The whole check and insert operation is performed atomically.

If the last optional argument close? is given and true, shared-queue-enqueue/wait! closes the shared queue sq. The close operation is done atomically and it is guaranteed that x is the last item put into the queue.
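The timeout behavior can be observed without a second thread by filling a bounded queue first. This is a sketch only: the maximum length of 1, the timeout of 1 second, and the symbol timed-out are arbitrary illustration values.

```scheme
(import (lispkit base)
        (lispkit thread shared-queue))

;; A queue with room for exactly one item.
(define sq (make-shared-queue 1))

;; There is room, so this returns immediately.
(define first-result (shared-queue-enqueue/wait! sq 'first))  ; expected: #t

;; The queue is full and nobody dequeues, so this call blocks for
;; about one second and then returns the given default.
(define second-result
  (shared-queue-enqueue/wait! sq 'second 1 'timed-out))       ; expected: timed-out

(define contents (shared-queue->list sq))                     ; expected: (first)
```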

(shared-queue-push/wait! sq x)     [procedure]
(shared-queue-push/wait! sq x timeout)
(shared-queue-push/wait! sq x timeout default)
(shared-queue-push/wait! sq x timeout default close?)

Inserts the element x at the start of the shared queue sq. If sq has reached its maximum length, this procedure blocks until x has been inserted. The optional timeout argument specifies the maximum time to wait in seconds. If it is set to #f, shared-queue-push/wait! waits indefinitely. If the call times out, the value of default is returned (#f is the default). If the operation succeeds without timing out, #t is returned. If sq is already closed, shared-queue-push/wait! raises an error without modifying sq. The whole check and insert operation is performed atomically.

If the last optional argument close? is given and true, shared-queue-push/wait! closes the shared queue sq. The close operation is done atomically and it is guaranteed that x is the last item put into the queue.

(shared-queue-dequeue/wait! sq)     [procedure]
(shared-queue-dequeue/wait! sq timeout)
(shared-queue-dequeue/wait! sq timeout default)
(shared-queue-dequeue/wait! sq timeout default close?)

Returns and removes the first element of the shared queue sq, regardless of whether the queue has been closed. If sq is empty, this procedure blocks until an item has been inserted and can be dequeued. The optional timeout argument specifies the maximum time to wait in seconds. If it is set to #f, shared-queue-dequeue/wait! waits indefinitely. If the call times out, the value of default is returned (#f is the default). If the operation succeeds without timing out, the dequeued item is returned. The whole check and dequeue operation is performed atomically.

If the last optional argument close? is given and true, shared-queue-dequeue/wait! closes the shared queue sq.
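The blocking procedures are meant for coordinating threads. The following producer/consumer sketch assumes that library (lispkit thread) provides the SRFI 18 style procedures make-thread, thread-start!, and thread-join!; these are not described on this page. The queue size of 2, the item count of 5, and the 5 second timeout are arbitrary illustration values.

```scheme
(import (lispkit base)
        (lispkit thread)
        (lispkit thread shared-queue))

;; A small bounded queue, so the producer blocks whenever it gets ahead.
(define sq (make-shared-queue 2))

;; Producer: enqueue the numbers 1 to 5, blocking while the queue is full.
(define producer
  (make-thread
    (lambda ()
      (do ((i 1 (+ i 1)))
          ((> i 5))
        (shared-queue-enqueue/wait! sq i)))))

;; Consumer: dequeue exactly 5 items, blocking while the queue is empty.
;; Each wait is limited to 5 seconds; the symbol timeout marks a missed item.
(define consumer
  (make-thread
    (lambda ()
      (let loop ((n 5) (acc '()))
        (if (= n 0)
            (reverse acc)
            (loop (- n 1)
                  (cons (shared-queue-dequeue/wait! sq 5 'timeout) acc)))))))

(thread-start! producer)
(thread-start! consumer)
(thread-join! producer)

;; With a single producer, items should arrive in FIFO order.
(define result (thread-join! consumer))   ; expected: (1 2 3 4 5)
```

The consumer counts items instead of waiting for the queue to close, because this page does not specify what shared-queue-dequeue/wait! does on a closed, empty queue.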

(shared-queue-pop/wait! sq)     [procedure]
(shared-queue-pop/wait! sq timeout)
(shared-queue-pop/wait! sq timeout default)
(shared-queue-pop/wait! sq timeout default close?)

This procedure is functionally equivalent to shared-queue-dequeue/wait!. It is provided to emphasize that the queue is used like a stack together with shared-queue-push/wait!.

(shared-queue-close! sq)     [procedure]
(shared-queue-close! sq force?)

Closes the shared queue sq. As a consequence, no new items can be inserted. It is still possible to dequeue items until the queue is empty. If argument force? is provided and set to true, dequeuing is no longer possible either, and any attempt to dequeue signals an error.
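A minimal sketch of closing a queue without force? and then draining it; the expected results follow from the description above.

```scheme
(import (lispkit base)
        (lispkit thread shared-queue))

(define sq (list->shared-queue '(1 2)))

;; Close without force?: existing items remain available.
(shared-queue-close! sq)
(define closed (shared-queue-closed? sq))       ; expected: #t

;; Items that were queued before closing can still be dequeued.
(define first-item (shared-queue-dequeue! sq))  ; expected: 1
(define second-item (shared-queue-dequeue! sq)) ; expected: 2
(define drained (shared-queue-empty? sq))       ; expected: #t
```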
