Skip to content

Commit

Permalink
Document and adjust alarm delivery precedence over network messages (#…
Browse files Browse the repository at this point in the history
…318)

Document the implicit assumption that the delivery of alarms to a
participant must take precedence over broadcast messages.

Adjust simulation message queue to respect the alarm ordering.

Fixes #316
  • Loading branch information
masih authored Jun 7, 2024
1 parent 2ce2956 commit 1dfe25f
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 7 deletions.
17 changes: 12 additions & 5 deletions gpbft/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ type MessageReceiver interface {
// - both ErrReceivedInternalError and a cause if there was an internal error processing the message
// This method is not safe for concurrent use.
ReceiveMessage(msg ValidatedMessage) error
// ReceiveAlarm signals the trigger of the alarm set by Clock.SetAlarm. Note that
// triggering alarms takes precedence over ReceiveMessage, i.e. when an alarm is
// triggered at the same time as an arrival of a message ReceiveAlarm must be
// called before ReceiveMessage.
//
// This method is not safe for concurrent use.
ReceiveAlarm() error
}
Expand Down Expand Up @@ -93,11 +98,13 @@ type Network interface {
type Clock interface {
// Returns the current network time.
Time() time.Time
// Sets an alarm to fire after the given timestamp.
// At most one alarm can be set at a time.
// Setting an alarm replaces any previous alarm that has not yet fired.
// The timestamp may be in the past, in which case the alarm will fire as soon as possible
// (but not synchronously).
// SetAlarm sets an alarm to fire after the given timestamp. At most one alarm
// can be set at a time. Setting an alarm replaces any previous alarm that has
// not yet fired. The timestamp may be in the past, in which case the alarm will
// fire as soon as possible (but not synchronously).
//
// Note that delivery of triggered alarms must take precedence over messages
// that may arrive at the same time.
SetAlarm(at time.Time)
}

Expand Down
19 changes: 18 additions & 1 deletion sim/message_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,15 @@ type messageInFlight struct {
index int // Index in the heap used internally by the heap implementation
}

func (m *messageInFlight) isAlarm() bool {
switch payload := m.payload.(type) {
case string:
return m.dest == m.source && payload == "ALARM"
default:
return false
}
}

func newMessagePriorityQueue() *messageQueue {
var mpq messageQueue
heap.Init(&mpq)
Expand All @@ -39,11 +48,19 @@ func (pq *messageQueue) Len() int { return len(pq.mailbox) }
// to determine this, where the earlier the deliverAt the higher priority the
// message, i.e. messages are sorted in ascending order of their deliverAt.
//
// Note that alarms take precedence over messages when both have the same
// deliverAt.
//
// This function is part of heap.Interface and must not be called externally.
func (pq *messageQueue) Less(i, j int) bool {
// We want Pop to give us the earliest delivery time, so we use Less to sort by
// deliverAt in ascending order.
return pq.mailbox[i].deliverAt.Before(pq.mailbox[j].deliverAt)
switch one, other := pq.mailbox[i], pq.mailbox[j]; {
case one.deliverAt.Equal(other.deliverAt):
return one.isAlarm() && !other.isAlarm()
default:
return one.deliverAt.Before(other.deliverAt)
}
}

// Swap swaps messages in-flight at index i with the one at index j.
Expand Down
24 changes: 24 additions & 0 deletions sim/message_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,3 +156,27 @@ func TestMessageQueue_UpsertFirstWhere(t *testing.T) {
})
}
}
func TestMessageQueue_AlarmTakesPrecedence(t *testing.T) {
subject := newMessagePriorityQueue()

t0 := time.Time{}.Add(time.Second)
t1 := t0.Add(12 * time.Second)
t2 := t1.Add(100 * time.Second)

want1st := &messageInFlight{deliverAt: t0}
want2nd := &messageInFlight{payload: "ALARM", deliverAt: t1}
want3rd := &messageInFlight{deliverAt: t1}
want4th := &messageInFlight{deliverAt: t2}

subject.Insert(want2nd)
subject.Insert(want4th)
subject.Insert(want1st)
subject.Insert(want3rd)

require.Equal(t, subject.Len(), 4)
require.Equal(t, want1st, subject.Remove())
require.Equal(t, want2nd, subject.Remove())
require.Equal(t, want3rd, subject.Remove())
require.Equal(t, want4th, subject.Remove())
require.Zero(t, subject.Len())
}
2 changes: 1 addition & 1 deletion sim/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (n *Network) SetAlarm(sender gpbft.ActorID, at time.Time) {
// Update any existing alarm or insert if no such alarm exists.
n.queue.UpsertFirstWhere(
func(m *messageInFlight) bool {
return m.dest == sender && m.payload == "ALARM"
return m.dest == sender && m.isAlarm()
}, &messageInFlight{
source: sender,
dest: sender,
Expand Down

0 comments on commit 1dfe25f

Please sign in to comment.