Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

various fixes #490

Merged
merged 28 commits into from
Jan 5, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
e3223a0
dockertest: logging niceness
jbenet Jan 3, 2015
3b63503
dht: debug dont cast Key as peer.ID
jbenet Jan 3, 2015
17ce192
dht: some provider debug logging
jbenet Jan 3, 2015
7629ad7
bitswap: add self peer.ID
jbenet Jan 3, 2015
5639042
bitswap: send wantlist code reuse + debug logs
jbenet Jan 3, 2015
6236258
prefix logger
jbenet Jan 3, 2015
c100390
bitswap engine: signal in own func
jbenet Jan 3, 2015
d357b0a
bitswap debug logging
jbenet Jan 3, 2015
71ada46
bitswap net: always close
jbenet Jan 3, 2015
bb8886f
merkledag: LONG timeout on Get
jbenet Jan 3, 2015
9c6228d
bitswap and dht: lots of debugging logs
jbenet Jan 3, 2015
64cb32d
disable utp
jbenet Jan 3, 2015
05d8c80
fix(bitswap/network): return when context is done
Jan 3, 2015
a8127a2
fix: force clean test results directory
Jan 4, 2015
b599e28
core/mock: use mock routing
jbenet Jan 5, 2015
b4be7c5
dht: extend duration of TestGetFailures
jbenet Jan 5, 2015
ce367ee
ext_test: bitten by mocknet ordering
jbenet Jan 5, 2015
9d0736b
peer+mocknet: sorting for determinism.
jbenet Jan 5, 2015
032b35b
dht: key without record validator func
jbenet Jan 5, 2015
f6def11
`make test` now runs expensive tests.
jbenet Jan 4, 2015
aeb2e07
bitswap: remove DialPeer from interface
jbenet Jan 4, 2015
10072c1
bitswap: log superfluous messages
jbenet Jan 4, 2015
09a2e1f
testutil: obvious names for seeded key pairs
jbenet Jan 4, 2015
f25dfb6
peer/queue: close fix, and logging
jbenet Jan 5, 2015
1728017
dht: even more logging.
jbenet Jan 5, 2015
41af4f4
dht test skips
jbenet Jan 5, 2015
1ab9588
p2p/test: bogus key pair for faster tests
jbenet Jan 5, 2015
919e9be
ipns test: is fast republish a timing prob?
jbenet Jan 5, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,25 @@ vendor: godep
install:
cd cmd/ipfs && go install

test: test_go test_sharness
##############################################################
# tests targets

test: test_expensive

test_short: test_go_short test_sharness_short

test_expensive: test_go_expensive test_sharness_expensive

test_docker:
cd dockertest/ && make

test_go:
test_go_short:
go test -test.short ./...

test_go_expensive:
go test ./...

test_sharness:
test_sharness_short:
cd test/ && make

test_sharness_expensive:
Expand Down
2 changes: 1 addition & 1 deletion blockservice/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
// Mocks returns |n| connected mock Blockservices
func Mocks(t *testing.T, n int) []*BlockService {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0))
sg := bitswap.NewSessionGenerator(net)
sg := bitswap.NewTestSessionGenerator(net)

instances := sg.Instances(n)

