Skip to content
This repository has been archived by the owner on Jun 26, 2023. It is now read-only.

Commit

Permalink
document republisher logic
Browse files Browse the repository at this point in the history
Document the `Republisher` code in `repub.go` according to the explanation in
ipfs/kubo#5092 (comment). The documentation errs on
the side of verbosity, since this part of the code has received very little
review up until now.
  • Loading branch information
schomatis committed Dec 19, 2018
1 parent 925c0a8 commit 0abe769
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 55 deletions.
128 changes: 74 additions & 54 deletions repub.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,58 +8,53 @@ import (
cid "github.com/ipfs/go-cid"
)

// PubFunc is the user-defined function that determines exactly what
// logic entails "publishing" a `Cid` value. It is the function called by
// the `publish()` method; when it returns a non-nil error, `publish()`
// returns that error and does not record the value as published.
type PubFunc func(context.Context, cid.Cid) error

// Republisher manages when to publish a given entry: it batches updates
// to a `Cid` value and calls the user-defined `PubFunc` from its `Run` loop.
//
// NOTE(review): this listing is a diff view, so several fields appear
// under two names. `Publish`, `pubnowch`, `lk`, `val` and `lastpub` appear
// to be the previous names of `valueHasBeenUpdated`, `immediatePublish`,
// `valueLock`, `valueToPublish` and `lastValuePublished` -- confirm
// against the committed file.
type Republisher struct {
TimeoutLong time.Duration
TimeoutShort time.Duration
Publish chan struct{}
pubfunc PubFunc
pubnowch chan chan struct{}
// TimeoutLong is the duration of the `longer` timer in `Run`, which is
// started on the first update and is not restarted by later updates.
TimeoutLong time.Duration
// TimeoutShort is the duration of the `quick` timer in `Run`, which is
// restarted every time another update is signaled.
TimeoutShort time.Duration
// valueHasBeenUpdated is signaled by `Update` (with a non-blocking send)
// to tell `Run` that there is a new value to publish.
valueHasBeenUpdated chan struct{}
// pubfunc is the user-defined function called by `publish`.
pubfunc PubFunc
// immediatePublish carries the response channel that `WaitPub` hands
// to `Run`; `Run` sends on that response channel after calling `publish`.
immediatePublish chan chan struct{}

// valueLock guards `valueToPublish` and `lastValuePublished`.
valueLock sync.Mutex
// valueToPublish is the latest value set through `Update`.
valueToPublish cid.Cid
// lastValuePublished is the last value for which `pubfunc` returned
// without error.
lastValuePublished cid.Cid

// ctx is checked (`ctx.Done()`) by `Run` to stop the loop and is the
// context passed to `publish`; cancel is its cancellation function.
ctx context.Context
cancel func()

lk sync.Mutex
val cid.Cid
lastpub cid.Cid
}

// NewRepublisher creates a new Republisher object that publishes through
// `pf`, using `tshort` and `tlong` as the short and long time intervals.
//
// The Republisher gets its own cancellable context derived from `ctx`.
// The update-signal channel is created with a buffer of one element and
// the channel used by `WaitPub` is unbuffered. This function does not
// start the `Run` loop; the caller is expected to do so.
func NewRepublisher(ctx context.Context, pf PubFunc, tshort, tlong time.Duration) *Republisher {
ctx, cancel := context.WithCancel(ctx)
return &Republisher{
TimeoutShort: tshort,
TimeoutLong: tlong,
Publish: make(chan struct{}, 1),
pubfunc: pf,
pubnowch: make(chan chan struct{}),
ctx: ctx,
cancel: cancel,
TimeoutShort: tshort,
TimeoutLong: tlong,
valueHasBeenUpdated: make(chan struct{}, 1),
pubfunc: pf,
immediatePublish: make(chan chan struct{}),
ctx: ctx,
cancel: cancel,
}
}

// setVal stores `c` as the value to publish while holding the lock.
// It does not signal the `Run` loop.
func (p *Republisher) setVal(c cid.Cid) {
p.lk.Lock()
defer p.lk.Unlock()
p.val = c
}

