From fc2735bf183cc99465b6e60dec212538f2829d1c Mon Sep 17 00:00:00 2001 From: Jose Luis Lucas Date: Fri, 16 Nov 2018 14:00:23 +0100 Subject: [PATCH] WIP: Integrate gossip agent into QED. Hungs in raftwal tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Gabriel Díaz --- api/apihttp/apihttp_test.go | 2 +- gossip/config.go | 2 +- gossip/node.go | 16 +++++ gossip/sender/sender.go | 114 +++++++++++++++++++++++++++++++++++ protocol/protocol.go | 117 ++++++++++++++++++++++++++++++++++++ raftwal/fsm.go | 31 +++++++--- raftwal/fsm_test.go | 11 ++-- raftwal/raft.go | 5 +- raftwal/raft_test.go | 6 +- server/server.go | 31 +++++++++- 10 files changed, 315 insertions(+), 20 deletions(-) create mode 100644 gossip/sender/sender.go create mode 100644 protocol/protocol.go diff --git a/api/apihttp/apihttp_test.go b/api/apihttp/apihttp_test.go index 625d2957e..cfcfd8d17 100644 --- a/api/apihttp/apihttp_test.go +++ b/api/apihttp/apihttp_test.go @@ -314,7 +314,7 @@ func newNodeBench(b *testing.B, id int) (*raftwal.RaftBalloon, func()) { raftPath := fmt.Sprintf("/var/tmp/raft-test/node%d/raft", id) os.MkdirAll(raftPath, os.FileMode(0755)) - r, err := raftwal.NewRaftBalloon(raftPath, ":8301", fmt.Sprintf("%d", id), badger) + r, err := raftwal.NewRaftBalloon(raftPath, ":8301", fmt.Sprintf("%d", id), badger, make(chan *protocol.Snapshot)) assert.NoError(b, err) return r, func() { diff --git a/gossip/config.go b/gossip/config.go index ae4e8bc20..941ae578e 100644 --- a/gossip/config.go +++ b/gossip/config.go @@ -63,7 +63,7 @@ const DefaultBindPort int = 7946 // DefaultConfig contains the defaults for configurations. func DefaultConfig() *Config { return &Config{ - BindAddr: "0.0.0.0", + BindAddr: "0.0.0.0:12345", AdvertiseAddr: "", LeaveOnTerm: true, EnableCompression: false, diff --git a/gossip/node.go b/gossip/node.go index 046fb63e4..6d7a1eea4 100644 --- a/gossip/node.go +++ b/gossip/node.go @@ -112,6 +112,7 @@ type Member struct { Port uint16 Role NodeType Status MemberStatus + Node *memberlist.Node } // MemberStatus is the state that a member is in. @@ -307,6 +308,7 @@ func (n *Node) getMember(peer *memberlist.Node) *Member { Addr: net.IP(peer.Addr), Port: peer.Port, Role: meta.Role, + Node: peer, } } @@ -347,3 +349,17 @@ func (n *Node) decodeMetadata(buf []byte) (*NodeMeta, error) { } return meta, nil } + +func (n *Node) GetPeers(numNodes int, nodeType NodeType) []*Member { + fullList := n.topology.Get(nodeType) + if len(fullList) <= numNodes { + return fullList + } + + var filteredList []*Member + for i := 0; i < numNodes; i++ { + filteredList = append(filteredList, fullList[i]) + } + + return filteredList +} diff --git a/gossip/sender/sender.go b/gossip/sender/sender.go new file mode 100644 index 000000000..ab3aca02e --- /dev/null +++ b/gossip/sender/sender.go @@ -0,0 +1,114 @@ +/* + Copyright 2018 Banco Bilbao Vizcaya Argentaria, S.A. + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package sender + +import ( + "bytes" + "fmt" + "time" + + "github.com/bbva/qed/gossip" + "github.com/bbva/qed/log" + "github.com/bbva/qed/protocol" + "github.com/bbva/qed/sign" + "github.com/hashicorp/go-msgpack/codec" +) + +type Config struct { + BatchSize uint + BatchInterval time.Duration + TTL int +} + +func DefaultConfig() *Config { + return &Config{ + 100, + 1 * time.Second, + 2, + } +} + +func Start(n *gossip.Node, ch chan *protocol.Snapshot) { + ticker := time.NewTicker(1 * time.Second) + + for { + select { + case <-ticker.C: + msg, _ := encode(getBatch(ch)) + + peers := n.GetPeers(1, gossip.AuditorType) + peers = append(peers, n.GetPeers(1, gossip.MonitorType)...) + peers = append(peers, n.GetPeers(1, gossip.PublisherType)...) + + for _, peer := range peers { + err := n.Memberlist().SendReliable(peer.Node, msg) + if err != nil { + log.Errorf("Failed send message: %v", err) + } + } + // TODO: Implement graceful shutdown. + } + } +} + +func encode(msg protocol.BatchSnapshots) ([]byte, error) { + var buf bytes.Buffer + encoder := codec.NewEncoder(&buf, &codec.MsgpackHandle{}) + if err := encoder.Encode(msg); err != nil { + log.Errorf("Failed to encode message: %v", err) + return nil, err + } + return buf.Bytes(), nil +} + +func getBatch(ch chan *protocol.Snapshot) protocol.BatchSnapshots { + + var snapshot *protocol.Snapshot + var batch protocol.BatchSnapshots + var batchSize int = 100 + var counter int = 0 + batch.Snapshots = make([]*protocol.SignedSnapshot, 0) + batch.TTL = 3 + + for { + select { + case snapshot = <-ch: + counter++ + default: + return batch + } + + ss, err := doSign(sign.NewEd25519Signer(), snapshot) + if err != nil { + log.Errorf("Failed signing message: %v", err) + } + batch.Snapshots = append(batch.Snapshots, ss) + + if counter == batchSize { + return batch + } + + } + +} + +func doSign(signer sign.Signer, snapshot *protocol.Snapshot) (*protocol.SignedSnapshot, error) { + + signature, err := signer.Sign([]byte(fmt.Sprintf("%v", snapshot))) + if err != nil { + fmt.Println("Publisher: error signing commitment") + return nil, err + } + return &protocol.SignedSnapshot{snapshot, signature}, nil +} diff --git a/protocol/protocol.go b/protocol/protocol.go new file mode 100644 index 000000000..6820f2e04 --- /dev/null +++ b/protocol/protocol.go @@ -0,0 +1,117 @@ +/* + Copyright 2018 Banco Bilbao Vizcaya Argentaria, S.A. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package protocol + +import ( + "github.com/bbva/qed/balloon" + "github.com/bbva/qed/balloon/history" + "github.com/bbva/qed/balloon/hyper" + "github.com/bbva/qed/balloon/visitor" + "github.com/bbva/qed/hashing" + "github.com/bbva/qed/util" +) + +// Event is the public struct that Add handler function uses to +// parse the post params. +type Event struct { + Event []byte +} + +// MembershipQuery is the public struct that apihttp.Membership +// Handler uses to parse the post params. +type MembershipQuery struct { + Key []byte + Version uint64 +} + +// Snapshot is the public struct that apihttp.Add Handler call returns. +type Snapshot struct { + HistoryDigest hashing.Digest + HyperDigest hashing.Digest + Version uint64 + EventDigest hashing.Digest +} + +type SignedSnapshot struct { + Snapshot *Snapshot + Signature []byte +} + +type BatchSnapshots struct { + Snapshots []*SignedSnapshot + TTL int +} + +type MembershipResult struct { + Exists bool + Hyper visitor.AuditPath + History visitor.AuditPath + CurrentVersion uint64 + QueryVersion uint64 + ActualVersion uint64 + KeyDigest hashing.Digest + Key []byte +} + +type IncrementalRequest struct { + Start uint64 + End uint64 +} + +type IncrementalResponse struct { + Start uint64 + End uint64 + AuditPath visitor.AuditPath +} + +// ToMembershipProof translates internal api balloon.MembershipProof to the +// public struct protocol.MembershipResult. +func ToMembershipResult(key []byte, mp *balloon.MembershipProof) *MembershipResult { + return &MembershipResult{ + mp.Exists, + mp.HyperProof.AuditPath(), + mp.HistoryProof.AuditPath(), + mp.CurrentVersion, + mp.QueryVersion, + mp.ActualVersion, + mp.KeyDigest, + key, + } +} + +// ToBaloonProof translate public protocol.MembershipResult:w to internal +// balloon.Proof. +func ToBalloonProof(id []byte, mr *MembershipResult, hasherF func() hashing.Hasher) *balloon.MembershipProof { + + historyProof := history.NewMembershipProof(mr.ActualVersion, mr.QueryVersion, mr.History, hasherF()) + hyperProof := hyper.NewQueryProof(mr.KeyDigest, util.Uint64AsBytes(mr.ActualVersion), mr.Hyper, hasherF()) + + return balloon.NewMembershipProof(mr.Exists, hyperProof, historyProof, mr.CurrentVersion, mr.ActualVersion, mr.QueryVersion, mr.KeyDigest, hasherF()) + +} + +func ToIncrementalResponse(proof *balloon.IncrementalProof) *IncrementalResponse { + return &IncrementalResponse{ + proof.Start, + proof.End, + proof.AuditPath, + } +} + +func ToIncrementalProof(ir *IncrementalResponse, hasher hashing.Hasher) *balloon.IncrementalProof { + return balloon.NewIncrementalProof(ir.Start, ir.End, ir.AuditPath, hasher) +} diff --git a/raftwal/fsm.go b/raftwal/fsm.go index f144984a7..97730da2e 100644 --- a/raftwal/fsm.go +++ b/raftwal/fsm.go @@ -22,6 +22,8 @@ import ( "io" "sync" + "github.com/bbva/qed/protocol" + "github.com/bbva/qed/balloon" "github.com/bbva/qed/hashing" "github.com/bbva/qed/log" @@ -47,6 +49,8 @@ type BalloonFSM struct { balloon *balloon.Balloon state *fsmState + agentsQueue chan *protocol.Snapshot + restoreMu sync.RWMutex // Restore needs exclusive access to database. } @@ -65,9 +69,9 @@ func loadState(s storage.ManagedStore) (*fsmState, error) { return &state, err } -func NewBalloonFSM(store storage.ManagedStore, hasherF func() hashing.Hasher) (*BalloonFSM, error) { +func NewBalloonFSM(store storage.ManagedStore, hasherF func() hashing.Hasher, agentsQueue chan *protocol.Snapshot) (*BalloonFSM, error) { - balloon, err := balloon.NewBalloon(store, hasherF) + b, err := balloon.NewBalloon(store, hasherF) if err != nil { return nil, err } @@ -78,10 +82,11 @@ func NewBalloonFSM(store storage.ManagedStore, hasherF func() hashing.Hasher) (* } return &BalloonFSM{ - hasherF: hasherF, - store: store, - balloon: balloon, - state: state, + hasherF: hasherF, + store: store, + balloon: b, + state: state, + agentsQueue: agentsQueue, }, nil } @@ -181,8 +186,20 @@ func (fsm *BalloonFSM) applyAdd(event []byte, state *fsmState) *fsmAddResponse { } mutations = append(mutations, storage.NewMutation(storage.FSMStatePrefix, []byte{0xab}, stateBuff.Bytes())) - fsm.store.Mutate(mutations) + err = fsm.store.Mutate(mutations) + if err != nil { + return &fsmAddResponse{error: err} + } fsm.state = state + + //Send snapshot to gossip agents + fsm.agentsQueue <- &protocol.Snapshot{ + HistoryDigest: commitment.HistoryDigest, + HyperDigest: commitment.HyperDigest, + Version: commitment.Version, + EventDigest: event, + } + return &fsmAddResponse{commitment: commitment} } diff --git a/raftwal/fsm_test.go b/raftwal/fsm_test.go index 9facb76f6..365a1def7 100644 --- a/raftwal/fsm_test.go +++ b/raftwal/fsm_test.go @@ -8,6 +8,7 @@ import ( assert "github.com/stretchr/testify/require" "github.com/bbva/qed/hashing" + "github.com/bbva/qed/protocol" "github.com/bbva/qed/raftwal/commands" storage_utils "github.com/bbva/qed/testutils/storage" ) @@ -16,7 +17,7 @@ func TestApply(t *testing.T) { store, closeF := storage_utils.OpenBadgerStore(t, "/var/tmp/balloon.test.db") defer closeF() - fsm, err := NewBalloonFSM(store, hashing.NewSha256Hasher) + fsm, err := NewBalloonFSM(store, hashing.NewSha256Hasher, make(chan *protocol.Snapshot, 100)) assert.NoError(t, err) // happy path @@ -41,7 +42,7 @@ func TestSnapshot(t *testing.T) { store, closeF := storage_utils.OpenBadgerStore(t, "/var/tmp/balloon.test.db") defer closeF() - fsm, err := NewBalloonFSM(store, hashing.NewSha256Hasher) + fsm, err := NewBalloonFSM(store, hashing.NewSha256Hasher, make(chan *protocol.Snapshot, 100)) assert.NoError(t, err) fsm.Apply(newRaftLog(0, 0)) @@ -65,7 +66,7 @@ func TestRestore(t *testing.T) { store, closeF := storage_utils.OpenBadgerStore(t, "/var/tmp/balloon.test.db") defer closeF() - fsm, err := NewBalloonFSM(store, hashing.NewSha256Hasher) + fsm, err := NewBalloonFSM(store, hashing.NewSha256Hasher, make(chan *protocol.Snapshot, 100)) assert.NoError(t, err) assert.NoError(t, fsm.Restore(&fakeRC{})) @@ -75,7 +76,7 @@ func TestAddAndRestoreSnapshot(t *testing.T) { store, closeF := storage_utils.OpenBadgerStore(t, "/var/tmp/balloon.test.db") defer closeF() - fsm, err := NewBalloonFSM(store, hashing.NewSha256Hasher) + fsm, err := NewBalloonFSM(store, hashing.NewSha256Hasher, make(chan *protocol.Snapshot, 100)) assert.NoError(t, err) fsm.Apply(newRaftLog(0, 0)) @@ -106,7 +107,7 @@ func TestAddAndRestoreSnapshot(t *testing.T) { defer close2F() // New FSMStore - fsm2, err := NewBalloonFSM(store2, hashing.NewSha256Hasher) + fsm2, err := NewBalloonFSM(store2, hashing.NewSha256Hasher, make(chan *protocol.Snapshot, 100)) assert.NoError(t, err) fsm2.Restore(r) diff --git a/raftwal/raft.go b/raftwal/raft.go index 3986fd218..db006f5e0 100644 --- a/raftwal/raft.go +++ b/raftwal/raft.go @@ -26,6 +26,7 @@ import ( "github.com/bbva/qed/balloon" "github.com/bbva/qed/hashing" "github.com/bbva/qed/log" + "github.com/bbva/qed/protocol" "github.com/bbva/qed/raftwal/commands" "github.com/bbva/qed/storage" raftbadger "github.com/bbva/raft-badger" @@ -90,7 +91,7 @@ type RaftBalloon struct { } // New returns a new RaftBalloon. -func NewRaftBalloon(path, addr, id string, store storage.ManagedStore) (*RaftBalloon, error) { +func NewRaftBalloon(path, addr, id string, store storage.ManagedStore, agentsQueue chan *protocol.Snapshot) (*RaftBalloon, error) { // Create the log store and stable store badgerLogStore, err := raftbadger.New(raftbadger.Options{Path: path + "/logs", NoSync: true, ValueLogGC: true}) // raftbadger.NewBadgerStore(path + "/logs") @@ -108,7 +109,7 @@ func NewRaftBalloon(path, addr, id string, store storage.ManagedStore) (*RaftBal } // Instantiate balloon FSM - fsm, err := NewBalloonFSM(store, hashing.NewSha256Hasher) + fsm, err := NewBalloonFSM(store, hashing.NewSha256Hasher, agentsQueue) if err != nil { return nil, fmt.Errorf("new balloon fsm: %s", err) } diff --git a/raftwal/raft_test.go b/raftwal/raft_test.go index c69bb6241..691f23181 100644 --- a/raftwal/raft_test.go +++ b/raftwal/raft_test.go @@ -27,6 +27,8 @@ import ( "testing" "time" + "github.com/bbva/qed/protocol" + "github.com/bbva/qed/log" "github.com/bbva/qed/storage/badger" utilrand "github.com/bbva/qed/testutils/rand" @@ -54,7 +56,7 @@ func newNode(t *testing.T, id int) (*RaftBalloon, func()) { raftPath := fmt.Sprintf("/var/tmp/raft-test/node%d/raft", id) os.MkdirAll(raftPath, os.FileMode(0755)) - r, err := NewRaftBalloon(raftPath, raftAddr(id), fmt.Sprintf("%d", id), badger) + r, err := NewRaftBalloon(raftPath, raftAddr(id), fmt.Sprintf("%d", id), badger, make(chan *protocol.Snapshot, 100)) require.NoError(t, err) return r, func() { @@ -359,7 +361,7 @@ func newNodeBench(b *testing.B, id int) (*RaftBalloon, func()) { raftPath := fmt.Sprintf("/var/tmp/raft-test/node%d/raft", id) os.MkdirAll(raftPath, os.FileMode(0755)) - r, err := NewRaftBalloon(raftPath, raftAddr(id), fmt.Sprintf("%d", id), badger) + r, err := NewRaftBalloon(raftPath, raftAddr(id), fmt.Sprintf("%d", id), badger, make(chan *protocol.Snapshot, 100)) require.NoError(b, err) return r, func() { diff --git a/server/server.go b/server/server.go index 0d46c8671..4343ebb05 100644 --- a/server/server.go +++ b/server/server.go @@ -32,8 +32,11 @@ import ( "github.com/bbva/qed/api/apihttp" "github.com/bbva/qed/api/mgmthttp" "github.com/bbva/qed/api/tampering" + "github.com/bbva/qed/gossip" + "github.com/bbva/qed/gossip/sender" "github.com/bbva/qed/hashing" "github.com/bbva/qed/log" + "github.com/bbva/qed/protocol" "github.com/bbva/qed/raftwal" "github.com/bbva/qed/sign" "github.com/bbva/qed/storage" @@ -66,6 +69,8 @@ type Server struct { tamperingServer *http.Server profilingServer *http.Server signer sign.Signer + agent *gossip.Node + agentsQueue chan *protocol.Snapshot } // NewServer synthesizes a new Server based on the parameters it receives. @@ -122,8 +127,17 @@ func NewServer( return nil, err } + // Create gossip agent + server.agent, err = gossip.Create(gossip.DefaultConfig(), gossip.NewFakeDelegate()) + if err != nil { + return nil, err + } + + // TODO: add queue size to config + server.agentsQueue = make(chan *protocol.Snapshot, 10000) + // Create RaftBalloon - server.raftBalloon, err = raftwal.NewRaftBalloon(raftPath, raftAddr, nodeID, store) + server.raftBalloon, err = raftwal.NewRaftBalloon(raftPath, raftAddr, nodeID, store, server.agentsQueue) if err != nil { return nil, err } @@ -142,7 +156,6 @@ func NewServer( } return server, nil - } func join(joinAddr, raftAddr, nodeID string) error { @@ -210,6 +223,11 @@ func (s *Server) Start() error { } } + go func() { + log.Debug(" * Starting QED agent.") + sender.Start(s.agent, s.agentsQueue) + }() + awaitTermSignal(s.Stop) log.Debug("Stopping server, about to exit...") @@ -251,6 +269,15 @@ func (s *Server) Stop() { if err != nil { log.Error(err) } + + log.Debugf("Closing QED agent queue...") + close(s.agentsQueue) + + log.Debugf("Stopping QED agent...") + if err := s.agent.Shutdown(); err != nil { + log.Error(err) + } + log.Debugf("Done. Exiting...\n") }