-
Notifications
You must be signed in to change notification settings - Fork 112
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(dot/network): implement streamManager to cleanup not recently used streams #1611
Changes from 38 commits
45e059b
5197340
8d96671
42e2f44
161406e
aa538da
735c60f
bcc5e6f
db5ebf2
35200ba
78895dd
ef61ee8
60afa84
8e70013
4af4f95
fb467b1
56c1aaa
aa58de6
e1563f3
c8b5544
8d54da5
0ef575d
3f3055e
5565d90
a1babfa
a9fab01
80569f5
f392695
a6ef2ac
da6d0b4
2162ebc
d4085c1
85e2e5d
03480e3
d26bce1
945f95b
4942020
ea5248c
bb94aec
09d9aab
0fe8ea4
7027073
bbdee57
39694fa
f061756
c3b1a83
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,68 @@ | ||||||||||||||||||||||||||||||||||||
package network | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
import ( | ||||||||||||||||||||||||||||||||||||
"context" | ||||||||||||||||||||||||||||||||||||
"sync" | ||||||||||||||||||||||||||||||||||||
"time" | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
"github.com/libp2p/go-libp2p-core/network" | ||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
var cleanupStreamInterval = time.Minute | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
// streamManager tracks inbound streams and runs a cleanup goroutine every `cleanupStreamInterval` to close streams that | ||||||||||||||||||||||||||||||||||||
// we haven't received any data on for the last time period. this prevents keeping stale streams open and continuously trying to | ||||||||||||||||||||||||||||||||||||
// read from it, which takes up lots of CPU over time. | ||||||||||||||||||||||||||||||||||||
type streamManager struct { | ||||||||||||||||||||||||||||||||||||
ctx context.Context | ||||||||||||||||||||||||||||||||||||
lastReceivedMessage *sync.Map //map[string]time.Time | ||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why are we using two maps?
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. updated! |
||||||||||||||||||||||||||||||||||||
streams *sync.Map //map[string]network.Stream | ||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
func newStreamManager(ctx context.Context) *streamManager { | ||||||||||||||||||||||||||||||||||||
return &streamManager{ | ||||||||||||||||||||||||||||||||||||
ctx: ctx, | ||||||||||||||||||||||||||||||||||||
lastReceivedMessage: new(sync.Map), | ||||||||||||||||||||||||||||||||||||
streams: new(sync.Map), | ||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
func (sm *streamManager) start() { | ||||||||||||||||||||||||||||||||||||
go func() { | ||||||||||||||||||||||||||||||||||||
for { | ||||||||||||||||||||||||||||||||||||
select { | ||||||||||||||||||||||||||||||||||||
case <-sm.ctx.Done(): | ||||||||||||||||||||||||||||||||||||
return | ||||||||||||||||||||||||||||||||||||
case <-time.After(cleanupStreamInterval): | ||||||||||||||||||||||||||||||||||||
sm.cleanupStreams() | ||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. very cool, can definitely use this in other places too |
||||||||||||||||||||||||||||||||||||
}() | ||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
func (sm *streamManager) cleanupStreams() { | ||||||||||||||||||||||||||||||||||||
sm.streams.Range(func(id, stream interface{}) bool { | ||||||||||||||||||||||||||||||||||||
lastReceived, has := sm.lastReceivedMessage.Load(id) | ||||||||||||||||||||||||||||||||||||
if !has { | ||||||||||||||||||||||||||||||||||||
_ = stream.(network.Stream).Close() | ||||||||||||||||||||||||||||||||||||
sm.streams.Delete(id) | ||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
if time.Since(lastReceived.(time.Time)) > cleanupStreamInterval { | ||||||||||||||||||||||||||||||||||||
_ = stream.(network.Stream).Close() | ||||||||||||||||||||||||||||||||||||
sm.streams.Delete(id) | ||||||||||||||||||||||||||||||||||||
sm.lastReceivedMessage.Delete(id) | ||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
return true | ||||||||||||||||||||||||||||||||||||
}) | ||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
func (sm *streamManager) logNewStream(stream network.Stream) { | ||||||||||||||||||||||||||||||||||||
sm.lastReceivedMessage.Store(stream.ID(), time.Now()) // prevents closing just opened streams, in case the cleanup goroutine runs at the same time stream is opened | ||||||||||||||||||||||||||||||||||||
sm.streams.Store(stream.ID(), stream) | ||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
func (sm *streamManager) logMessageReceived(streamID string) { | ||||||||||||||||||||||||||||||||||||
sm.lastReceivedMessage.Store(streamID, time.Now()) | ||||||||||||||||||||||||||||||||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
package network | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"testing" | ||
"time" | ||
|
||
"github.com/libp2p/go-libp2p" | ||
libp2phost "github.com/libp2p/go-libp2p-core/host" | ||
"github.com/libp2p/go-libp2p-core/network" | ||
"github.com/libp2p/go-libp2p-core/peer" | ||
ma "github.com/multiformats/go-multiaddr" | ||
|
||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func setupStreamManagerTest(t *testing.T) (context.Context, []libp2phost.Host, []*streamManager) { | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
|
||
cleanupStreamInterval = time.Millisecond * 500 | ||
t.Cleanup(func() { | ||
cleanupStreamInterval = time.Minute | ||
cancel() | ||
}) | ||
|
||
smA := newStreamManager(ctx) | ||
smB := newStreamManager(ctx) | ||
|
||
portA := 7001 | ||
portB := 7002 | ||
addrA, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", portA)) | ||
require.NoError(t, err) | ||
addrB, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", portB)) | ||
require.NoError(t, err) | ||
|
||
ha, err := libp2p.New( | ||
ctx, libp2p.ListenAddrs(addrA), | ||
) | ||
require.NoError(t, err) | ||
|
||
hb, err := libp2p.New( | ||
ctx, libp2p.ListenAddrs(addrB), | ||
) | ||
require.NoError(t, err) | ||
|
||
err = ha.Connect(ctx, peer.AddrInfo{ | ||
ID: hb.ID(), | ||
Addrs: hb.Addrs(), | ||
}) | ||
require.NoError(t, err) | ||
|
||
hb.SetStreamHandler("", func(stream network.Stream) { | ||
smB.logNewStream(stream) | ||
}) | ||
|
||
return ctx, []libp2phost.Host{ha, hb}, []*streamManager{smA, smB} | ||
} | ||
|
||
func TestStreamManager(t *testing.T) { | ||
ctx, hosts, sms := setupStreamManagerTest(t) | ||
ha, hb := hosts[0], hosts[1] | ||
smA, smB := sms[0], sms[1] | ||
|
||
stream, err := ha.NewStream(ctx, hb.ID(), "") | ||
require.NoError(t, err) | ||
|
||
smA.logNewStream(stream) | ||
smA.start() | ||
smB.start() | ||
|
||
time.Sleep(cleanupStreamInterval * 2) | ||
connsAToB := ha.Network().ConnsToPeer(hb.ID()) | ||
require.Equal(t, 1, len(connsAToB)) | ||
require.Equal(t, 0, len(connsAToB[0].GetStreams())) | ||
|
||
connsBToA := hb.Network().ConnsToPeer(ha.ID()) | ||
require.Equal(t, 1, len(connsBToA)) | ||
require.Equal(t, 0, len(connsBToA[0].GetStreams())) | ||
} | ||
|
||
func TestStreamManager_KeepStream(t *testing.T) { | ||
ctx, hosts, sms := setupStreamManagerTest(t) | ||
ha, hb := hosts[0], hosts[1] | ||
smA, smB := sms[0], sms[1] | ||
|
||
stream, err := ha.NewStream(ctx, hb.ID(), "") | ||
require.NoError(t, err) | ||
|
||
smA.logNewStream(stream) | ||
smA.start() | ||
smB.start() | ||
|
||
time.Sleep(cleanupStreamInterval / 2) | ||
connsAToB := ha.Network().ConnsToPeer(hb.ID()) | ||
require.Equal(t, 1, len(connsAToB)) | ||
require.Equal(t, 1, len(connsAToB[0].GetStreams())) | ||
|
||
connsBToA := hb.Network().ConnsToPeer(ha.ID()) | ||
require.Equal(t, 1, len(connsBToA)) | ||
require.Equal(t, 1, len(connsBToA[0].GetStreams())) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
any reason why this spawns it's own goroutine instead of calling it via
go s.streamManager.start()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see that there's a
ctx
created onNewService
. Do we need both thecloseCh
andctx
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no particular reason, I like it like this since the main network service doesn't need to be concerned with what the streamManager does