Skip to content

Commit

Permalink
First draft of Whisper messages relaying
Browse files Browse the repository at this point in the history
  • Loading branch information
obscuren committed Dec 8, 2014
1 parent f06543f commit ebe2d9d
Show file tree
Hide file tree
Showing 7 changed files with 472 additions and 0 deletions.
96 changes: 96 additions & 0 deletions whisper/envelope.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package whisper

import (
"bytes"
"encoding/binary"
"io"
"time"

"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethutil"
"github.com/ethereum/go-ethereum/rlp"
)

const (
DefaultTtl = 50 * time.Second
)

type Envelope struct {
Expiry int32 // Whisper protocol specifies int32, really should be int64

This comment has been minimized.

Copy link
@fjl

fjl Dec 8, 2014

Contributor

I'm thinking about removing support for signed integers in rlp. These would then need to become uint32. Any objections?

This comment has been minimized.

Copy link
@obscuren

obscuren Dec 8, 2014

Author Contributor

None

Ttl int32 // ^^^^^^
Topics [][]byte
Data []byte
Nonce uint32

hash Hash
}

func NewEnvelopeFromReader(reader io.Reader) (*Envelope, error) {
var envelope Envelope

buf := new(bytes.Buffer)
buf.ReadFrom(reader)

h := H(crypto.Sha3(buf.Bytes()))
if err := rlp.Decode(buf, &envelope); err != nil {
return nil, err
}

envelope.hash = h

return &envelope, nil
}

func (self *Envelope) Hash() Hash {
if self.hash == EmptyHash {
self.hash = H(crypto.Sha3(ethutil.Encode(self)))
}

return self.hash
}

func NewEnvelope(ttl time.Duration, topics [][]byte, data *Message) *Envelope {
exp := time.Now().Add(ttl)

return &Envelope{int32(exp.Unix()), int32(ttl.Seconds()), topics, data.Bytes(), 0, Hash{}}
}

func (self *Envelope) Seal() {
self.proveWork(DefaultTtl)
}

func (self *Envelope) proveWork(dura time.Duration) {
var bestBit int
d := make([]byte, 64)
copy(d[:32], ethutil.Encode(self.withoutNonce()))

then := time.Now().Add(dura).UnixNano()
for n := uint32(0); time.Now().UnixNano() < then; {
for i := 0; i < 1024; i++ {
binary.BigEndian.PutUint32(d[60:], n)

fbs := ethutil.FirstBitSet(ethutil.BigD(crypto.Sha3(d)))
if fbs > bestBit {
bestBit = fbs
self.Nonce = n
}

n++
}
}
}

func (self *Envelope) valid() bool {
d := make([]byte, 64)
copy(d[:32], ethutil.Encode(self.withoutNonce()))
binary.BigEndian.PutUint32(d[60:], self.Nonce)
return ethutil.FirstBitSet(ethutil.BigD(crypto.Sha3(d))) > 0
}

func (self *Envelope) withoutNonce() interface{} {
return []interface{}{self.Expiry, self.Ttl, ethutil.ByteSliceToInterface(self.Topics), self.Data}
}

func (self *Envelope) RlpData() interface{} {
return []interface{}{self.Expiry, self.Ttl, ethutil.ByteSliceToInterface(self.Topics), self.Data, self.Nonce}
}
46 changes: 46 additions & 0 deletions whisper/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// +build none

package main

import (
"fmt"
"log"
"net"
"os"

"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/whisper"
"github.com/obscuren/secp256k1-go"
)

func main() {
logger.AddLogSystem(logger.NewStdLogSystem(os.Stdout, log.LstdFlags, logger.InfoLevel))

pub, sec := secp256k1.GenerateKeyPair()

whisper := whisper.New(pub, sec)

srv := p2p.Server{
MaxPeers: 10,
Identity: p2p.NewSimpleClientIdentity("whisper-go", "1.0", "", string(pub)),
ListenAddr: ":30303",
NAT: p2p.UPNP(),

Protocols: []p2p.Protocol{whisper.Protocol()},
}
if err := srv.Start(); err != nil {
fmt.Println("could not start server:", err)
os.Exit(1)
}

// add seed peers
seed, err := net.ResolveTCPAddr("tcp", "poc-7.ethdev.com:30300")
if err != nil {
fmt.Println("couldn't resolve:", err)
os.Exit(1)
}
srv.SuggestPeer(seed.IP, seed.Port, nil)

select {}
}
15 changes: 15 additions & 0 deletions whisper/message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package whisper

