Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Document and adjust alarm delivery precedence over network messages #318

Merged
merged 1 commit into from
Jun 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions gpbft/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ type MessageReceiver interface {
// - both ErrReceivedInternalError and a cause if there was an internal error processing the message
// This method is not safe for concurrent use.
ReceiveMessage(msg ValidatedMessage) error
// ReceiveAlarm signals the trigger of the alarm set by Clock.SetAlarm. Note that
// triggering alarms takes precedence over ReceiveMessage, i.e. when an alarm is
// triggered at the same time as an arrival of a message ReceiveAlarm must be
// called before ReceiveMessage.
//
// This method is not safe for concurrent use.
ReceiveAlarm() error
}
Expand Down Expand Up @@ -93,11 +98,13 @@ type Network interface {
type Clock interface {
// Returns the current network time.
Time() time.Time
// Sets an alarm to fire after the given timestamp.
// At most one alarm can be set at a time.
// Setting an alarm replaces any previous alarm that has not yet fired.
// The timestamp may be in the past, in which case the alarm will fire as soon as possible
// (but not synchronously).
// SetAlarm sets an alarm to fire after the given timestamp. At most one alarm
// can be set at a time. Setting an alarm replaces any previous alarm that has
// not yet fired. The timestamp may be in the past, in which case the alarm will
// fire as soon as possible (but not synchronously).
//
// Note that delivery of triggered alarms must take precedence over messages
// that may arrive at the same time.
SetAlarm(at time.Time)
}

Expand Down
19 changes: 18 additions & 1 deletion sim/message_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,15 @@ type messageInFlight struct {
index int // Index in the heap used internally by the heap implementation
}

func (m *messageInFlight) isAlarm() bool {
switch payload := m.payload.(type) {
case string:
return m.dest == m.source && payload == "ALARM"
default:
return false
}
}

func newMessagePriorityQueue() *messageQueue {
var mpq messageQueue
heap.Init(&mpq)
Expand All @@ -39,11 +48,19 @@ func (pq *messageQueue) Len() int { return len(pq.mailbox) }
// to determine this, where the earlier the deliverAt the higher priority the
// message, i.e. messages are sorted in ascending order of their deliverAt.
//
// Note that alarms take precedence over messages when both have the same
// deliverAt.
//
// This function is part of heap.Interface and must not be called externally.
func (pq *messageQueue) Less(i, j int) bool {
// We want Pop to give us the earliest delivery time, so we use Less to sort by
// deliverAt in ascending order.
return pq.mailbox[i].deliverAt.Before(pq.mailbox[j].deliverAt)
switch one, other := pq.mailbox[i], pq.mailbox[j]; {
case one.deliverAt.Equal(other.deliverAt):
return one.isAlarm() && !other.isAlarm()
default:
return one.deliverAt.Before(other.deliverAt)
}
}

// Swap swaps messages in-flight at index i with the one at index j.
Expand Down
24 changes: 24 additions & 0 deletions sim/message_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,3 +156,27 @@ func TestMessageQueue_UpsertFirstWhere(t *testing.T) {
})
}
}
func TestMessageQueue_AlarmTakesPrecedence(t *testing.T) {
subject := newMessagePriorityQueue()

t0 := time.Time{}.Add(time.Second)
t1 := t0.Add(12 * time.Second)
t2 := t1.Add(100 * time.Second)

want1st := &messageInFlight{deliverAt: t0}
want2nd := &messageInFlight{payload: "ALARM", deliverAt: t1}
want3rd := &messageInFlight{deliverAt: t1}
want4th := &messageInFlight{deliverAt: t2}

subject.Insert(want2nd)
subject.Insert(want4th)
subject.Insert(want1st)
subject.Insert(want3rd)

require.Equal(t, subject.Len(), 4)
require.Equal(t, want1st, subject.Remove())
require.Equal(t, want2nd, subject.Remove())
require.Equal(t, want3rd, subject.Remove())
require.Equal(t, want4th, subject.Remove())
require.Zero(t, subject.Len())
}
2 changes: 1 addition & 1 deletion sim/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (n *Network) SetAlarm(sender gpbft.ActorID, at time.Time) {
// Update any existing alarm or insert if no such alarm exists.
n.queue.UpsertFirstWhere(
func(m *messageInFlight) bool {
return m.dest == sender && m.payload == "ALARM"
return m.dest == sender && m.isAlarm()
}, &messageInFlight{
source: sender,
dest: sender,
Expand Down
Loading