Skip to content

Commit

Permalink
Merge pull request #226 from zenhack/generic-mpsc
Browse files Browse the repository at this point in the history
Use generics for mpsc
  • Loading branch information
lthibault authored Mar 16, 2022
2 parents cd831da + 88fa49a commit fde96ea
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 40 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
go: [ '1.15', '1.16', '1.17' ]
go: [ '1.18' ]
name: Go ${{ matrix.go }}
steps:
- uses: actions/checkout@v2
Expand Down
9 changes: 8 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
module capnproto.org/go/capnp/v3

go 1.16
go 1.18

require (
github.com/kylelemons/godebug v1.1.0
github.com/stretchr/testify v1.7.0
github.com/tinylib/msgp v1.1.5
)

require (
github.com/davecgh/go-spew v1.1.0 // indirect
github.com/philhofer/fwd v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
)
55 changes: 26 additions & 29 deletions internal/mpsc/mpsc.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,70 +9,65 @@ import (
// A multiple-producer, single-consumer queue. Create one with New(),
// and send from many gorotuines with Tx.Send(). Only one gorotuine may
// call Rx.Recv().
type Queue struct {
Tx
Rx
type Queue[T any] struct {
Tx[T]
Rx[T]
}

// The receive end of a Queue.
type Rx struct {
type Rx[T any] struct {
// The head of the list. If the list is empty, this will be
// non-nil but have a locked mu field.
head *node
head *node[T]
}

// The send/transmit end of a Queue.
type Tx struct {
type Tx[T any] struct {
// Mutex which must be held by senders. A goroutine must hold this
// lock to manipulate `tail`.
mu chanmutex.Mutex

// Pointer to the tail of the list. This will have a locked mu,
// and zero values for other fields.
tail *node
tail *node[T]
}

// Alias for interface{}, the values in the queue. TODO: once Go
// supports generics, get rid of this and make Queue generic in the
// type of the values.
type Value interface{}

// A node in the linked linst that makes up the queue internally.
type node struct {
type node[T any] struct {
// A mutex which guards the other fields in the node.
// Nodes start out with this locked, and then we unlock it
// after filling in the other fields.
mu chanmutex.Mutex

// The next node in the list, if any. Must be non-nil if
// mu is unlocked:
next *node
next *node[T]

// The value in this node:
value Value
value T
}

// Create a new node, with a locked mutex and zero values for
// the other fields.
func newNode() *node {
return &node{mu: chanmutex.NewLocked()}
func newNode[T any]() *node[T] {
return &node[T]{mu: chanmutex.NewLocked()}
}

// Create a new, initially empty Queue.
func New() *Queue {
node := newNode()
return &Queue{
Tx: Tx{
func New[T any]() *Queue[T] {
node := newNode[T]()
return &Queue[T]{
Tx: Tx[T]{
tail: node,
mu: chanmutex.NewUnlocked(),
},
Rx: Rx{head: node},
Rx: Rx[T]{head: node},
}
}

// Send a message on the queue.
func (tx *Tx) Send(v Value) {
newTail := newNode()
func (tx *Tx[T]) Send(v T) {
newTail := newNode[T]()

tx.mu.Lock()

Expand All @@ -88,29 +83,31 @@ func (tx *Tx) Send(v Value) {
// Receive a message from the queue. Blocks if the queue is empty.
// If the context ends before the receive happens, this returns
// ctx.Err().
func (rx *Rx) Recv(ctx context.Context) (Value, error) {
func (rx *Rx[T]) Recv(ctx context.Context) (T, error) {
var zero T
select {
case <-rx.head.mu:
return rx.doRecv(), nil
case <-ctx.Done():
return nil, ctx.Err()
return zero, ctx.Err()
}
}

// Try to receive a message from the queue. If successful, ok will be true.
// If the queue is empty, this will return immediately with ok = false.
func (rx *Rx) TryRecv() (v Value, ok bool) {
func (rx *Rx[T]) TryRecv() (v T, ok bool) {
var zero T
select {
case <-rx.head.mu:
return rx.doRecv(), true
default:
return nil, false
return zero, false
}
}

// Helper for shared logic between Recv and TryRecv. Must be holding
// rx.head.mu.
func (rx *Rx) doRecv() Value {
func (rx *Rx[T]) doRecv() T {
ret := rx.head.value
rx.head = rx.head.next
return ret
Expand Down
18 changes: 9 additions & 9 deletions internal/mpsc/mpsc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,28 @@ import (

func TestTryRecvEmpty(t *testing.T) {
t.Parallel()
q := New()
q := New[int]()
v, ok := q.TryRecv()
assert.False(t, ok, "TryRecv() on an empty queue succeeded; recevied: ", v)
}

func TestRecvEmpty(t *testing.T) {
t.Parallel()
q := New()
q := New[int]()

// Recv() on an empty queue should block until the context is canceled.
ctx, _ := context.WithTimeout(context.Background(), time.Millisecond*10)

v, err := q.Recv(ctx)
assert.Equal(t, ctx.Err(), err, "Returned error is not ctx.Err()")
assert.NotNil(t, err, "Returned error is nil.")
assert.Nil(t, v, "Return value is not nil.")
assert.Equal(t, 0, v, "Return value is not the zero value.")
}

func TestSendOne(t *testing.T) {
t.Parallel()
q := New()
want := Value(1)
q := New[int]()
want := 1
q.Send(want)
got, err := q.Recv(context.Background())
assert.Nil(t, err, "Non-nil error from Recv()")
Expand All @@ -51,7 +51,7 @@ func TestSendManySequential(t *testing.T) {

ctx := context.Background()

q := New()
q := New[int]()

for _, v := range inputs {
q.Send(v)
Expand All @@ -60,7 +60,7 @@ func TestSendManySequential(t *testing.T) {
for len(outputs) != len(inputs) {
v, err := q.Recv(ctx)
assert.Nil(t, err, "Non-nil error from Recv()")
outputs = append(outputs, v.(int))
outputs = append(outputs, v)
}
assert.Equal(t, inputs, outputs, "Received sequence was different from sent.")

Expand All @@ -71,7 +71,7 @@ func TestSendManySequential(t *testing.T) {
func TestSendManyConcurrent(t *testing.T) {
t.Parallel()

q := New()
q := New[int]()

for i := 0; i < 100; i += 10 {
for j := 0; j < 10; j++ {
Expand Down Expand Up @@ -100,7 +100,7 @@ func TestSendManyConcurrent(t *testing.T) {
for i := 0; i < 100; i++ {
v, err := q.Recv(ctx)
assert.Nil(t, err, "Failed to receive from queue: ", err)
actual = append(actual, v.(int))
actual = append(actual, v)
}
// Values come out in random order, so sort them:
sort.Slice(actual, func(i, j int) bool {
Expand Down

0 comments on commit fde96ea

Please sign in to comment.