From 3a98650951f39a5e67682afe8e79bc1489c97926 Mon Sep 17 00:00:00 2001 From: Brandon Westcott Date: Mon, 28 Sep 2020 16:43:39 -0400 Subject: [PATCH] add mini cache for resending current round to clients --- signer/gossip/node.go | 45 ++++++++++++++++++++++++++++++------------- 1 file changed, 32 insertions(+), 13 deletions(-) diff --git a/signer/gossip/node.go b/signer/gossip/node.go index 39659d85..1c4ce86c 100644 --- a/signer/gossip/node.go +++ b/signer/gossip/node.go @@ -75,6 +75,8 @@ type Node struct { startCtx context.Context closer io.Closer + + roundRepublishCache *lru.Cache } type NewNodeOptions struct { @@ -116,6 +118,11 @@ func NewNode(ctx context.Context, opts *NewNodeOptions) (*Node, error) { return nil, fmt.Errorf("error creating cache: %w", err) } + roundRepublishCache, err := lru.New(3) + if err != nil { + return nil, fmt.Errorf("error creating cache: %w", err) + } + nodeName := opts.Name if nodeName == "" { nodeName = fmt.Sprintf("node-%d", signerIndex) @@ -128,18 +135,19 @@ func NewNode(ctx context.Context, opts *NewNodeOptions) (*Node, error) { dagStore := opts.DagStore n := &Node{ - name: nodeName, - p2pNode: opts.P2PNode, - signKey: opts.SignKey, - notaryGroup: opts.NotaryGroup, - dagStore: dagStore, - hamtStore: hamtStore, - dataStore: dataStore, - signerIndex: signerIndex, - inflight: cache, - mempool: newMempool(), - rootContext: opts.RootActorContext, - logger: logger, + name: nodeName, + p2pNode: opts.P2PNode, + signKey: opts.SignKey, + notaryGroup: opts.NotaryGroup, + dagStore: dagStore, + hamtStore: hamtStore, + dataStore: dataStore, + signerIndex: signerIndex, + inflight: cache, + mempool: newMempool(), + rootContext: opts.RootActorContext, + logger: logger, + roundRepublishCache: roundRepublishCache, } err = n.initRoundHolder() @@ -243,7 +251,7 @@ func (n *Node) maybeRepublish(ctx context.Context) { if found && previousRound != nil && previousRound.published { n.logger.Debugf("republishing round: %d", previousRound.height) - n.publishCompletedRound(ctx, previousRound) + n.republishCompletedRound(ctx, previousRound) } } } @@ -451,9 +459,20 @@ func (n *Node) publishCompletedRound(ctx context.Context, round *round) error { defer func() { round.published = true }() + n.roundRepublishCache.Add(round.height, conf.Data()) + return n.pubsub.Publish(n.notaryGroup.ID, conf.Data()) } +func (n *Node) republishCompletedRound(ctx context.Context, round *round) error { + roundConfPayload, ok := n.roundRepublishCache.Peek(round) // Peek acts as FIFO based on insert above + if ok { + n.logger.Debugf("republishing round confirmed to: %s", n.notaryGroup.ID) + return n.pubsub.Publish(n.notaryGroup.ID, roundConfPayload.([]byte)) + } + return n.publishCompletedRound(ctx, round) +} + func (n *Node) storeCompletedRound(round *types.RoundWrapper) error { heightBytes := make([]byte, binary.MaxVarintLen64) binary.PutUvarint(heightBytes, round.Height())