diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 7cad1ef9..8a7373f4 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -11,7 +11,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - go: [ '1.15', '1.16', '1.17' ] + go: [ '1.18' ] name: Go ${{ matrix.go }} steps: - uses: actions/checkout@v2 diff --git a/go.mod b/go.mod index 1e9aaf47..a7091e04 100644 --- a/go.mod +++ b/go.mod @@ -1,9 +1,16 @@ module capnproto.org/go/capnp/v3 -go 1.16 +go 1.18 require ( github.com/kylelemons/godebug v1.1.0 github.com/stretchr/testify v1.7.0 github.com/tinylib/msgp v1.1.5 ) + +require ( + github.com/davecgh/go-spew v1.1.0 // indirect + github.com/philhofer/fwd v1.1.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect +) diff --git a/internal/mpsc/mpsc.go b/internal/mpsc/mpsc.go index bb895ad0..5ec19efe 100644 --- a/internal/mpsc/mpsc.go +++ b/internal/mpsc/mpsc.go @@ -9,36 +9,31 @@ import ( // A multiple-producer, single-consumer queue. Create one with New(), // and send from many gorotuines with Tx.Send(). Only one gorotuine may // call Rx.Recv(). -type Queue struct { - Tx - Rx +type Queue[T any] struct { + Tx[T] + Rx[T] } // The receive end of a Queue. -type Rx struct { +type Rx[T any] struct { // The head of the list. If the list is empty, this will be // non-nil but have a locked mu field. - head *node + head *node[T] } // The send/transmit end of a Queue. -type Tx struct { +type Tx[T any] struct { // Mutex which must be held by senders. A goroutine must hold this // lock to manipulate `tail`. mu chanmutex.Mutex // Pointer to the tail of the list. This will have a locked mu, // and zero values for other fields. - tail *node + tail *node[T] } -// Alias for interface{}, the values in the queue. TODO: once Go -// supports generics, get rid of this and make Queue generic in the -// type of the values. -type Value interface{} - // A node in the linked linst that makes up the queue internally. -type node struct { +type node[T any] struct { // A mutex which guards the other fields in the node. // Nodes start out with this locked, and then we unlock it // after filling in the other fields. @@ -46,33 +41,33 @@ type node struct { // The next node in the list, if any. Must be non-nil if // mu is unlocked: - next *node + next *node[T] // The value in this node: - value Value + value T } // Create a new node, with a locked mutex and zero values for // the other fields. -func newNode() *node { - return &node{mu: chanmutex.NewLocked()} +func newNode[T any]() *node[T] { + return &node[T]{mu: chanmutex.NewLocked()} } // Create a new, initially empty Queue. -func New() *Queue { - node := newNode() - return &Queue{ - Tx: Tx{ +func New[T any]() *Queue[T] { + node := newNode[T]() + return &Queue[T]{ + Tx: Tx[T]{ tail: node, mu: chanmutex.NewUnlocked(), }, - Rx: Rx{head: node}, + Rx: Rx[T]{head: node}, } } // Send a message on the queue. -func (tx *Tx) Send(v Value) { - newTail := newNode() +func (tx *Tx[T]) Send(v T) { + newTail := newNode[T]() tx.mu.Lock() @@ -88,29 +83,31 @@ func (tx *Tx) Send(v Value) { // Receive a message from the queue. Blocks if the queue is empty. // If the context ends before the receive happens, this returns // ctx.Err(). -func (rx *Rx) Recv(ctx context.Context) (Value, error) { +func (rx *Rx[T]) Recv(ctx context.Context) (T, error) { + var zero T select { case <-rx.head.mu: return rx.doRecv(), nil case <-ctx.Done(): - return nil, ctx.Err() + return zero, ctx.Err() } } // Try to receive a message from the queue. If successful, ok will be true. // If the queue is empty, this will return immediately with ok = false. -func (rx *Rx) TryRecv() (v Value, ok bool) { +func (rx *Rx[T]) TryRecv() (v T, ok bool) { + var zero T select { case <-rx.head.mu: return rx.doRecv(), true default: - return nil, false + return zero, false } } // Helper for shared logic between Recv and TryRecv. Must be holding // rx.head.mu. -func (rx *Rx) doRecv() Value { +func (rx *Rx[T]) doRecv() T { ret := rx.head.value rx.head = rx.head.next return ret diff --git a/internal/mpsc/mpsc_test.go b/internal/mpsc/mpsc_test.go index e8a021c9..87a31ceb 100644 --- a/internal/mpsc/mpsc_test.go +++ b/internal/mpsc/mpsc_test.go @@ -12,14 +12,14 @@ import ( func TestTryRecvEmpty(t *testing.T) { t.Parallel() - q := New() + q := New[int]() v, ok := q.TryRecv() assert.False(t, ok, "TryRecv() on an empty queue succeeded; recevied: ", v) } func TestRecvEmpty(t *testing.T) { t.Parallel() - q := New() + q := New[int]() // Recv() on an empty queue should block until the context is canceled. ctx, _ := context.WithTimeout(context.Background(), time.Millisecond*10) @@ -27,13 +27,13 @@ func TestRecvEmpty(t *testing.T) { v, err := q.Recv(ctx) assert.Equal(t, ctx.Err(), err, "Returned error is not ctx.Err()") assert.NotNil(t, err, "Returned error is nil.") - assert.Nil(t, v, "Return value is not nil.") + assert.Equal(t, 0, v, "Return value is not the zero value.") } func TestSendOne(t *testing.T) { t.Parallel() - q := New() - want := Value(1) + q := New[int]() + want := 1 q.Send(want) got, err := q.Recv(context.Background()) assert.Nil(t, err, "Non-nil error from Recv()") @@ -51,7 +51,7 @@ func TestSendManySequential(t *testing.T) { ctx := context.Background() - q := New() + q := New[int]() for _, v := range inputs { q.Send(v) @@ -60,7 +60,7 @@ func TestSendManySequential(t *testing.T) { for len(outputs) != len(inputs) { v, err := q.Recv(ctx) assert.Nil(t, err, "Non-nil error from Recv()") - outputs = append(outputs, v.(int)) + outputs = append(outputs, v) } assert.Equal(t, inputs, outputs, "Received sequence was different from sent.") @@ -71,7 +71,7 @@ func TestSendManySequential(t *testing.T) { func TestSendManyConcurrent(t *testing.T) { t.Parallel() - q := New() + q := New[int]() for i := 0; i < 100; i += 10 { for j := 0; j < 10; j++ { @@ -100,7 +100,7 @@ func TestSendManyConcurrent(t *testing.T) { for i := 0; i < 100; i++ { v, err := q.Recv(ctx) assert.Nil(t, err, "Failed to receive from queue: ", err) - actual = append(actual, v.(int)) + actual = append(actual, v) } // Values come out in random order, so sort them: sort.Slice(actual, func(i, j int) bool {