swarm: snapshot load improvement #18220
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,7 +20,9 @@ import ( | |
"bytes" | ||
"context" | ||
"encoding/json" | ||
"errors" | ||
"fmt" | ||
"strings" | ||
"sync" | ||
"time" | ||
|
||
|
@@ -707,6 +709,7 @@ func (net *Network) snapshot(addServices []string, removeServices []string) (*Sn | |
|
||
// Load loads a network snapshot | ||
func (net *Network) Load(snap *Snapshot) error { | ||
// Start nodes. | ||
for _, n := range snap.Nodes { | ||
if _, err := net.NewNodeWithConfig(n.Node.Config); err != nil { | ||
return err | ||
|
@@ -718,17 +721,97 @@ func (net *Network) Load(snap *Snapshot) error { | |
return err | ||
} | ||
} | ||
|
||
// Prepare connection events counter. | ||
allConnected := make(chan struct{}) // closed when all connections are established | ||
eventLoopStarted := make(chan struct{}) // ensures that event loop is started before it is closed | ||
done := make(chan struct{}) // ensures that the event loop goroutine is terminated | ||
defer close(done) | ||
|
||
go func() { | ||
// Subscribe to event channel. | ||
events := make(chan *Event) | ||
sub := net.Events().Subscribe(events) | ||
defer sub.Unsubscribe() | ||
|
||
// Expected number of connections. | ||
total := len(snap.Conns) | ||
// counter tracks the current number of connections. | ||
var counter int | ||
|
||
// once is a closed channel that is read in the event loop below | ||
// only once. | ||
// It ensures that eventLoopStarted is closed which signals that | ||
// it is safe to call connect method on the network without the | ||
// possibility to miss a few first connection events. | ||
once := make(chan struct{}) | ||
// Close once channel so that it can be read from in the event loop. | ||
close(once) | ||
|
||
for { | ||
select { | ||
case e := <-events: | ||
// Detect only connection events. | ||
if e.Type != EventTypeConn { | ||
Review comment: Should we not disregard the
Review comment: We can, but could you elaborate on what we would protect against?
Review comment: I've experienced that for connections in the p2p layer there can be two conn events sent, the first of which is a
Review comment: Yes, I see, I forgot about this. |
||
continue | ||
} | ||
// Detect only "connect" events of all connection events. | ||
if !e.Conn.Up { | ||
continue | ||
} | ||
// Check that the connection is from the snapshot. | ||
for _, conn := range snap.Conns { | ||
|
||
if conn.One == e.Conn.One && conn.Other == e.Conn.Other { | ||
Review comment: Are we entirely sure that this can't be the other way around when it comes back as events?
Review comment: I am pretty sure. But please check the events code to verify it.
Review comment: @nolash I was thinking this too.
Review comment: @justelad Interesting idea about ConnLabel, but I think that the source/destination (One/Other) fields are intentionally named like that to specify the node which initiated the connection and the node which the connection was made to. Throughout the codebase that is an important distinction, which is also stated in p2p/simulations.Conn. Snapshots also keep this distinction in the JSON. @nolash @justelad could you provide more details on where this distinction breaks in the codebase?
Review comment: I don't know if it breaks. I asked. And you asked me to check. I haven't gotten around to checking yet, sorry! :)
Review comment: I cannot find the breaking part, and all snapshots from the stream package are loadable with exact checks on the One/Other connection fields. It would be good to see if my assumption is correct. I did check, and it would be good for someone else to check too, if there is a doubt about this check.
Review comment: I've looked through and I agree, the order of the conns will not change. |
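The two concerns raised in the threads above (duplicate connection events, and whether One/Other can arrive reversed) can be illustrated with a small sketch. It is not the code from this pull request: `conn` and `countEstablished` are hypothetical names, node IDs are plain strings, and a map is swapped in for the linear scan over `snap.Conns` so that each snapshot connection is counted at most once.

```go
package main

// conn is a hypothetical stand-in for a snapshot connection; One is the
// initiating node and Other is the node that was dialed.
type conn struct {
	One, Other string
}

// countEstablished returns how many snapshot connections were observed in
// events. Matching is direction-sensitive, and every snapshot connection is
// counted at most once even if several events report it.
func countEstablished(snap, events []conn) int {
	pending := make(map[conn]struct{}, len(snap))
	for _, c := range snap {
		pending[c] = struct{}{}
	}
	established := 0
	for _, e := range events {
		if _, ok := pending[e]; !ok {
			continue // not in the snapshot, reversed, or already counted
		}
		delete(pending, e)
		established++
	}
	return established
}
```

With this shape, a repeated event for the same pair does not push the counter past the expected total, and a reversed pair is ignored instead of being matched.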
||
counter++ | ||
|
||
if counter == total { | ||
// Signal that all nodes are connected. | ||
close(allConnected) | ||
return | ||
} | ||
|
||
break | ||
} | ||
} | ||
case <-once: | ||
// Set once to nil: receiving from a nil channel blocks forever. | ||
// This ensures that this for loop never gets into this part | ||
// of the code again. | ||
once = nil | ||
// Proceed with connecting the nodes, as we are ready to | ||
// detect events. | ||
close(eventLoopStarted) | ||
case <-done: | ||
// Load function returned, terminate this goroutine. | ||
return | ||
} | ||
} | ||
}() | ||
|
||
// Do not proceed until the goroutine with the event loop actually is ready | ||
// to receive events. | ||
<-eventLoopStarted | ||
Review comment: Doesn't the subscription start actually guarantee this already? When the subscription is executed, the channel queue will make sure all events come through (even block if not handled), no?
Review comment: No, it does not. Scheduling of two goroutines is not deterministic. Even the subscription is done in a different goroutine. I thought that I covered the explanations in the comments. Is there anything that you think needs more detailed elaboration in the code?
Review comment: The comments were clear, but I was under the assumption that subscribe would register the channel before returning, and thus generate a block on unread events. I wouldn't be surprised if I'm not the only one in our team with that assumption 🗡️
Review comment: I see. I pushed an alternative approach where the subscription is done in the main goroutine with an unbuffered channel. This should ensure the required synchronization and it should be easier to read. I added a comment, as the events channel and subscription object are not used in the main goroutine.
Review comment: Ahh .. so the assumption was correct? I think I misunderstood what you wrote; when you said subscribe is in goroutine you meant your goroutine? 💃
Review comment: Yes, correct, the goroutine created in the Load method. I think that the updated code is simpler and easier to read, with the risk that someone will move the subscription to the (child) goroutine as it is only used in it, not in the main/parent goroutine. Because of this, I added a few words in the comment. |
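The alternative described at the end of this thread (subscribe in the calling goroutine, consume in a child goroutine) might look roughly like the sketch below. The `feed` type is a hypothetical stand-in written for this example, not the go-ethereum event feed; it only assumes what the thread assumes, namely that `Subscribe` registers the channel before returning and that sending blocks until each subscriber has received.

```go
package main

import "sync"

// feed is a hypothetical stand-in for an event feed: Send delivers each
// value to every channel registered so far and blocks until it is received.
type feed struct {
	mu   sync.Mutex
	subs []chan int
}

// Subscribe registers ch before returning.
func (f *feed) Subscribe(ch chan int) {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.subs = append(f.subs, ch)
}

// Send blocks until every registered subscriber has received v.
func (f *feed) Send(v int) {
	f.mu.Lock()
	subs := append([]chan int(nil), f.subs...)
	f.mu.Unlock()
	for _, ch := range subs {
		ch <- v
	}
}

// countEvents subscribes in the calling goroutine, starts the consumer, and
// only then emits n events. It returns how many events the consumer saw.
func countEvents(n int) int {
	if n == 0 {
		return 0
	}
	f := &feed{}
	events := make(chan int) // unbuffered: Send blocks until the loop reads
	// Subscribe here, not inside the goroutine below, so the channel is
	// registered before the first Send regardless of goroutine scheduling.
	f.Subscribe(events)

	result := make(chan int)
	go func() {
		seen := 0
		for range events {
			seen++
			if seen == n {
				result <- seen
				return
			}
		}
	}()

	for i := 0; i < n; i++ {
		f.Send(i)
	}
	return <-result
}
```

Calling `countEvents(5)` returns 5 under these assumptions. Moving `f.Subscribe(events)` into the goroutine would reintroduce the race discussed above, because the first `Send` could run before the channel is registered.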
||
|
||
// Start connecting. | ||
for _, conn := range snap.Conns { | ||
|
||
if !net.GetNode(conn.One).Up || !net.GetNode(conn.Other).Up { | ||
//in this case, at least one of the nodes of a connection is not up, | ||
//so it would result in the snapshot `Load` to fail | ||
continue | ||
} | ||
if err := net.Connect(conn.One, conn.Other); err != nil { | ||
if err := net.Connect(conn.One, conn.Other); err != nil && !strings.Contains(err.Error(), "already connected") { | ||
Review comment: Why should "already connected" even be possible here?
Review comment: It is possible if discovery is set to true, which is not disabled in the snapshot load function, but in the provided service. This change protects against such a scenario, which happened quite frequently when discovery is set to true or the NotifyPeer fix is not applied. I would keep ignoring this error as it may result in flaky tests if discovery is set to true.
Review comment: Isn't this in the end a bit of a catch-22 scenario? We want to make sure that the network is exactly as we had it in the snapshot, but if we boot with discovery this suggests it will never be. We should never pretend that it should. Maybe this warrants a bit more discussion?
Review comment: I agree that it needs more discussion. My reason for this approach is that the flakiness produced with exposing this error is not contributing to what is the core reason for
Review comment: I agree with @nolash. Why should we allow snapshot simulations to have discovery enabled in the first place?
Review comment: Cool, I will revert this change as discovery is not needed. |
||
return err | ||
} | ||
} | ||
|
||
select { | ||
// Wait until all connections from the snapshot are established. | ||
case <-allConnected: | ||
// Make sure that we do not wait forever. | ||
case <-time.After(120 * time.Second): | ||
Review comment: I'm not so fond of these literals.
Review comment: A dedicated variable is added. I am not fond of introducing indirections just to avoid literals, but this one is very close.
Review comment: <3 |
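A minimal sketch of the "dedicated variable" idea from this thread follows. The names `snapshotLoadTimeout`, `errNotEstablished`, and `waitConnected` are assumptions made for the example and are not taken from the diff; only the 120 second value and the error text come from the code above.

```go
package main

import (
	"errors"
	"time"
)

// snapshotLoadTimeout is a hypothetical name for the limit on how long to
// wait for all snapshot connections. A package-level variable (not a const)
// lets tests shorten it.
var snapshotLoadTimeout = 120 * time.Second

// errNotEstablished reuses the error text from the diff above.
var errNotEstablished = errors.New("snapshot connections not established")

// waitConnected blocks until allConnected is closed or the timeout elapses.
func waitConnected(allConnected <-chan struct{}) error {
	select {
	case <-allConnected:
		return nil
	case <-time.After(snapshotLoadTimeout):
		return errNotEstablished
	}
}
```

The trade-off mentioned in the thread still applies: the variable adds one indirection, in exchange for a single place to tune the value.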
||
return errors.New("snapshot connections not established") | ||
} | ||
return nil | ||
} | ||
|
||
|
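The `once` channel in the event loop above relies on two channel rules from the Go specification: a receive from a closed channel succeeds immediately, and a receive from a nil channel blocks forever. The sketch below isolates that pattern. `runOnce` and its parameters are hypothetical, and the loop is reduced to summing integers, so it shows only the mechanism and not the snapshot logic.

```go
package main

// runOnce drains values through a select loop that also contains a
// run-exactly-once case. It returns how many times the once case fired and
// the sum of the received values.
func runOnce(values []int) (fired int, sum int) {
	in := make(chan int)
	go func() {
		for _, v := range values {
			in <- v
		}
		close(in)
	}()

	once := make(chan struct{})
	close(once) // a receive from a closed channel succeeds immediately

	// Loop until the input is drained and the once case has fired.
	for in != nil || once != nil {
		select {
		case v, ok := <-in:
			if !ok {
				in = nil // stop selecting the closed input channel
				continue
			}
			sum += v
		case <-once:
			// A receive from a nil channel blocks forever, so after this
			// assignment the case can never be selected again.
			once = nil
			fired++
		}
	}
	return fired, sum
}
```

For example, `runOnce([]int{1, 2, 3})` returns 1 and 6: the once case fires exactly one time, at whatever point the select happens to pick it.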
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -165,8 +165,8 @@ func (h *Hive) Run(p *BzzPeer) error { | |
// otherwise just send depth to new peer | ||
dp.NotifyDepth(depth) | ||
} | ||
NotifyPeer(p.BzzAddr, h.Kademlia) | ||
|
||
} | ||
NotifyPeer(p.BzzAddr, h.Kademlia) | ||
defer h.Off(dp) | ||
return dp.Run(dp.HandleMsg) | ||
} | ||
|
Review comment: I had the impression we named these channels with postfix C. I found this contributes to clarity. Did we stop doing this?
Review comment: I did not know that the suffix C is a hard requirement. These variables are local and very close together in this function, so I saw no need to suffix them. Do you insist on adding the C?
Review comment: I don't insist, but I just always do :)