Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

swarm: snapshot load improvement #18220

Merged
merged 8 commits into from
Dec 7, 2018
90 changes: 89 additions & 1 deletion p2p/simulations/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -707,6 +709,7 @@ func (net *Network) snapshot(addServices []string, removeServices []string) (*Sn

// Load loads a network snapshot
func (net *Network) Load(snap *Snapshot) error {
// Start nodes.
for _, n := range snap.Nodes {
if _, err := net.NewNodeWithConfig(n.Node.Config); err != nil {
return err
Expand All @@ -718,17 +721,102 @@ func (net *Network) Load(snap *Snapshot) error {
return err
}
}

// Prepare connection events counter.
allConnected := make(chan struct{}) // closed when all connections are established
eventLoopStarted := make(chan struct{}) // ensures that event loop is started before it is closed
done := make(chan struct{}) // ensures that the event loop goroutine is terminated
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had the impression we named these channels with postfix C. I found this contributes to clarity. Did we stop doing this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did not know that suffix C is a hard requirement. This variables are local and very close in this function, so I saw no need to suffix them. Do you insist on adding the C?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't insist, but I just always do :)

defer close(done)

go func() {
// Subscribe to event channel.
events := make(chan *Event)
sub := net.Events().Subscribe(events)
defer sub.Unsubscribe()

// Expected number of connections.
total := len(snap.Conns)
// Set of all established connections from the snapshot, not other connections.
// Key array element 0 is the connection One field value, and element 1 connection Other field.
connections := make(map[[2]enode.ID]struct{}, total)

// once is a closed channel that is read in the event loop below
// only once.
// It ensures that eventLoopStarted is closed which signals that
// it is safe to call connect method on the network without the
// possibility to miss a few first connection events.
once := make(chan struct{})
// Close once channel so that it can be read from in the event loop.
close(once)

for {
select {
case e := <-events:
// Detect only connection events.
if e.Type != EventTypeConn {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we not disregard the Control events aswell?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can, but could you elaborate what would we protect agains?

Copy link
Contributor

@nolash nolash Dec 3, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've experienced that for connections in p2p layer there are/can be two conn events being sent, the first of which is a Control event and will be fired even though the node object at that time will have Conn.Up = false for that connection in its state.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I see, I forgot about this Up = false events. Thank you, that is a great suggestion. I've updated the code.

continue
}
connection := [2]enode.ID{e.Conn.One, e.Conn.Other}
// Nodes are still not connected or have been disconnected.
if !e.Conn.Up {
// Delete the connection from the set of established connections.
// This will prevent false positive in case disconnections happen.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this really correct? If disconnects happen,mustn't that mean we are not getting the snapshot we are expecting, since all connections that are not Up were pruned already from the snapshot.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did not experience any disconnections during snapshot loading, but please have a look at this conversation #18220 (comment).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would be also nice to have a log.Warn here. this shouldn't be happening (I guess)

delete(connections, connection)
continue
}
// Check that the connection is from the snapshot.
for _, conn := range snap.Conns {
zelig marked this conversation as resolved.
Show resolved Hide resolved
if conn.One == e.Conn.One && conn.Other == e.Conn.Other {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we entirely sure that this can't be the other way around when it comes back as events?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am pretty much sure. But please check the events code to verify it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nolash i was thinking this too.
you could probably pull out the address comparison logic from p2p/simulations/network.go#ConnLabel to another function and use that before doing the comparison.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@justelad interesting Idea about ConnLabel, but I think that the source/destination (One/Other) are intentionally called like that to specify the node which initiated the connection and the node which the connection was made to. Through the codebase that is an important distinction, which is also stated in p2p/simulations.Conn. Snapshots also keep this distinction in the json.

@nolash @justelad could you provide more details where this distinction breaks in the codebase?

Copy link
Contributor

@nolash nolash Dec 5, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know if it breaks. I asked. And you asked me to check. I haven't gotten around to checking yet, sorry! :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can not find the breaking part and all snapshots from the stream package are loadable with exact checks on One/Other connection fields. It would be good to see if my assumption is correct. I did check and it would be good for someone else to check too, if there is a doubt about this check.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've looked through and I agree, the order of the conns will not change.

// Add the connection to the set of established connections.
connections[connection] = struct{}{}
if len(connections) == total {
// Signal that all nodes are connected.
close(allConnected)
return
}

break
}
}
case <-once:
// Set once to nil as nil channel never blocks forever.
// This ensures that this for loop never gets into this part
// of the code again.
once = nil
// Proceed with connecting the nodes, as we are ready to
// detect events.
close(eventLoopStarted)
case <-done:
// Load function returned, terminate this goroutine.
return
}
}
}()

// Do not proceed until the goroutine with the event loop actually is ready
// to receive events.
<-eventLoopStarted
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't the subscription start actually guarantee this already? When the subscription is executed, the channel queue will make sure all events come through (even block if not handled), no?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it does not. Scheduling of two goroutines is not deterministic. Even subscription is done in a different goroutine. I thought that I covered explanations in the comments. Is there anything that you think that it needs more detailed elaboration in the code?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comments were clear, but I was under the assumption that subscribe would register the channel before returning, and thus generate a block on unread events. I wouldn't be surprised if I'm not the only one in our team with that assumption 🗡️

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. I pushed an alternative approach where the subscription is done in the main goroutine with unbuffered channel. This should ensure the required synchronization and it should be easier to read. I added a comment as events channel and subscription object are not used in the main goroutine.

Copy link
Contributor

@nolash nolash Dec 3, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh .. so the assumption was correct? I think I misunderstood what you wrote; when you said subscribe is in goroutine you meant your goroutine? 💃

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, correct, the goroutine created in the Load method. I think that the updated code is simpler and easier to read, with the risk that someone will move the subscription to the (child) goroutine as it is only used in it, not in the main/parent goroutine. Because of this, I added a few words in the comment.


// Start connecting.
for _, conn := range snap.Conns {

if !net.GetNode(conn.One).Up || !net.GetNode(conn.Other).Up {
//in this case, at least one of the nodes of a connection is not up,
//so it would result in the snapshot `Load` to fail
continue
}
if err := net.Connect(conn.One, conn.Other); err != nil {
if err := net.Connect(conn.One, conn.Other); err != nil && !strings.Contains(err.Error(), "already connected") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why should "already connected" even be possible here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is possible if discovery is set to true, which is not disabled in snapshot load function, but in the provided service. This change is protecting agains such scenario, which happened quite frequently when discovery is set to true or the fix NotifyPeer is not applied. I would keep ignoring this error as it may result in flaky tests if discovery is set to true.

Copy link
Contributor

@nolash nolash Dec 3, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this in the end a bit of a catch 22 scenario? We want to make sure that the network is exactly as we had it in the snapshot, but if we boot with discovery this suggests it will never be. We should never pretend that it should. Maybe this warrants a bit more discussion?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that it needs more discussion. My reason for this approach is that the flakiness produced with exposing this error is not contributing to what is the core reason for already connected error. By reporting already connected error, we are requiring that discovery is set to false before the snapshot is loaded, and I am not sure if it is safe and correct to set it to true in the runtime, after the snapshot is loaded, if it needs to be enabled.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with @nolash. Why should we allow snapshot simulations to have discovery enabled in the first place?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool, I will revert this change as discovery is a not needed.

return err
}
}

select {
// Wait until all connections from the snapshot are established.
case <-allConnected:
// Make sure that we do not wait forever.
case <-time.After(120 * time.Second):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not so fond of these literals.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A dedicated variable is added. I am not fond of introducing indirections just to avoid literals, but this one is very close.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

<3

return errors.New("snapshot connections not established")
}
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion swarm/network/hive.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,8 @@ func (h *Hive) Run(p *BzzPeer) error {
// otherwise just send depth to new peer
dp.NotifyDepth(depth)
}
NotifyPeer(p.BzzAddr, h.Kademlia)
Copy link
Contributor

@nolash nolash Dec 3, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please explain the effect of this change? never mind, I read the description last

}
NotifyPeer(p.BzzAddr, h.Kademlia)
defer h.Off(dp)
return dp.Run(dp.HandleMsg)
}
Expand Down
1 change: 0 additions & 1 deletion swarm/network/simulation/kademlia_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ func TestWaitTillHealthy(t *testing.T) {
"bzz": func(ctx *adapters.ServiceContext, b *sync.Map) (node.Service, func(), error) {
addr := network.NewAddr(ctx.Config.Node())
hp := network.NewHiveParams()
hp.Discovery = false
config := &network.BzzConfig{
OverlayAddr: addr.Over(),
UnderlayAddr: addr.Under(),
Expand Down