Expand Down
2 changes: 1 addition & 1 deletion cmd/ipfs/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func initConfig(configFilename string, dspathOverride string, nBitsForKeypair in
Addresses: config.Addresses{
Swarm: []string{
"/ip4/0.0.0.0/tcp/4001",
"/ip4/0.0.0.0/udp/4002/utp",
// "/ip4/0.0.0.0/udp/4002/utp", // disabled for now.
},
API: "/ip4/127.0.0.1/tcp/5001",
},
Expand Down
24 changes: 9 additions & 15 deletions core/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@ import (
"github.com/jbenet/go-ipfs/exchange/offline"
mdag "github.com/jbenet/go-ipfs/merkledag"
nsys "github.com/jbenet/go-ipfs/namesys"
ci "github.com/jbenet/go-ipfs/p2p/crypto"
mocknet "github.com/jbenet/go-ipfs/p2p/net/mock"
peer "github.com/jbenet/go-ipfs/p2p/peer"
path "github.com/jbenet/go-ipfs/path"
dht "github.com/jbenet/go-ipfs/routing/dht"
mockrouting "github.com/jbenet/go-ipfs/routing/mock"
ds2 "github.com/jbenet/go-ipfs/util/datastore2"
testutil "github.com/jbenet/go-ipfs/util/testutil"
)
Expand All @@ -29,23 +28,19 @@ func NewMockNode() (*IpfsNode, error) {
nd := new(IpfsNode)

// Generate Identity
sk, pk, err := ci.GenerateKeyPair(ci.RSA, 1024)
if err != nil {
return nil, err
}

p, err := peer.IDFromPublicKey(pk)
ident, err := testutil.RandIdentity()
if err != nil {
return nil, err
}

p := ident.ID()
nd.Identity = p
nd.PrivateKey = sk
nd.PrivateKey = ident.PrivateKey()
nd.Peerstore = peer.NewPeerstore()
nd.Peerstore.AddPrivKey(p, sk)
nd.Peerstore.AddPubKey(p, pk)
nd.Peerstore.AddPrivKey(p, ident.PrivateKey())
nd.Peerstore.AddPubKey(p, ident.PublicKey())

nd.PeerHost, err = mocknet.New(ctx).AddPeer(sk, testutil.RandLocalTCPAddress()) // effectively offline
nd.PeerHost, err = mocknet.New(ctx).AddPeer(ident.PrivateKey(), ident.Address()) // effectively offline
if err != nil {
return nil, err
}
Expand All @@ -55,8 +50,7 @@ func NewMockNode() (*IpfsNode, error) {
nd.Datastore = ds2.CloserWrap(syncds.MutexWrap(dstore))

// Routing
dht := dht.NewDHT(ctx, nd.PeerHost, nd.Datastore)
nd.Routing = dht
nd.Routing = mockrouting.NewServer().Client(ident)

// Bitswap
bstore := blockstore.NewBlockstore(nd.Datastore)
Expand All @@ -68,7 +62,7 @@ func NewMockNode() (*IpfsNode, error) {
nd.DAG = mdag.NewDAGService(bserv)

// Namespace resolver
nd.Namesys = nsys.NewNameSystem(dht)
nd.Namesys = nsys.NewNameSystem(nd.Routing)

// Path resolver
nd.Resolver = &path.Resolver{DAG: nd.DAG}
Expand Down
3 changes: 1 addition & 2 deletions dockertest/bootstrap/config
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@
},
"Addresses": {
"Swarm": [
"/ip4/0.0.0.0/tcp/4011",
"/ip4/0.0.0.0/udp/4012/utp"
"/ip4/0.0.0.0/tcp/4011"
],
"API": "/ip4/127.0.0.1/tcp/5001"
},
Expand Down
3 changes: 1 addition & 2 deletions dockertest/client/config
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@
"Addresses": {
"API": "/ip4/127.0.0.1/tcp/5001",
"Swarm": [
"/ip4/0.0.0.0/tcp/4031",
"/ip4/0.0.0.0/udp/4032/utp"
"/ip4/0.0.0.0/tcp/4031"
]
},
"Bootstrap": [
Expand Down
12 changes: 7 additions & 5 deletions dockertest/client/run.sh
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
ipfs bootstrap add /ip4/$BOOTSTRAP_PORT_4011_TCP_ADDR/tcp/$BOOTSTRAP_PORT_4011_TCP_PORT/QmNXuBh8HFsWq68Fid8dMbGNQTh7eG6hV9rr1fQyfmfomE


echo "dockertest> starting client daemon"
ipfs daemon &
sleep 3

while [ ! -f /data/idtiny ]
do
echo waiting for server to add the file...
echo "dockertest> waiting for server to add the file..."
sleep 1
done
echo client found file with hash: $(cat /data/idtiny)
echo "dockertest> client found file with hash:" $(cat /data/idtiny)

ipfs cat $(cat /data/idtiny) > filetiny

Expand All @@ -23,10 +25,10 @@ fi

while [ ! -f /data/idrand ]
do
echo waiting for server to add the file...
echo "dockertest> waiting for server to add the file..."
sleep 1
done
echo client found file with hash: $(cat /data/idrand)
echo "dockertest> client found file with hash:" $(cat /data/idrand)

cat /data/idrand

Expand All @@ -44,4 +46,4 @@ if (($? > 0)); then
exit 1
fi

echo "success"
echo "dockertest> success"
3 changes: 1 addition & 2 deletions dockertest/server/config
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@
"Addresses": {
"API": "/ip4/127.0.0.1/tcp/5001",
"Swarm": [
"/ip4/0.0.0.0/tcp/4021",
"/ip4/0.0.0.0/udp/4022/utp"
"/ip4/0.0.0.0/tcp/4021"
]
},
"Bootstrap": [
Expand Down
5 changes: 3 additions & 2 deletions dockertest/server/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,19 @@ ipfs bootstrap add /ip4/$BOOTSTRAP_PORT_4011_TCP_ADDR/tcp/$BOOTSTRAP_PORT_4011_T

# wait for daemon to start/bootstrap
# alternatively use ipfs swarm connect
echo "dockertest> starting server daemon"
ipfs daemon &
sleep 3
# TODO instead of bootstrapping: ipfs swarm connect /ip4/$BOOTSTRAP_PORT_4011_TCP_ADDR/tcp/$BOOTSTRAP_PORT_4011_TCP_PORT/QmNXuBh8HFsWq68Fid8dMbGNQTh7eG6hV9rr1fQyfmfomE

# must mount this volume from data container
ipfs add -q /data/filetiny > tmptiny
mv tmptiny /data/idtiny
echo added tiny file. hash is $(cat /data/idtiny)
echo "dockertest> added tiny file. hash is" $(cat /data/idtiny)

ipfs add -q /data/filerand > tmprand
mv tmprand /data/idrand
echo added rand file. hash is $(cat /data/idrand)
echo "dockertest> added rand file. hash is" $(cat /data/idrand)

# allow ample time for the client to pull the data
sleep 10000000
100 changes: 72 additions & 28 deletions exchange/bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
}()

bs := &bitswap{
self: p,
blockstore: bstore,
cancelFunc: cancelFunc,
notifications: notif,
Expand All @@ -79,6 +80,9 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
// bitswap instances implement the bitswap protocol.
type bitswap struct {

// the ID of the peer to act on behalf of
self peer.ID

// network delivers messages on behalf of the session
network bsnet.BitSwapNetwork

Expand All @@ -104,6 +108,7 @@ type bitswap struct {
// GetBlock attempts to retrieve a particular block from peers within the
// deadline enforced by the context.
func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, error) {
log := log.Prefix("bitswap(%s).GetBlock(%s)", bs.self, k)

// Any async work initiated by this function must end when this function
// returns. To ensure this, derive a new context. Note that it is okay to
Expand All @@ -116,10 +121,12 @@ func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, err

ctx = eventlog.ContextWithLoggable(ctx, eventlog.Uuid("GetBlockRequest"))
log.Event(ctx, "GetBlockRequestBegin", &k)
log.Debugf("GetBlockRequestBegin")

defer func() {
cancelFunc()
log.Event(ctx, "GetBlockRequestEnd", &k)
log.Debugf("GetBlockRequestEnd")
}()

promise, err := bs.GetBlocks(ctx, []u.Key{k})
Expand Down Expand Up @@ -166,67 +173,109 @@ func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
return bs.network.Provide(ctx, blk.Key())
}

func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.ID) error {
func (bs *bitswap) sendWantlistMsgToPeer(ctx context.Context, m bsmsg.BitSwapMessage, p peer.ID) error {
log := log.Prefix("bitswap(%s).bitswap.sendWantlistMsgToPeer(%d, %s)", bs.self, len(m.Wantlist()), p)

log.Debug("sending wantlist")
if err := bs.send(ctx, p, m); err != nil {
log.Errorf("send wantlist error: %s", err)
return err
}
log.Debugf("send wantlist success")
return nil
}

func (bs *bitswap) sendWantlistMsgToPeers(ctx context.Context, m bsmsg.BitSwapMessage, peers <-chan peer.ID) error {
if peers == nil {
panic("Cant send wantlist to nil peerchan")
}
message := bsmsg.New()
for _, wanted := range bs.wantlist.Entries() {
message.AddEntry(wanted.Key, wanted.Priority)
}

log := log.Prefix("bitswap(%s).sendWantlistMsgToPeers(%d)", bs.self, len(m.Wantlist()))
log.Debugf("begin")
defer log.Debugf("end")

set := pset.New()
wg := sync.WaitGroup{}
for peerToQuery := range peers {
log.Event(ctx, "PeerToQuery", peerToQuery)

if !set.TryAdd(peerToQuery) { //Do once per peer
log.Debugf("%s skipped (already sent)", peerToQuery)
continue
}
log.Debugf("%s sending", peerToQuery)

wg.Add(1)
go func(p peer.ID) {
defer wg.Done()
if err := bs.send(ctx, p, message); err != nil {
log.Error(err)
return
}
bs.sendWantlistMsgToPeer(ctx, m, p)
}(peerToQuery)
}
wg.Wait()
return nil
}

func (bs *bitswap) sendWantlistToProviders(ctx context.Context, wantlist *wantlist.ThreadSafe) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

func (bs *bitswap) sendWantlistToPeers(ctx context.Context, peers <-chan peer.ID) error {
message := bsmsg.New()
message.SetFull(true)
for _, e := range bs.wantlist.Entries() {
message.AddEntry(e.Key, e.Priority)
for _, wanted := range bs.wantlist.Entries() {
message.AddEntry(wanted.Key, wanted.Priority)
}
return bs.sendWantlistMsgToPeers(ctx, message, peers)
}

set := pset.New()
func (bs *bitswap) sendWantlistToProviders(ctx context.Context) {
log := log.Prefix("bitswap(%s).sendWantlistToProviders ", bs.self)
log.Debugf("begin")
defer log.Debugf("end")

ctx, cancel := context.WithCancel(ctx)
defer cancel()

// prepare a channel to hand off to sendWantlistToPeers
sendToPeers := make(chan peer.ID)

// Get providers for all entries in wantlist (could take a while)
wg := sync.WaitGroup{}
for _, e := range wantlist.Entries() {
for _, e := range bs.wantlist.Entries() {
wg.Add(1)
go func(k u.Key) {
defer wg.Done()

log := log.Prefix("(entry: %s) ", k)
log.Debug("asking dht for providers")

child, _ := context.WithTimeout(ctx, providerRequestTimeout)
providers := bs.network.FindProvidersAsync(child, k, maxProvidersPerRequest)
for prov := range providers {
if set.TryAdd(prov) { //Do once per peer
bs.send(ctx, prov, message)
}
log.Debugf("dht returned provider %s. send wantlist", prov)
sendToPeers <- prov
}
}(e.Key)
}
wg.Wait()

go func() {
wg.Wait() // make sure all our children do finish.
close(sendToPeers)
}()

err := bs.sendWantlistToPeers(ctx, sendToPeers)
if err != nil {
log.Errorf("sendWantlistToPeers error: %s", err)
}
}

func (bs *bitswap) taskWorker(ctx context.Context) {
log := log.Prefix("bitswap(%s).taskWorker", bs.self)
for {
select {
case <-ctx.Done():
log.Debugf("exiting")
return
case envelope := <-bs.engine.Outbox():
log.Debugf("message to %s sending...", envelope.Peer)
bs.send(ctx, envelope.Peer, envelope.Message)
log.Debugf("message to %s sent", envelope.Peer)
}
}
}
Expand All @@ -243,7 +292,7 @@ func (bs *bitswap) clientWorker(parent context.Context) {
select {
case <-broadcastSignal:
// Resend unfulfilled wantlist keys
bs.sendWantlistToProviders(ctx, bs.wantlist)
bs.sendWantlistToProviders(ctx)
broadcastSignal = time.After(rebroadcastDelay.Get())
case ks := <-bs.batchRequests:
if len(ks) == 0 {
Expand All @@ -262,7 +311,7 @@ func (bs *bitswap) clientWorker(parent context.Context) {
// newer bitswap strategies.
child, _ := context.WithTimeout(ctx, providerRequestTimeout)
providers := bs.network.FindProvidersAsync(child, ks[0], maxProvidersPerRequest)
err := bs.sendWantListTo(ctx, providers)
err := bs.sendWantlistToPeers(ctx, providers)
if err != nil {
log.Errorf("error sending wantlist: %s", err)
}
Expand Down Expand Up @@ -336,11 +385,6 @@ func (bs *bitswap) ReceiveError(err error) {
// send strives to ensure that accounting is always performed when a message is
// sent
func (bs *bitswap) send(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage) error {
log.Event(ctx, "DialPeer", p)
err := bs.network.DialPeer(ctx, p)
if err != nil {
return errors.Wrap(err)
}
if err := bs.network.SendMessage(ctx, p, m); err != nil {
return errors.Wrap(err)
}
Expand Down
Loading