Skip to content

Commit

Permalink
Merge pull request #157 from ipfs/feat/variable-rebroadcast-interval
Browse files Browse the repository at this point in the history
broadcast: randomize rebroadcast interval by 30%
  • Loading branch information
hsanjuan committed Mar 22, 2022
2 parents 668b0c2 + e9bd953 commit 1bc7a68
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 11 deletions.
26 changes: 15 additions & 11 deletions crdt.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"fmt"
"io"
"math/rand"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -54,6 +55,10 @@ var (
ErrNoMoreBroadcast = errors.New("receiving blocks aborted since no new blocks will be broadcasted")
)

func init() {
rand.Seed(time.Now().UnixNano())
}

// A Broadcaster provides a way to send (notify) an opaque payload to
// all replicas and to retrieve payloads broadcasted.
type Broadcaster interface {
Expand Down Expand Up @@ -451,19 +456,18 @@ func (store *Datastore) encodeBroadcast(heads []cid.Cid) ([]byte, error) {
return proto.Marshal(&bcastData)
}

func randomizeInterval(d time.Duration) time.Duration {
// 30% of the configured interval
leeway := (d * 30 / 100)
// A random number between -leeway|+leeway
randomInterval := time.Duration(rand.Int63n(int64(leeway*2))) - leeway
return d + randomInterval
}

func (store *Datastore) rebroadcast() {
timer := time.NewTimer(store.opts.RebroadcastInterval)
timer := time.NewTimer(randomizeInterval(store.opts.RebroadcastInterval))

for {
select {
case <-store.ctx.Done():
if !timer.Stop() {
<-timer.C
}
return
default:
}

select {
case <-store.ctx.Done():
if !timer.Stop() {
Expand All @@ -472,7 +476,7 @@ func (store *Datastore) rebroadcast() {
return
case <-timer.C:
store.rebroadcastHeads()
timer.Reset(store.opts.RebroadcastInterval)
timer.Reset(randomizeInterval(store.opts.RebroadcastInterval))
}
}
}
Expand Down
14 changes: 14 additions & 0 deletions crdt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -896,3 +896,17 @@ func BenchmarkQueryElements(b *testing.B) {
}
b.Log(totalSize)
}

func TestRandomizeInterval(t *testing.T) {
prevR := 100 * time.Second
for i := 0; i < 1000; i++ {
r := randomizeInterval(100 * time.Second)
if r < 70*time.Second || r > 130*time.Second {
t.Error("r was ", r)
}
if prevR == r {
t.Log("r and prevR were equal")
}
prevR = r
}
}

0 comments on commit 1bc7a68

Please sign in to comment.