Skip to content

Commit

Permalink
quic: gate and queue synchronization primitives
Browse files Browse the repository at this point in the history
Add a form of monitor (in the sense of the synchronization primitive)
for controlling access to queues and streams.

We call this a "gate". A gate acts as a mutex and condition variable
with one bit of state. A gate may be locked and unlocked. Lock
operations may optionally block on the gate condition being set.
Unlock operations always record the new value of the condition.

Gates play nicely with contexts.

Unlike traditional condition variables, gates do not suffer from
spurious wakeups: A goroutine waiting for a gate condition is not
woken before the condition is set.

Gates are inspired by the queue design from Bryan Mills's talk,
Rethinking Classical Concurrency Patterns.

Add a queue implemented with a gate.

For golang/go#58547

Change-Id: Ibec6d1f29a2c03a7184fca7392ed5639f96b6485
Reviewed-on: https://go-review.googlesource.com/c/net/+/513955
TryBot-Result: Gopher Robot <gobot@golang.org>
Reviewed-by: Jonathan Amsterdam <jba@google.com>
Run-TryBot: Damien Neil <dneil@google.com>
  • Loading branch information
neild committed Jul 31, 2023
1 parent bd8ac9e commit 63fe334
Show file tree
Hide file tree
Showing 4 changed files with 370 additions and 0 deletions.
104 changes: 104 additions & 0 deletions internal/quic/gate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

//go:build go1.21

package quic

import "context"

// An gate is a monitor (mutex + condition variable) with one bit of state.
//
// The condition may be either set or unset.
// Lock operations may be unconditional, or wait for the condition to be set.
// Unlock operations record the new state of the condition.
type gate struct {
// When unlocked, exactly one of set or unset contains a value.
// When locked, neither chan contains a value.
set chan struct{}
unset chan struct{}
}

func newGate() gate {
g := gate{
set: make(chan struct{}, 1),
unset: make(chan struct{}, 1),
}
g.unset <- struct{}{}
return g
}

// lock acquires the gate unconditionally.
// It reports whether the condition is set.
func (g *gate) lock() (set bool) {
select {
case <-g.set:
return true
case <-g.unset:
return false
}
}

// waitAndLock waits until the condition is set before acquiring the gate.
func (g *gate) waitAndLock() {
<-g.set
}

// waitAndLockContext waits until the condition is set before acquiring the gate.
// If the context expires, waitAndLockContext returns an error and does not acquire the gate.
func (g *gate) waitAndLockContext(ctx context.Context) error {
select {
case <-g.set:
return nil
default:
}
select {
case <-g.set:
return nil
case <-ctx.Done():
return ctx.Err()
}
}

// waitWithLock releases an acquired gate until the condition is set.
// The caller must have previously acquired the gate.
// Upon return from waitWithLock, the gate will still be held.
// If waitWithLock returns nil, the condition is set.
func (g *gate) waitWithLock(ctx context.Context) error {
g.unlock(false)
err := g.waitAndLockContext(ctx)
if err != nil {
if g.lock() {
// The condition was set in between the context expiring
// and us reacquiring the gate.
err = nil
}
}
return err
}

// lockIfSet acquires the gate if and only if the condition is set.
func (g *gate) lockIfSet() (acquired bool) {
select {
case <-g.set:
return true
default:
return false
}
}

// unlock sets the condition and releases the gate.
func (g *gate) unlock(set bool) {
if set {
g.set <- struct{}{}
} else {
g.unset <- struct{}{}
}
}

// unlock sets the condition to the result of f and releases the gate.
// Useful in defers.
func (g *gate) unlockFunc(f func() bool) {
g.unlock(f())
}
142 changes: 142 additions & 0 deletions internal/quic/gate_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

//go:build go1.21

package quic

import (
"context"
"testing"
"time"
)

func TestGateLockAndUnlock(t *testing.T) {
g := newGate()
if set := g.lock(); set {
t.Errorf("g.lock() of never-locked gate: true, want false")
}
unlockedc := make(chan struct{})
donec := make(chan struct{})
go func() {
defer close(donec)
set := g.lock()
select {
case <-unlockedc:
default:
t.Errorf("g.lock() succeeded while gate was held")
}
if !set {
t.Errorf("g.lock() of set gate: false, want true")
}
g.unlock(false)
}()
time.Sleep(1 * time.Millisecond)
close(unlockedc)
g.unlock(true)
<-donec
if set := g.lock(); set {
t.Errorf("g.lock() of unset gate: true, want false")
}
}

func TestGateWaitAndLock(t *testing.T) {
g := newGate()
set := false
go func() {
for i := 0; i < 3; i++ {
g.lock()
g.unlock(false)
time.Sleep(1 * time.Millisecond)
}
g.lock()
set = true
g.unlock(true)
}()
g.waitAndLock()
if !set {
t.Errorf("g.waitAndLock() returned before gate was set")
}
}

