Skip to content

Commit

Permalink
Relax the assumption of receiving own CONVERGE messages synchronously
Browse files Browse the repository at this point in the history
The gpbft implementation implicitly assumes that `CONVERGE` messages
broadcast to self are delivered immediately. In practice this assumption
does not hold because of the complexity of deferred signing and
asynchronous message delivery.

The changes here relax this assumption by explicitly notifying the local
converge state that the self participant has begun the `CONVERGE` step,
providing the self proposal and the justification for that proposal. The
code then falls back to this data whenever a search of the converge state
yields no results because the self broadcast has not been delivered yet,
which is a consequence of asynchronous message delivery. Further, the
code ignores the self converge value once at least one broadcast message
is received.

Additionally, the changes remove zero-latency for messages to self in
simulations to make a stronger assertion that synchronous message
delivery to self is no longer required (neither for `GMessage` nor
alarms).

Fixes #316
Reverts #318
Relates to #103 (comment)
  • Loading branch information
masih committed Jun 11, 2024
1 parent c14d8f4 commit 1d885d8
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 46 deletions.
8 changes: 1 addition & 7 deletions gpbft/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,7 @@ type MessageReceiver interface {
// - ErrReceivedAfterTermination if the message is received after the instance has terminated (a programming error)
// - both ErrReceivedInternalError and a cause if there was an internal error processing the message
ReceiveMessage(msg ValidatedMessage) error
// ReceiveAlarm signals the trigger of the alarm set by Clock.SetAlarm. Note that
// triggering alarms takes precedence over ReceiveMessage, i.e. when an alarm is
// triggered at the same time as an arrival of a message ReceiveAlarm must be
// called before ReceiveMessage.
// ReceiveAlarm signals the trigger of the alarm set by Clock.SetAlarm.
ReceiveAlarm() error
}

Expand Down Expand Up @@ -89,9 +86,6 @@ type Clock interface {
// can be set at a time. Setting an alarm replaces any previous alarm that has
// not yet fired. The timestamp may be in the past, in which case the alarm will
// fire as soon as possible (but not synchronously).
//
// Note that delivery of triggered alarms must take precedence over messages
// that may arrive at the same time.
SetAlarm(at time.Time)
}

Expand Down
45 changes: 45 additions & 0 deletions gpbft/gpbft.go
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,12 @@ func (i *instance) beginConverge(justification *Justification) {
i.phase = CONVERGE_PHASE
i.phaseTimeout = i.alarmAfterSynchrony()

// Notify the round's convergeState that the self participant has begun the
// CONVERGE phase, because we cannot guarantee that CONVERGE message
// broadcasts are delivered to self synchronously.
converged := i.getRound(i.round).converged
converged.NotifySelfConvergeBegun(i.proposal, justification)

i.broadcast(i.round, CONVERGE_PHASE, i.proposal, true, justification)
}

Expand Down Expand Up @@ -1215,6 +1221,10 @@ func (q *quorumState) FindStrongQuorumValue() (quorumValue ECChain, foundQuorum
//// CONVERGE phase helper /////

type convergeState struct {
// self stores the self-participant converge value in order to relax the
// assumption that broadcast CONVERGE messages to the participant itself are
// delivered synchronously.
self *ConvergeValue
// Participants from which a message has been received.
senders map[ActorID]struct{}
// Chains indexed by key.
Expand All @@ -1241,6 +1251,23 @@ func newConvergeState() *convergeState {
}
}

// NotifySelfConvergeBegun records the proposal and justification with which the
// self participant has entered the CONVERGE_PHASE. The recorded value is kept in
// c.self so that the convergeState of a round does not depend on the
// participant's own CONVERGE broadcast being delivered back to it
// synchronously. Calling it again replaces the previously recorded value. See
// HasSelfBegunConverge.
func (c *convergeState) NotifySelfConvergeBegun(value ECChain, justification *Justification) {
	own := ConvergeValue{Chain: value, Justification: justification}
	c.self = &own
}

// HasSelfBegunConverge reports whether a self converge value has been recorded,
// i.e. whether the self participant has begun the CONVERGE_PHASE. See
// NotifySelfConvergeBegun.
func (c *convergeState) HasSelfBegunConverge() bool {
	selfValue := c.self
	return selfValue != nil
}

// Receives a new CONVERGE value from a sender.
// Ignores any subsequent value from a sender from which a value has already been received.
func (c *convergeState) Receive(sender ActorID, value ECChain, ticket Ticket, justification *Justification) error {
Expand Down Expand Up @@ -1281,16 +1308,34 @@ func (c *convergeState) FindMaxTicketProposal(table PowerTable) ConvergeValue {
}
}
}

// Check if self participant has entered CONVERGE phase.
if maxTicket == nil && c.HasSelfBegunConverge() {
// No CONVERGE message has been received through broadcast yet, but the self
// CONVERGE message has been broadcast. Return the self converge value while
// waiting for the broadcast to deliver it.
return *c.self

Check warning on line 1316 in gpbft/gpbft.go

View check run for this annotation

Codecov / codecov/patch

gpbft/gpbft.go#L1316

Added line #L1316 was not covered by tests
}