// WaitPub waits for the current value to be published (or returns early
// if it already has): it returns immediately if the last published value
// equals the current value, else it blocks until the `Run` loop has
// called `publish` and answered on the channel handed to it.
//
// NOTE(review): the send below is not paired with a `ctx.Done()` case, so
// it looks like this can block indefinitely if `Run` is not currently
// receiving on that channel (e.g. after `Run` has returned) -- confirm.
func (p *Republisher) WaitPub() {
p.lk.Lock()
consistent := p.lastpub == p.val
p.lk.Unlock()
if consistent {
p.valueLock.Lock()
valueHasBeenPublished := p.lastValuePublished == p.valueToPublish
p.valueLock.Unlock()
if valueHasBeenPublished {
return
}

// Hand `Run` a private channel and wait for it to signal on it.
wait := make(chan struct{})
p.pubnowch <- wait
p.immediatePublish <- wait
<-wait
}

Expand All @@ -69,67 +64,92 @@ func (p *Republisher) Close() error {
return err
}

// Update sets the `valueToPublish` to `c` and signals the change in the
// `valueHasBeenUpdated` channel. Multiple consecutive updates may extend
// the time period before the next publish occurs in order to more
// efficiently batch updates.
func (np *Republisher) Update(c cid.Cid) {
np.setVal(c)
np.valueLock.Lock()
np.valueToPublish = c
np.valueLock.Unlock()

// Non-blocking send: if the signal cannot be delivered right away
// (the one-element buffer is already full) it is dropped. `publish`
// reads the latest value under the lock, so no update is lost.
select {
case np.Publish <- struct{}{}:
case np.valueHasBeenUpdated <- struct{}{}:
default:
}
}

// Run is the main republisher loop and contains the core logic of the
// `Republisher`. It calls the user-defined `pubfunc` function (through
// `publish`) whenever the `Cid` value is updated. The complexity comes
// from the fact that `pubfunc` may be slow so we need to batch updates.
// The explanation follows:
// https://github.com/ipfs/go-ipfs/issues/5092#issuecomment-398524255.
//
// Algorithm:
// 1. When we receive the first update after publishing, we set a `longer` timer.
// 2. When we receive any update, we reset the `quick` timer.
// 3. If either the `quick` timeout or the `longer` timeout elapses,
// we call `publish` with the latest updated value.
//
// The `longer` timer ensures that we delay publishing by at most
// `TimeoutLong`. The `quick` timer allows us to publish sooner if
// it looks like there are no more updates coming down the pipe.
//
// A request from `WaitPub` also triggers the `publish` call without
// waiting for either timer. Run returns when the context is done.
func (np *Republisher) Run() {
for {
select {
case <-np.Publish:
case <-np.ctx.Done():
return
case <-np.valueHasBeenUpdated:
// Fast timeout, a `publish` will be issued if there are
// no more updates before it expires (restarted every time
// the `valueHasBeenUpdated` is signaled).
quick := time.After(np.TimeoutShort)
// Long timeout that guarantees a `publish` after it expires
// even if the value keeps being updated (and `quick` is
// restarted).
longer := time.After(np.TimeoutLong)

wait:
var pubnowresp chan struct{}
var valueHasBeenPublished chan struct{}

select {
case <-np.ctx.Done():
return
case <-np.Publish:
case <-np.valueHasBeenUpdated:
// The `valueToPublish` has been updated *again* since
// the last time we checked and we still haven't published
// it, restart the `quick` timer allowing for some more
// time to see if the `valueToPublish` changes again.
// The `longer` timer is left running.
quick = time.After(np.TimeoutShort)
goto wait

case <-quick:
case <-longer:
case pubnowresp = <-np.pubnowch:
case valueHasBeenPublished = <-np.immediatePublish:
}

err := np.publish(np.ctx)
if pubnowresp != nil {
pubnowresp <- struct{}{}
if valueHasBeenPublished != nil {
// The user is waiting in `WaitPub` with this channel, signal
// that the `publish` has happened. This is done before the
// error check below, so `WaitPub` is released whether or not
// `publish` succeeded.
valueHasBeenPublished <- struct{}{}
}
if err != nil {
log.Errorf("republishRoot error: %s", err)
}

case <-np.ctx.Done():
return
}
}
}

// publish is a wrapper function around the user-defined `pubfunc`. It
// publishes the (last) `valueToPublish` set and, on success, registers
// it in `lastValuePublished`.
//
// The lock is held only while reading and while recording the value, not
// during the `pubfunc` call itself. If `pubfunc` returns an error, that
// error is returned and `lastValuePublished` is left unchanged.
func (np *Republisher) publish(ctx context.Context) error {
np.lk.Lock()
topub := np.val
np.lk.Unlock()
np.valueLock.Lock()
topub := np.valueToPublish
np.valueLock.Unlock()

err := np.pubfunc(ctx, topub)
if err != nil {
return err
}
np.lk.Lock()
np.lastpub = topub
np.lk.Unlock()
np.valueLock.Lock()
np.lastValuePublished = topub
np.valueLock.Unlock()
return nil
}
6 changes: 5 additions & 1 deletion root.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,11 @@ func NewRoot(parent context.Context, ds ipld.DAGService, node *dag.ProtoNode, pf
var repub *Republisher
if pf != nil {
repub = NewRepublisher(parent, pf, time.Millisecond*300, time.Second*3)
repub.setVal(node.Cid())

repub.valueToPublish = node.Cid()
// No need to take the lock here since we just created
// the `Republisher` and no one has access to it yet.

go repub.Run()
}

Expand Down

0 comments on commit 0abe769

Please sign in to comment.