Skip to content

Commit

Permalink
Improve sender parallelism.
Browse files Browse the repository at this point in the history
  • Loading branch information
Jose Luis Lucas authored and iknite committed Feb 19, 2019
1 parent 87dd8de commit d66c35e
Showing 1 changed file with 24 additions and 24 deletions.
48 changes: 24 additions & 24 deletions gossip/sender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,23 @@ func NewSender(a *gossip.Agent, c *Config, s sign.Signer) *Sender {
}
}

func (s Sender) Start(ch chan *protocol.Snapshot) {
ticker := time.NewTicker(1000 * time.Millisecond)

for i := 0; i < 10; i++ {
go s.batcherSender(i, ch, s.quit)
}

for {
select {
case <-ticker.C:
log.Debug("Messages in sender queue: ", len(ch))
case <-s.quit:
return
}
}
}

func (s Sender) batcherSender(id int, ch chan *protocol.Snapshot, quit chan bool) {
batches := []*protocol.BatchSnapshots{}
batch := &protocol.BatchSnapshots{
Expand Down Expand Up @@ -98,7 +115,7 @@ func (s Sender) batcherSender(id int, ch chan *protocol.Snapshot, quit chan bool
resetBatches()
}
for _, b := range batches {
s.sender(*b)
go s.sender(*b)
}
batches = []*protocol.BatchSnapshots{}

Expand All @@ -120,34 +137,17 @@ func (s Sender) sender(batch protocol.BatchSnapshots) {
dst := peer.Node()
log.Infof("Sending batch %+v to node %+v\n", batch, dst.Name)
wg.Add(1)
go func() {
err := s.Agent.Memberlist().SendReliable(dst, msg)
if err != nil {
log.Errorf("Failed send message: %v", err)
}
}()
// go func() {
err := s.Agent.Memberlist().SendReliable(dst, msg)
if err != nil {
log.Errorf("Failed send message: %v", err)
}
// }()
}
wg.Wait()
log.Infof("Sent batch %+v to nodes %+v\n", batch, peers.L)
}

func (s Sender) Start(ch chan *protocol.Snapshot) {
ticker := time.NewTicker(1000 * time.Millisecond)

for i := 0; i < 10; i++ {
go s.batcherSender(i, ch, s.quit)
}

for {
select {
case <-ticker.C:
log.Debug("Messages in sender queue: ", len(ch))
case <-s.quit:
return
}
}
}

func (s Sender) Stop() {
metrics.Qed_sender_instances_count.Dec()
s.quit <- true
Expand Down

0 comments on commit d66c35e

Please sign in to comment.