Skip to content

Commit

Permalink
Merge pull request #480 from quorumcontrol/republish-perf
Browse files Browse the repository at this point in the history
add mini cache for resending current round to clients
  • Loading branch information
brandonwestcott authored Sep 28, 2020
2 parents 9250e46 + 3a98650 commit 6624aff
Showing 1 changed file with 32 additions and 13 deletions.
45 changes: 32 additions & 13 deletions signer/gossip/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ type Node struct {
startCtx context.Context

closer io.Closer

roundRepublishCache *lru.Cache
}

type NewNodeOptions struct {
Expand Down Expand Up @@ -116,6 +118,11 @@ func NewNode(ctx context.Context, opts *NewNodeOptions) (*Node, error) {
return nil, fmt.Errorf("error creating cache: %w", err)
}

roundRepublishCache, err := lru.New(3)
if err != nil {
return nil, fmt.Errorf("error creating cache: %w", err)
}

nodeName := opts.Name
if nodeName == "" {
nodeName = fmt.Sprintf("node-%d", signerIndex)
Expand All @@ -128,18 +135,19 @@ func NewNode(ctx context.Context, opts *NewNodeOptions) (*Node, error) {
dagStore := opts.DagStore

n := &Node{
name: nodeName,
p2pNode: opts.P2PNode,
signKey: opts.SignKey,
notaryGroup: opts.NotaryGroup,
dagStore: dagStore,
hamtStore: hamtStore,
dataStore: dataStore,
signerIndex: signerIndex,
inflight: cache,
mempool: newMempool(),
rootContext: opts.RootActorContext,
logger: logger,
name: nodeName,
p2pNode: opts.P2PNode,
signKey: opts.SignKey,
notaryGroup: opts.NotaryGroup,
dagStore: dagStore,
hamtStore: hamtStore,
dataStore: dataStore,
signerIndex: signerIndex,
inflight: cache,
mempool: newMempool(),
rootContext: opts.RootActorContext,
logger: logger,
roundRepublishCache: roundRepublishCache,
}

err = n.initRoundHolder()
Expand Down Expand Up @@ -243,7 +251,7 @@ func (n *Node) maybeRepublish(ctx context.Context) {

if found && previousRound != nil && previousRound.published {
n.logger.Debugf("republishing round: %d", previousRound.height)
n.publishCompletedRound(ctx, previousRound)
n.republishCompletedRound(ctx, previousRound)
}
}
}
Expand Down Expand Up @@ -451,9 +459,20 @@ func (n *Node) publishCompletedRound(ctx context.Context, round *round) error {

defer func() { round.published = true }()

n.roundRepublishCache.Add(round.height, conf.Data())

return n.pubsub.Publish(n.notaryGroup.ID, conf.Data())
}

func (n *Node) republishCompletedRound(ctx context.Context, round *round) error {
roundConfPayload, ok := n.roundRepublishCache.Peek(round) // Peek acts as FIFO based on insert above
if ok {
n.logger.Debugf("republishing round confirmed to: %s", n.notaryGroup.ID)
return n.pubsub.Publish(n.notaryGroup.ID, roundConfPayload.([]byte))
}
return n.publishCompletedRound(ctx, round)
}

func (n *Node) storeCompletedRound(round *types.RoundWrapper) error {
heightBytes := make([]byte, binary.MaxVarintLen64)
binary.PutUvarint(heightBytes, round.Height())
Expand Down

0 comments on commit 6624aff

Please sign in to comment.