func TestGateWaitAndLockContext(t *testing.T) {
g := newGate()
// waitAndLockContext is canceled
ctx, cancel := context.WithCancel(context.Background())
go func() {
time.Sleep(1 * time.Millisecond)
cancel()
}()
if err := g.waitAndLockContext(ctx); err != context.Canceled {
t.Errorf("g.waitAndLockContext() = %v, want context.Canceled", err)
}
// waitAndLockContext succeeds
set := false
go func() {
time.Sleep(1 * time.Millisecond)
g.lock()
set = true
g.unlock(true)
}()
if err := g.waitAndLockContext(context.Background()); err != nil {
t.Errorf("g.waitAndLockContext() = %v, want nil", err)
}
if !set {
t.Errorf("g.waitAndLockContext() returned before gate was set")
}
g.unlock(true)
// waitAndLockContext succeeds when the gate is set and the context is canceled
if err := g.waitAndLockContext(ctx); err != nil {
t.Errorf("g.waitAndLockContext() = %v, want nil", err)
}
}

func TestGateWaitWithLock(t *testing.T) {
g := newGate()
// waitWithLock is canceled
ctx, cancel := context.WithCancel(context.Background())
go func() {
time.Sleep(1 * time.Millisecond)
cancel()
}()
g.lock()
if err := g.waitWithLock(ctx); err != context.Canceled {
t.Errorf("g.waitWithLock() = %v, want context.Canceled", err)
}
// waitWithLock succeeds
set := false
go func() {
g.lock()
set = true
g.unlock(true)
}()
time.Sleep(1 * time.Millisecond)
if err := g.waitWithLock(context.Background()); err != nil {
t.Errorf("g.waitWithLock() = %v, want nil", err)
}
if !set {
t.Errorf("g.waitWithLock() returned before gate was set")
}
}

func TestGateLockIfSet(t *testing.T) {
g := newGate()
if locked := g.lockIfSet(); locked {
t.Errorf("g.lockIfSet() of unset gate = %v, want false", locked)
}
g.lock()
g.unlock(true)
if locked := g.lockIfSet(); !locked {
t.Errorf("g.lockIfSet() of set gate = %v, want true", locked)
}
}

func TestGateUnlockFunc(t *testing.T) {
g := newGate()
go func() {
g.lock()
defer g.unlockFunc(func() bool { return true })
}()
g.waitAndLock()
}
65 changes: 65 additions & 0 deletions internal/quic/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

//go:build go1.21

package quic

import "context"

// A queue is an unbounded queue of some item (new connections and streams).
type queue[T any] struct {
// The gate condition is set if the queue is non-empty or closed.
gate gate
err error
q []T
}

func newQueue[T any]() queue[T] {
return queue[T]{gate: newGate()}
}

// close closes the queue, causing pending and future pop operations
// to return immediately with err.
func (q *queue[T]) close(err error) {
q.gate.lock()
defer q.unlock()
if q.err == nil {
q.err = err
}
}

// put appends an item to the queue.
// It returns true if the item was added, false if the queue is closed.
func (q *queue[T]) put(v T) bool {
q.gate.lock()
defer q.unlock()
if q.err != nil {
return false
}
q.q = append(q.q, v)
return true
}

// get removes the first item from the queue, blocking until ctx is done, an item is available,
// or the queue is closed.
func (q *queue[T]) get(ctx context.Context) (T, error) {
var zero T
if err := q.gate.waitAndLockContext(ctx); err != nil {
return zero, err
}
defer q.unlock()
if q.err != nil {
return zero, q.err
}
v := q.q[0]
copy(q.q[:], q.q[1:])
q.q[len(q.q)-1] = zero
q.q = q.q[:len(q.q)-1]
return v, nil
}

func (q *queue[T]) unlock() {
q.gate.unlock(q.err != nil || len(q.q) > 0)
}
59 changes: 59 additions & 0 deletions internal/quic/queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

//go:build go1.21

package quic

import (
"context"
"io"
"testing"
"time"
)

func TestQueue(t *testing.T) {
nonblocking, cancel := context.WithCancel(context.Background())
cancel()

q := newQueue[int]()
if got, err := q.get(nonblocking); err != context.Canceled {
t.Fatalf("q.get() = %v, %v, want nil, contex.Canceled", got, err)
}

if !q.put(1) {
t.Fatalf("q.put(1) = false, want true")
}
if !q.put(2) {
t.Fatalf("q.put(2) = false, want true")
}
if got, err := q.get(nonblocking); got != 1 || err != nil {
t.Fatalf("q.get() = %v, %v, want 1, nil", got, err)
}
if got, err := q.get(nonblocking); got != 2 || err != nil {
t.Fatalf("q.get() = %v, %v, want 2, nil", got, err)
}
if got, err := q.get(nonblocking); err != context.Canceled {
t.Fatalf("q.get() = %v, %v, want nil, contex.Canceled", got, err)
}

go func() {
time.Sleep(1 * time.Millisecond)
q.put(3)
}()
if got, err := q.get(context.Background()); got != 3 || err != nil {
t.Fatalf("q.get() = %v, %v, want 3, nil", got, err)
}

if !q.put(4) {
t.Fatalf("q.put(2) = false, want true")
}
q.close(io.EOF)
if got, err := q.get(context.Background()); got != 0 || err != io.EOF {
t.Fatalf("q.get() = %v, %v, want 0, io.EOF", got, err)
}
if q.put(5) {
t.Fatalf("q.put(5) = true, want false")
}
}

0 comments on commit 63fe334

Please sign in to comment.