Skip to content

Commit

Permalink
WIP: Integrate gossip agent into QED. Hungs in raftwal tests
Browse files Browse the repository at this point in the history
Co-authored-by: Gabriel Díaz <g.diaz.lopezllave@bbva.com>
  • Loading branch information
Jose Luis Lucas and gdiazlo committed Nov 16, 2018
1 parent d7798c7 commit fc2735b
Show file tree
Hide file tree
Showing 10 changed files with 315 additions and 20 deletions.
2 changes: 1 addition & 1 deletion api/apihttp/apihttp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ func newNodeBench(b *testing.B, id int) (*raftwal.RaftBalloon, func()) {

raftPath := fmt.Sprintf("/var/tmp/raft-test/node%d/raft", id)
os.MkdirAll(raftPath, os.FileMode(0755))
r, err := raftwal.NewRaftBalloon(raftPath, ":8301", fmt.Sprintf("%d", id), badger)
r, err := raftwal.NewRaftBalloon(raftPath, ":8301", fmt.Sprintf("%d", id), badger, make(chan *protocol.Snapshot))
assert.NoError(b, err)

return r, func() {
Expand Down
2 changes: 1 addition & 1 deletion gossip/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ const DefaultBindPort int = 7946
// DefaultConfig contains the defaults for configurations.
func DefaultConfig() *Config {
return &Config{
BindAddr: "0.0.0.0",
BindAddr: "0.0.0.0:12345",
AdvertiseAddr: "",
LeaveOnTerm: true,
EnableCompression: false,
Expand Down
16 changes: 16 additions & 0 deletions gossip/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ type Member struct {
Port uint16
Role NodeType
Status MemberStatus
Node *memberlist.Node
}

// MemberStatus is the state that a member is in.
Expand Down Expand Up @@ -307,6 +308,7 @@ func (n *Node) getMember(peer *memberlist.Node) *Member {
Addr: net.IP(peer.Addr),
Port: peer.Port,
Role: meta.Role,
Node: peer,
}
}

Expand Down Expand Up @@ -347,3 +349,17 @@ func (n *Node) decodeMetadata(buf []byte) (*NodeMeta, error) {
}
return meta, nil
}

func (n *Node) GetPeers(numNodes int, nodeType NodeType) []*Member {
fullList := n.topology.Get(nodeType)
if len(fullList) <= numNodes {
return fullList
}

var filteredList []*Member
for i := 0; i < numNodes; i++ {
filteredList = append(filteredList, fullList[i])
}

return filteredList
}
114 changes: 114 additions & 0 deletions gossip/sender/sender.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
Copyright 2018 Banco Bilbao Vizcaya Argentaria, S.A.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package sender

import (
"bytes"
"fmt"
"time"

"github.com/bbva/qed/gossip"
"github.com/bbva/qed/log"
"github.com/bbva/qed/protocol"
"github.com/bbva/qed/sign"
"github.com/hashicorp/go-msgpack/codec"
)

type Config struct {
BatchSize uint
BatchInterval time.Duration
TTL int
}

func DefaultConfig() *Config {
return &Config{
100,
1 * time.Second,
2,
}
}

func Start(n *gossip.Node, ch chan *protocol.Snapshot) {
ticker := time.NewTicker(1 * time.Second)

for {
select {
case <-ticker.C:
msg, _ := encode(getBatch(ch))

peers := n.GetPeers(1, gossip.AuditorType)
peers = append(peers, n.GetPeers(1, gossip.MonitorType)...)
peers = append(peers, n.GetPeers(1, gossip.PublisherType)...)

for _, peer := range peers {
err := n.Memberlist().SendReliable(peer.Node, msg)
if err != nil {
log.Errorf("Failed send message: %v", err)
}
}
// TODO: Implement graceful shutdown.
}
}
}

func encode(msg protocol.BatchSnapshots) ([]byte, error) {
var buf bytes.Buffer
encoder := codec.NewEncoder(&buf, &codec.MsgpackHandle{})
if err := encoder.Encode(msg); err != nil {
log.Errorf("Failed to encode message: %v", err)
return nil, err
}
return buf.Bytes(), nil
}

func getBatch(ch chan *protocol.Snapshot) protocol.BatchSnapshots {

var snapshot *protocol.Snapshot
var batch protocol.BatchSnapshots
var batchSize int = 100
var counter int = 0
batch.Snapshots = make([]*protocol.SignedSnapshot, 0)
batch.TTL = 3

for {
select {
case snapshot = <-ch:
counter++
default:
return batch
}

ss, err := doSign(sign.NewEd25519Signer(), snapshot)
if err != nil {
log.Errorf("Failed signing message: %v", err)
}
batch.Snapshots = append(batch.Snapshots, ss)

if counter == batchSize {
return batch
}

}

}

func doSign(signer sign.Signer, snapshot *protocol.Snapshot) (*protocol.SignedSnapshot, error) {

signature, err := signer.Sign([]byte(fmt.Sprintf("%v", snapshot)))
if err != nil {
fmt.Println("Publisher: error signing commitment")
return nil, err
}
return &protocol.SignedSnapshot{snapshot, signature}, nil
}
117 changes: 117 additions & 0 deletions protocol/protocol.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
Copyright 2018 Banco Bilbao Vizcaya Argentaria, S.A.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package protocol

import (
"github.com/bbva/qed/balloon"
"github.com/bbva/qed/balloon/history"
"github.com/bbva/qed/balloon/hyper"
"github.com/bbva/qed/balloon/visitor"
"github.com/bbva/qed/hashing"
"github.com/bbva/qed/util"
)

// Event is the public struct that Add handler function uses to
// parse the post params.
type Event struct {
Event []byte
}

// MembershipQuery is the public struct that apihttp.Membership
// Handler uses to parse the post params.
type MembershipQuery struct {
Key []byte
Version uint64
}

// Snapshot is the public struct that apihttp.Add Handler call returns.
type Snapshot struct {
HistoryDigest hashing.Digest
HyperDigest hashing.Digest
Version uint64
EventDigest hashing.Digest
}

type SignedSnapshot struct {
Snapshot *Snapshot
Signature []byte
}

type BatchSnapshots struct {
Snapshots []*SignedSnapshot
TTL int
}

type MembershipResult struct {
Exists bool
Hyper visitor.AuditPath
History visitor.AuditPath
CurrentVersion uint64
QueryVersion uint64
ActualVersion uint64
KeyDigest hashing.Digest
Key []byte
}

type IncrementalRequest struct {
Start uint64
End uint64
}

type IncrementalResponse struct {
Start uint64
End uint64
AuditPath visitor.AuditPath
}

// ToMembershipProof translates internal api balloon.MembershipProof to the
// public struct protocol.MembershipResult.
func ToMembershipResult(key []byte, mp *balloon.MembershipProof) *MembershipResult {
return &MembershipResult{
mp.Exists,
mp.HyperProof.AuditPath(),
mp.HistoryProof.AuditPath(),
mp.CurrentVersion,
mp.QueryVersion,
mp.ActualVersion,
mp.KeyDigest,
key,
}
}

// ToBaloonProof translate public protocol.MembershipResult:w to internal
// balloon.Proof.
func ToBalloonProof(id []byte, mr *MembershipResult, hasherF func() hashing.Hasher) *balloon.MembershipProof {

historyProof := history.NewMembershipProof(mr.ActualVersion, mr.QueryVersion, mr.History, hasherF())
hyperProof := hyper.NewQueryProof(mr.KeyDigest, util.Uint64AsBytes(mr.ActualVersion), mr.Hyper, hasherF())

return balloon.NewMembershipProof(mr.Exists, hyperProof, historyProof, mr.CurrentVersion, mr.ActualVersion, mr.QueryVersion, mr.KeyDigest, hasherF())

}

func ToIncrementalResponse(proof *balloon.IncrementalProof) *IncrementalResponse {
return &IncrementalResponse{
proof.Start,
proof.End,
proof.AuditPath,
}
}

func ToIncrementalProof(ir *IncrementalResponse, hasher hashing.Hasher) *balloon.IncrementalProof {
return balloon.NewIncrementalProof(ir.Start, ir.End, ir.AuditPath, hasher)
}
31 changes: 24 additions & 7 deletions raftwal/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"io"
"sync"

"github.com/bbva/qed/protocol"

"github.com/bbva/qed/balloon"
"github.com/bbva/qed/hashing"
"github.com/bbva/qed/log"
Expand All @@ -47,6 +49,8 @@ type BalloonFSM struct {
balloon *balloon.Balloon
state *fsmState

agentsQueue chan *protocol.Snapshot

restoreMu sync.RWMutex // Restore needs exclusive access to database.
}

Expand All @@ -65,9 +69,9 @@ func loadState(s storage.ManagedStore) (*fsmState, error) {
return &state, err
}

func NewBalloonFSM(store storage.ManagedStore, hasherF func() hashing.Hasher) (*BalloonFSM, error) {
func NewBalloonFSM(store storage.ManagedStore, hasherF func() hashing.Hasher, agentsQueue chan *protocol.Snapshot) (*BalloonFSM, error) {

balloon, err := balloon.NewBalloon(store, hasherF)
b, err := balloon.NewBalloon(store, hasherF)
if err != nil {
return nil, err
}
Expand All @@ -78,10 +82,11 @@ func NewBalloonFSM(store storage.ManagedStore, hasherF func() hashing.Hasher) (*
}

return &BalloonFSM{
hasherF: hasherF,
store: store,
balloon: balloon,
state: state,
hasherF: hasherF,
store: store,
balloon: b,
state: state,
agentsQueue: agentsQueue,
}, nil
}

Expand Down Expand Up @@ -181,8 +186,20 @@ func (fsm *BalloonFSM) applyAdd(event []byte, state *fsmState) *fsmAddResponse {
}

mutations = append(mutations, storage.NewMutation(storage.FSMStatePrefix, []byte{0xab}, stateBuff.Bytes()))
fsm.store.Mutate(mutations)
err = fsm.store.Mutate(mutations)
if err != nil {
return &fsmAddResponse{error: err}
}
fsm.state = state

//Send snapshot to gossip agents
fsm.agentsQueue <- &protocol.Snapshot{
HistoryDigest: commitment.HistoryDigest,
HyperDigest: commitment.HyperDigest,
Version: commitment.Version,
EventDigest: event,
}

return &fsmAddResponse{commitment: commitment}
}

Expand Down
Loading

0 comments on commit fc2735b

Please sign in to comment.