Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

swarm: snapshot load improvement #18220

Merged
merged 8 commits into from
Dec 7, 2018
75 changes: 75 additions & 0 deletions p2p/simulations/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"sync"
"time"
Expand Down Expand Up @@ -705,8 +706,11 @@ func (net *Network) snapshot(addServices []string, removeServices []string) (*Sn
return snap, nil
}

var snapshotLoadTimeout = 120 * time.Second

// Load loads a network snapshot
func (net *Network) Load(snap *Snapshot) error {
// Start nodes.
for _, n := range snap.Nodes {
if _, err := net.NewNodeWithConfig(n.Node.Config); err != nil {
return err
Expand All @@ -718,6 +722,69 @@ func (net *Network) Load(snap *Snapshot) error {
return err
}
}

// Prepare connection events counter.
allConnected := make(chan struct{}) // closed when all connections are established
done := make(chan struct{}) // ensures that the event loop goroutine is terminated
defer close(done)

// Subscribe to event channel.
// It needs to be done outside of the event loop goroutine (created below)
// to ensure that the event channel is blocking before connect calls are made.
events := make(chan *Event)
sub := net.Events().Subscribe(events)
defer sub.Unsubscribe()

go func() {
// Expected number of connections.
total := len(snap.Conns)
// Set of all established connections from the snapshot, not other connections.
// Key array element 0 is the connection One field value, and element 1 connection Other field.
connections := make(map[[2]enode.ID]struct{}, total)

for {
select {
case e := <-events:
// Ignore control events as they do not represent
// connect or disconnect (Up) state change.
if e.Control {
continue
}
// Detect only connection events.
if e.Type != EventTypeConn {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we not disregard the Control events aswell?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can, but could you elaborate what would we protect agains?

Copy link
Contributor

@nolash nolash Dec 3, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've experienced that for connections in p2p layer there are/can be two conn events being sent, the first of which is a Control event and will be fired even though the node object at that time will have Conn.Up = false for that connection in its state.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I see, I forgot about this Up = false events. Thank you, that is a great suggestion. I've updated the code.

continue
}
connection := [2]enode.ID{e.Conn.One, e.Conn.Other}
// Nodes are still not connected or have been disconnected.
if !e.Conn.Up {
// Delete the connection from the set of established connections.
// This will prevent false positive in case disconnections happen.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this really correct? If disconnects happen,mustn't that mean we are not getting the snapshot we are expecting, since all connections that are not Up were pruned already from the snapshot.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did not experience any disconnections during snapshot loading, but please have a look at this conversation #18220 (comment).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would be also nice to have a log.Warn here. this shouldn't be happening (I guess)

delete(connections, connection)
log.Warn("load snapshot: unexpected disconnection", "one", e.Conn.One, "other", e.Conn.Other)
continue
}
// Check that the connection is from the snapshot.
for _, conn := range snap.Conns {
zelig marked this conversation as resolved.
Show resolved Hide resolved
if conn.One == e.Conn.One && conn.Other == e.Conn.Other {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we entirely sure that this can't be the other way around when it comes back as events?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am pretty much sure. But please check the events code to verify it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nolash i was thinking this too.
you could probably pull out the address comparison logic from p2p/simulations/network.go#ConnLabel to another function and use that before doing the comparison.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@justelad interesting Idea about ConnLabel, but I think that the source/destination (One/Other) are intentionally called like that to specify the node which initiated the connection and the node which the connection was made to. Through the codebase that is an important distinction, which is also stated in p2p/simulations.Conn. Snapshots also keep this distinction in the json.

@nolash @justelad could you provide more details where this distinction breaks in the codebase?

Copy link
Contributor

@nolash nolash Dec 5, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know if it breaks. I asked. And you asked me to check. I haven't gotten around to checking yet, sorry! :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can not find the breaking part and all snapshots from the stream package are loadable with exact checks on One/Other connection fields. It would be good to see if my assumption is correct. I did check and it would be good for someone else to check too, if there is a doubt about this check.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've looked through and I agree, the order of the conns will not change.

// Add the connection to the set of established connections.
connections[connection] = struct{}{}
if len(connections) == total {
// Signal that all nodes are connected.
close(allConnected)
return
}

break
}
}
case <-done:
// Load function returned, terminate this goroutine.
return
}
}
}()

// Start connecting.
for _, conn := range snap.Conns {

if !net.GetNode(conn.One).Up || !net.GetNode(conn.Other).Up {
Expand All @@ -729,6 +796,14 @@ func (net *Network) Load(snap *Snapshot) error {
return err
}
}

select {
// Wait until all connections from the snapshot are established.
case <-allConnected:
// Make sure that we do not wait forever.
case <-time.After(snapshotLoadTimeout):
return errors.New("snapshot connections not established")
}
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion swarm/network/hive.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,8 @@ func (h *Hive) Run(p *BzzPeer) error {
// otherwise just send depth to new peer
dp.NotifyDepth(depth)
}
NotifyPeer(p.BzzAddr, h.Kademlia)
Copy link
Contributor

@nolash nolash Dec 3, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please explain the effect of this change? never mind, I read the description last

}
NotifyPeer(p.BzzAddr, h.Kademlia)
defer h.Off(dp)
return dp.Run(dp.HandleMsg)
}
Expand Down
1 change: 0 additions & 1 deletion swarm/network/simulation/kademlia_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ func TestWaitTillHealthy(t *testing.T) {
"bzz": func(ctx *adapters.ServiceContext, b *sync.Map) (node.Service, func(), error) {
addr := network.NewAddr(ctx.Config.Node())
hp := network.NewHiveParams()
hp.Discovery = false
config := &network.BzzConfig{
OverlayAddr: addr.Over(),
UnderlayAddr: addr.Under(),
Expand Down