Skip to content

Commit

Permalink
sync: fix initial snap
Browse files Browse the repository at this point in the history
  • Loading branch information
pulsejet committed Feb 2, 2025
1 parent 577a6c2 commit 8bfd0ad
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 24 deletions.
12 changes: 7 additions & 5 deletions std/examples/svs-alo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,8 @@ func main() {
fmt.Fprintln(os.Stderr, "*** Press Ctrl+C to exit.")
fmt.Fprintln(os.Stderr)

// Publish initial message
msgCount++
_, err = svsalo.Publish(enc.Wire{[]byte("Joined the chatroom")})
// Publish an initial empty message to announce our presence
_, err = svsalo.Publish(enc.Wire{})
if err != nil {
log.Error(nil, "Unable to publish message", "err", err)
}
Expand All @@ -191,10 +190,13 @@ func main() {

// Trim newline character
line = line[:len(line)-1]
msgCount++
msgSize += len(line)
if len(line) == 0 {
continue
}

// Publish chat message
msgCount++
msgSize += len(line)
_, err = svsalo.Publish(enc.Wire{line})
if err != nil {
log.Error(nil, "Unable to publish message", "err", err)
Expand Down
4 changes: 2 additions & 2 deletions std/sync/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ type Snapshot interface {
// Snapshot returns the Snapshot trait.
Snapshot() Snapshot

// setName sets the name of the node and group.
setNames(node enc.Name, group enc.Name)
// initialize the strategy, and set basic parameters.
initialize(node enc.Name, group enc.Name)

// setCallback sets the callback for fetched snapshot.
//
Expand Down
23 changes: 13 additions & 10 deletions std/sync/snapshot_node_latest.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,23 +47,21 @@ func (s *SnapshotNodeLatest) Snapshot() Snapshot {
return s
}

func (s *SnapshotNodeLatest) setNames(node enc.Name, group enc.Name) {
func (s *SnapshotNodeLatest) initialize(node enc.Name, group enc.Name) {
if s.Client == nil || s.SnapMe == nil || s.Threshold == 0 {
panic("SnapshotNodeLatest: not initialized")
}

s.nodePrefix = node
s.groupPrefix = group
}

func (s *SnapshotNodeLatest) setCallback(callback snapshotCallbackWrap) {
if s.Client == nil || s.SnapMe == nil || s.Threshold == 0 {
panic("SnapshotNodeLatest: not initialized")
}

s.callback = callback
}

// check determines if a snapshot should be taken or fetched.
func (s *SnapshotNodeLatest) check(args snapshotOnUpdateArgs) {
// TODO: fetch snapshot if rebooted

// We only care about the latest boot.
// For all other states, make sure the fetch is skipped.
entries := args.state[args.hash]
Expand All @@ -74,12 +72,17 @@ func (s *SnapshotNodeLatest) check(args snapshotOnUpdateArgs) {

if args.node.Equal(s.nodePrefix) {
// This is me - check if I should snapshot
if lastV.Latest-s.prevSeq >= s.Threshold {
// 1. I have reached the threshold
// 2. I have not taken any snapshot yet
if lastV.Latest-s.prevSeq >= s.Threshold || (s.prevSeq == 0 && lastV.Latest > 0) {
s.snap(last.Boot, lastV.Latest)
}
} else {
// This is not me - check if I should block
if lastV.SnapBlock == 0 && lastV.Latest-lastV.Pending >= s.Threshold*2 {
// This is not me - check if I should fetch
// 1. Pending gap is more than 2*threshold
// 2. I have not fetched anything yet
// And, I'm not already blocked by a fetch
if lastV.SnapBlock == 0 && (lastV.Latest-lastV.Pending >= s.Threshold*2 || lastV.Pending == 0) {
entries[i].Value.SnapBlock = 1 // released by fetch callback
s.fetch(args.node, entries[i].Boot)
}
Expand Down
2 changes: 1 addition & 1 deletion std/sync/snapshot_null.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ func (s *SnapshotNull) Snapshot() Snapshot {
return s
}

func (s *SnapshotNull) setNames(enc.Name, enc.Name) {
func (s *SnapshotNull) initialize(enc.Name, enc.Name) {
}

func (s *SnapshotNull) setCallback(snapshotCallbackWrap) {
Expand Down
2 changes: 1 addition & 1 deletion std/sync/svs_alo.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func NewSvsALO(opts SvsAloOpts) *SvsALO {
if s.opts.Snapshot == nil {
s.opts.Snapshot = &SnapshotNull{}
} else {
s.opts.Snapshot.setNames(s.opts.Name, s.opts.Svs.GroupPrefix)
s.opts.Snapshot.initialize(s.opts.Name, s.opts.Svs.GroupPrefix)
s.opts.Snapshot.setCallback(s.snapshotCallback)
}

Expand Down
10 changes: 5 additions & 5 deletions std/sync/svs_alo_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,6 @@ func (s *SvsALO) produceObject(content enc.Wire) (enc.Name, error) {
return nil, err
}

// Update the state vector
if got := s.svs.IncrSeqNo(node); got != seq {
panic("[BUG] sequence number mismatch - who changed it?")
}

// We don't get notified of changes to our own state.
// So we need to update the state vector ourselves.
hash := node.String()
Expand All @@ -84,6 +79,11 @@ func (s *SvsALO) produceObject(content enc.Wire) (enc.Name, error) {
// Inform the snapshot strategy
s.opts.Snapshot.check(snapshotOnUpdateArgs{s.state, node, hash})

// Update the state vector
if got := s.svs.IncrSeqNo(node); got != seq {
panic("[BUG] sequence number mismatch - who changed it?")
}

return name, nil
}

Expand Down

0 comments on commit 8bfd0ad

Please sign in to comment.