type Message struct {
Flags byte
Signature []byte
Payload []byte
}

func NewMessage(payload []byte) *Message {
return &Message{Flags: 0, Payload: payload}
}

func (self *Message) Bytes() []byte {
return append([]byte{self.Flags}, append(self.Signature, self.Payload...)...)
}
114 changes: 114 additions & 0 deletions whisper/peer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package whisper

import (
"fmt"
"io/ioutil"
"time"

"github.com/ethereum/go-ethereum/p2p"
"gopkg.in/fatih/set.v0"
)

const (
protocolVersion = 0x02
)

type peer struct {
host *Whisper
peer *p2p.Peer
ws p2p.MsgReadWriter

// XXX Eventually this is going to reach exceptional large space. We need an expiry here
known *set.Set

quit chan struct{}
}

func NewPeer(host *Whisper, p *p2p.Peer, ws p2p.MsgReadWriter) *peer {
return &peer{host, p, ws, set.New(), make(chan struct{})}
}

func (self *peer) init() error {
if err := self.handleStatus(); err != nil {
return err
}

return nil
}

func (self *peer) start() {
go self.update()
}

func (self *peer) update() {
relay := time.NewTicker(300 * time.Millisecond)
out:
for {
select {
case <-relay.C:
err := self.broadcast(self.host.envelopes())
if err != nil {
self.peer.Infoln(err)
break out
}

case <-self.quit:
break out
}
}
}

func (self *peer) broadcast(envelopes []*Envelope) error {
envs := make([]interface{}, len(envelopes))
i := 0
for _, envelope := range envelopes {
if !self.known.Has(envelope.Hash()) {
envs[i] = envelope
self.known.Add(envelope.Hash())
i++
}
}

msg := p2p.NewMsg(envelopesMsg, envs[:i]...)
if err := self.ws.WriteMsg(msg); err != nil {
return err
}

return nil
}

func (self *peer) handleStatus() error {
ws := self.ws

if err := ws.WriteMsg(self.statusMsg()); err != nil {
return err
}

msg, err := ws.ReadMsg()
if err != nil {
return err
}

if msg.Code != statusMsg {
return fmt.Errorf("peer send %x before status msg", msg.Code)
}

data, err := ioutil.ReadAll(msg.Payload)
if err != nil {
return err
}

if len(data) == 0 {
return fmt.Errorf("malformed status. data len = 0")
}

if pv := data[0]; pv != protocolVersion {
return fmt.Errorf("protocol version mismatch %d != %d", pv, protocolVersion)
}

return nil
}

func (self *peer) statusMsg() p2p.Msg {
return p2p.NewMsg(statusMsg, protocolVersion)
}
25 changes: 25 additions & 0 deletions whisper/sort.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package whisper

import "sort"

type sortedKeys struct {
k []int32
}

func (self *sortedKeys) Len() int { return len(self.k) }
func (self *sortedKeys) Less(i, j int) bool { return self.k[i] < self.k[j] }
func (self *sortedKeys) Swap(i, j int) { self.k[i], self.k[j] = self.k[j], self.k[i] }

func sortKeys(m map[int32]Hash) []int32 {
sorted := new(sortedKeys)
sorted.k = make([]int32, len(m))
i := 0
for key, _ := range m {
sorted.k[i] = key
i++
}

sort.Sort(sorted)

return sorted.k
}
19 changes: 19 additions & 0 deletions whisper/sort_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package whisper

import "testing"

func TestSorting(t *testing.T) {
m := map[int32]Hash{
1: HS("1"),
3: HS("3"),
2: HS("2"),
5: HS("5"),
}
exp := []int32{1, 2, 3, 5}
res := sortKeys(m)
for i, k := range res {
if k != exp[i] {
t.Error(k, "failed. Expected", exp[i])
}
}
}
Loading

0 comments on commit ebe2d9d

Please sign in to comment.