diff --git a/server/server.go b/server/server.go index e56058939..5fca3cd73 100644 --- a/server/server.go +++ b/server/server.go @@ -34,15 +34,12 @@ import ( "github.com/bbva/qed/api/apihttp" "github.com/bbva/qed/api/mgmthttp" "github.com/bbva/qed/gossip" - "github.com/bbva/qed/gossip/member" - "github.com/bbva/qed/gossip/sender" "github.com/bbva/qed/log" "github.com/bbva/qed/metrics" "github.com/bbva/qed/protocol" "github.com/bbva/qed/raftwal" "github.com/bbva/qed/sign" "github.com/bbva/qed/storage/rocks" - "github.com/bbva/qed/util" ) // Server encapsulates the data and login to start/stop a QED server @@ -57,7 +54,7 @@ type Server struct { metricsServer *metrics.Server prometheusRegistry *prometheus.Registry signer sign.Signer - sender *sender.Sender + sender *Sender agent *gossip.Agent snapshotsCh chan *protocol.Snapshot } @@ -125,26 +122,19 @@ func NewServer(conf *Config) (*Server, error) { // Create gossip agent config := gossip.DefaultConfig() config.BindAddr = conf.GossipAddr - config.Role = member.Server + config.Role = "server" config.NodeName = conf.NodeID - server.agent, err = gossip.NewAgent(config, nil, server.metricsServer) + server.agent, err = gossip.NewAgentFromConfig(config) if err != nil { return nil, err } - if len(conf.GossipJoinAddr) > 0 { - _, err = server.agent.Join(conf.GossipJoinAddr) - if err != nil { - return nil, err - } - } - // TODO: add queue size to config - server.snapshotsCh = make(chan *protocol.Snapshot, 2<<16) + server.snapshotsCh = make(chan *protocol.Snapshot, 1<<16) // Create sender - server.sender = sender.NewSender(server.agent, sender.DefaultConfig(), server.signer) + server.sender = NewSender(server.agent, server.signer, 100, 2, 3) // Create RaftBalloon server.raftBalloon, err = raftwal.NewRaftBalloon(conf.RaftPath, conf.RaftAddr, conf.NodeID, store, server.snapshotsCh) @@ -253,14 +243,9 @@ func (s *Server) Start() error { } } - go func() { - log.Debug(" * Starting QED gossip agent.") - s.sender.Start(s.snapshotsCh) - }() - - util.AwaitTermSignal(s.Stop) + s.sender.Start(s.snapshotsCh) - log.Debug("Stopping server, about to exit...") + s.agent.Start() return nil } @@ -292,8 +277,9 @@ func (s *Server) Stop() error { return err } - log.Debugf("Closing QED sender...") - s.sender.Stop() + /* + log.Debugf("Closing QED sender...") + s.sender.Stop() */ close(s.snapshotsCh) log.Debugf("Stopping QED agent...")