Skip to content

Commit

Permalink
Simplify sender, removing uneeded code. Make it send snaposhots as so…
Browse files Browse the repository at this point in the history
…on as they are enqueued
  • Loading branch information
gdiazlo committed Mar 19, 2019
1 parent a6d896a commit 52b1900
Showing 1 changed file with 8 additions and 12 deletions.
20 changes: 8 additions & 12 deletions gossip/sender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package sender

import (
"fmt"
"sync"
"time"

"github.com/bbva/qed/gossip"
Expand All @@ -37,11 +36,12 @@ type Sender struct {
}

type Config struct {
BatchSize uint
BatchSize int
BatchInterval time.Duration
NumSenders int
TTL int
EachN int
SendTimer time.Duration
}

func DefaultConfig() *Config {
Expand Down Expand Up @@ -70,6 +70,7 @@ func NewSender(a *gossip.Agent, c *Config, s sign.Signer) *Sender {
// to finish
func (s Sender) Start(ch chan *protocol.Snapshot) {
for i := 0; i < s.config.NumSenders; i++ {
log.Debugf("starting sender %d", i)
go s.batcherSender(i, ch, s.quit)
}
<-s.quit
Expand Down Expand Up @@ -102,17 +103,15 @@ func (s Sender) batcherSender(id int, ch chan *protocol.Snapshot, quit chan bool
log.Errorf("Failed signing message: %v", err)
}
batch.Snapshots = append(batch.Snapshots, ss)

case b := <- s.out:
go s.sender(b)
case <-time.After(s.config.SendTimer):
// send whatever we have on each tick, do not wait
// to have complete batches
if len(batch.Snapshots) > 0 {
s.agent.ChTimedSend(batch, s.out)
batch = s.newBatch()
}
for b := range s.out {
go s.sender(*b)
}
case <-quit:
return
}
Expand All @@ -123,8 +122,7 @@ func (s Sender) batcherSender(id int, ch chan *protocol.Snapshot, quit chan bool
// network topology.
// Do not retry sending to faulty agents, and pray other
// sender will.
func (s Sender) sender(batch protocol.BatchSnapshots) {
var wg sync.WaitGroup
func (s Sender) sender(batch *protocol.BatchSnapshots) {
msg, _ := batch.Encode()
peers := s.agent.Topology.Each(s.config.EachN, nil)
for _, peer := range peers.L {
Expand All @@ -134,13 +132,11 @@ func (s Sender) sender(batch protocol.BatchSnapshots) {
log.Debugf("Sending batch %+v to node %+v\n", batch, dst.Name)

err := s.agent.Memberlist().SendReliable(dst, msg)
if err == nil {
if err != nil {
log.Infof("Failed send message to %+v because: %v", peer, err)
break
}
}
wg.Wait()
log.Infof("Sent batch %+v to nodes %+v\n", batch, peers.L)
log.Debugf("Sent batch %+v to nodes %+v\n", batch, peers.L)
}

func (s Sender) Stop() {
Expand Down

0 comments on commit 52b1900

Please sign in to comment.