return maxValue
}

// FindProposalFor looks for a converge value whose chain equals the given chain.
//
// Values received via broadcast are searched first. If none of them matches,
// the self converge value is considered, provided the self participant has
// begun the CONVERGE step and its proposal equals chain; this covers the case
// where the participant has not yet received the broadcast of its own CONVERGE
// message.
//
// It returns the matching value and true, or the zero ConvergeValue and false
// when nothing matches.
func (c *convergeState) FindProposalFor(chain ECChain) (ConvergeValue, bool) {
	for _, candidate := range c.values {
		if !candidate.Chain.Eq(chain) {
			continue
		}
		return candidate, true
	}

	switch {
	case !c.HasSelfBegunConverge():
		// Self has not entered CONVERGE; there is no fallback value to offer.
		return ConvergeValue{}, false
	case c.self.Chain.Eq(chain):
		return *c.self, true
	default:
		return ConvergeValue{}, false
	}
}

Expand Down
6 changes: 3 additions & 3 deletions sim/latency/log_normal.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ func NewLogNormal(seed int64, mean time.Duration) *LogNormal {
// distribution. Latency from one participant to another may be asymmetric and
// once generated remains constant for the lifetime of a simulation.
//
// Note, where from and to are the same or mean configured latency is not larger
// than zero the latency sample will always be zero.
// Note, where the mean configured latency is not larger than zero the latency
// sample will always be zero.
func (l *LogNormal) Sample(_ time.Time, from gpbft.ActorID, to gpbft.ActorID) time.Duration {
if from == to || l.mean <= 0 {
if l.mean <= 0 {
return 0
}

Expand Down
5 changes: 0 additions & 5 deletions sim/latency/zipf.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,6 @@ func NewZipf(seed int64, s, v float64, max time.Duration) (*Zipf, error) {
// Sample returns latency samples that correspond to this ZipF numerical
// distribution. The samples returned disregard time and participants, i.e. the
// distribution does not vary over time nor for specific participants.
//
// Note, where from and to are the same the latency sample will always be zero.
func (l *Zipf) Sample(_ time.Time, from gpbft.ActorID, to gpbft.ActorID) time.Duration {
if from == to {
return 0
}
return time.Duration(l.dist.Uint64())
}
7 changes: 1 addition & 6 deletions sim/message_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,7 @@ func (pq *messageQueue) Len() int { return len(pq.mailbox) }
func (pq *messageQueue) Less(i, j int) bool {
// We want Pop to give us the earliest delivery time, so we use Less to sort by
// deliverAt in ascending order.
switch one, other := pq.mailbox[i], pq.mailbox[j]; {
case one.deliverAt.Equal(other.deliverAt):
return one.isAlarm() && !other.isAlarm()
default:
return one.deliverAt.Before(other.deliverAt)
}
return pq.mailbox[i].deliverAt.Before(pq.mailbox[j].deliverAt)
}

// Swap swaps messages in-flight at index i with the one at index j.
Expand Down
24 changes: 0 additions & 24 deletions sim/message_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,27 +156,3 @@ func TestMessageQueue_UpsertFirstWhere(t *testing.T) {
})
}
}
func TestMessageQueue_AlarmTakesPrecedence(t *testing.T) {
subject := newMessagePriorityQueue()

t0 := time.Time{}.Add(time.Second)
t1 := t0.Add(12 * time.Second)
t2 := t1.Add(100 * time.Second)

want1st := &messageInFlight{deliverAt: t0}
want2nd := &messageInFlight{payload: "ALARM", deliverAt: t1}
want3rd := &messageInFlight{deliverAt: t1}
want4th := &messageInFlight{deliverAt: t2}

subject.Insert(want2nd)
subject.Insert(want4th)
subject.Insert(want1st)
subject.Insert(want3rd)

require.Equal(t, subject.Len(), 4)
require.Equal(t, want1st, subject.Remove())
require.Equal(t, want2nd, subject.Remove())
require.Equal(t, want3rd, subject.Remove())
require.Equal(t, want4th, subject.Remove())
require.Zero(t, subject.Len())
}
2 changes: 1 addition & 1 deletion test/deny_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,5 @@ func TestDeny_SkipsToFuture(t *testing.T) {
require.NoError(t, err)
require.NoErrorf(t, sm.Run(instanceCount, maxRounds), "%s", sm.Describe())
chain := ecChainGenerator.GenerateECChain(instanceCount-1, gpbft.TipSet{}, math.MaxUint64)
requireConsensusAtInstance(t, sm, instanceCount-1, *chain.Head())
requireConsensusAtInstance(t, sm, instanceCount-1, chain...)
}

0 comments on commit 1d885d8

Please sign in to comment.