Skip to content

Commit

Permalink
maxbuffersize fix
Browse files Browse the repository at this point in the history
  • Loading branch information
ckousik committed Dec 2, 2022
1 parent 4437740 commit a33c9c3
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 25 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ module github.com/libp2p/go-libp2p

go 1.18

replace github.com/libp2p/go-msgio => github.com/ckousik/go-msgio v0.0.0-20221201141125-79d34d00568e

require (
github.com/benbjohnson/clock v1.3.0
github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWR
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/cilium/ebpf v0.2.0/go.mod h1:To2CFviqOWL/M0gIMsvSMlqe7em/l1ALkX1PyjrX2Qs=
github.com/ckousik/go-msgio v0.0.0-20221201141125-79d34d00568e h1:Fgh7/75YRtseDVrUFJABwmczrf/oAlyzw20LL/eufUs=
github.com/ckousik/go-msgio v0.0.0-20221201141125-79d34d00568e/go.mod h1:0XrmmWaKXpZiE2zRSrUK9f4AC/K02z7hnXb48nP+KwM=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/containerd/cgroups v0.0.0-20201119153540-4cbc285b3327/go.mod h1:ZJeTFisyysqgcCdecO57Dj79RfL0LNeGiFUqLYQRYLE=
Expand Down Expand Up @@ -309,8 +311,6 @@ github.com/libp2p/go-libp2p-testing v0.12.0 h1:EPvBb4kKMWO29qP4mZGyhVzUyR25dvfUI
github.com/libp2p/go-libp2p-testing v0.12.0/go.mod h1:KcGDRXyN7sQCllucn1cOOS+Dmm7ujhfEyXQL5lvkcPg=
github.com/libp2p/go-mplex v0.7.0 h1:BDhFZdlk5tbr0oyFq/xv/NPGfjbnrsDam1EvutpBDbY=
github.com/libp2p/go-mplex v0.7.0/go.mod h1:rW8ThnRcYWft/Jb2jeORBmPd6xuG3dGxWN/W168L9EU=
github.com/libp2p/go-msgio v0.2.0 h1:W6shmB+FeynDrUVl2dgFQvzfBZcXiyqY4VmpQLu9FqU=
github.com/libp2p/go-msgio v0.2.0/go.mod h1:dBVM1gW3Jk9XqHkU4eKdGvVHdLa51hoGfll6jMJMSlY=
github.com/libp2p/go-nat v0.1.0 h1:MfVsH6DLcpa04Xr+p8hmVRG4juse0s3J8HyNWYHffXg=
github.com/libp2p/go-nat v0.1.0/go.mod h1:X7teVkwRHNInVNWQiO/tAiAVRwSr5zoRz4YSTC3uRBM=
github.com/libp2p/go-netroute v0.1.2/go.mod h1:jZLDV+1PE8y5XxBySEBgbuVAXbhtuHSdmLPL2n9MKbk=
Expand Down
36 changes: 21 additions & 15 deletions p2p/transport/webrtc/datachannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,21 @@ var _ network.MuxedStream = &dataChannel{}

const (
// maxMessageSize is limited to 16384 bytes in the SDP.
maxMessageSize uint64 = 16384
// Max message size limit in the SDP is limited to 16384 bytes.
// We keep a maximum of 2 messages in the buffer
maxBufferedAmount uint64 = 3 * maxMessageSize
maxMessageSize int = 16384
// maxMessageSize is set to 1MB since pion SCTP streams have
// an internal buffer size of 1MB by default. Currently, there is
// no method to change this value when creating a datachannel
// or a PeerConnection.
// https://github.com/pion/sctp/blob/c0159aa2d49c240362038edf88baa8a9e6cfcede/association.go#L47
maxBufferedAmount int = 1024 * 1024
// bufferedAmountLowThreshold and maxBufferedAmount are bound
// to a stream but congestion control is done on the whole
// SCTP association. This means that a single stream can monopolize
// the complete congestion control window (cwnd) if it does not
// read stream data and it's remote continues to send. We can
// add messages to the send buffer once there is space for 1 full
// sized message.
bufferedAmountLowThreshold uint64 = 16384
bufferedAmountLowThreshold uint64 = uint64(maxMessageSize)

protoOverhead int = 5
varintOverhead int = 2
Expand Down Expand Up @@ -85,7 +88,7 @@ func newDataChannel(
ctx: ctx,
cancel: cancel,
writeAvailable: make(chan struct{}),
reader: protoio.NewDelimitedReader(rwc, 16384),
reader: protoio.NewDelimitedReaderWithSizedBuffer(rwc, 16384),
writer: protoio.NewDelimitedWriter(rwc),
readBuf: []byte{},
requestRead: make(chan struct{}, 5),
Expand All @@ -95,7 +98,11 @@ func newDataChannel(

channel.SetBufferedAmountLowThreshold(bufferedAmountLowThreshold)
channel.OnBufferedAmountLow(func() {
result.writeAvailable <- struct{}{}
result.m.Lock()
writeAvailable := result.writeAvailable
result.writeAvailable = make(chan struct{})
result.m.Unlock()
close(writeAvailable)
})

result.wg.Add(1)
Expand Down Expand Up @@ -171,19 +178,16 @@ func (d *dataChannel) Write(b []byte) (int, error) {

var err error
var (
chunkSize = int(maxMessageSize) - protoOverhead - varintOverhead
chunkSize = maxMessageSize - protoOverhead - varintOverhead
n = 0
)

for len(b) > 0 {
end := chunkSize
if len(b) < end {
end = len(b)
}
end := min(chunkSize, len(b))

written, err := d.partialWrite(b[:end])
if err != nil {
break
return n + written, err
}
b = b[end:]
n += written
Expand All @@ -204,6 +208,7 @@ func (d *dataChannel) partialWrite(b []byte) (int, error) {
d.m.Lock()
deadline := d.writeDeadline
deadlineUpdated := d.deadlineUpdated
writeAvailable := d.writeAvailable
d.m.Unlock()
if !deadline.IsZero() {
// check if deadline exceeded
Expand All @@ -219,11 +224,12 @@ func (d *dataChannel) partialWrite(b []byte) (int, error) {
}

msg := &pb.Message{Message: b}
if d.channel.BufferedAmount()+uint64(len(b))+uint64(varintOverhead) > maxBufferedAmount {
bufferedAmount := int(d.channel.BufferedAmount()) + len(b) + protoOverhead + varintOverhead
if bufferedAmount > maxBufferedAmount {
select {
case <-timeout:
return 0, os.ErrDeadlineExceeded
case <-d.writeAvailable:
case <-writeAvailable:
return d.writeMessage(msg)
case <-d.ctx.Done():
return 0, io.ErrClosedPipe
Expand Down
70 changes: 62 additions & 8 deletions p2p/transport/webrtc/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,22 +239,22 @@ func TestTransportWebRTC_DialerCanCreateStreams(t *testing.T) {

}

func TestTransportWebRTC_StreamReadDeadline(t *testing.T) {
func TestTransportWebRTC_StreamSetReadDeadline(t *testing.T) {
tr, listeningPeer := getTransport(t)
listenMultiaddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/udp/0/webrtc", listenerIp))
require.NoError(t, err)
listener, err := tr.Listen(listenMultiaddr)
require.NoError(t, err)

tr1, connectingPeer := getTransport(t)
done := make(chan struct{})

go func() {
lconn, err := listener.Accept()
require.NoError(t, err)
t.Logf("listener accepted connection")
require.Equal(t, connectingPeer, lconn.RemotePeer())
done <- struct{}{}
_, err = lconn.AcceptStream()
require.NoError(t, err)
}()

conn, err := tr1.Dial(context.Background(), listener.Multiaddrs()[0], listeningPeer)
Expand All @@ -274,11 +274,65 @@ func TestTransportWebRTC_StreamReadDeadline(t *testing.T) {
_, err = stream.Read([]byte{0, 0})
require.ErrorIs(t, err, os.ErrDeadlineExceeded)

select {
case <-done:
case <-time.After(10 * time.Second):
t.Fatal("timed out")
}
}

func TestTransportWebRTC_StreamSetWriteDeadline(t *testing.T) {
tr, listeningPeer := getTransport(t)
listenMultiaddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/udp/0/webrtc", listenerIp))
require.NoError(t, err)
listener, err := tr.Listen(listenMultiaddr)
require.NoError(t, err)

tr1, connectingPeer := getTransport(t)

go func() {
lconn, err := listener.Accept()
require.NoError(t, err)
require.Equal(t, connectingPeer, lconn.RemotePeer())
_, err = lconn.AcceptStream()
require.NoError(t, err)
}()

conn, err := tr1.Dial(context.Background(), listener.Multiaddrs()[0], listeningPeer)
require.NoError(t, err)
stream, err := conn.OpenStream(context.Background())
require.NoError(t, err)

stream.SetWriteDeadline(time.Now().Add(200 * time.Millisecond))
_, err = stream.Write(make([]byte, 2*maxBufferedAmount))
require.ErrorIs(t, err, os.ErrDeadlineExceeded)
}

func TestTransportWebRTC_ReadPartialMessage(t *testing.T) {
tr, listeningPeer := getTransport(t)
listenMultiaddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/udp/0/webrtc", listenerIp))
require.NoError(t, err)
listener, err := tr.Listen(listenMultiaddr)
require.NoError(t, err)

tr1, connectingPeer := getTransport(t)

go func() {
lconn, err := listener.Accept()
require.NoError(t, err)
require.Equal(t, connectingPeer, lconn.RemotePeer())
stream, err := lconn.AcceptStream()
require.NoError(t, err)
_, err = stream.Write(make([]byte, 2*maxBufferedAmount))
require.NoError(t, err)
}()

conn, err := tr1.Dial(context.Background(), listener.Multiaddrs()[0], listeningPeer)
require.NoError(t, err)
stream, err := conn.OpenStream(context.Background())
require.NoError(t, err)

buf := make([]byte, 10)
stream.SetReadDeadline(time.Now().Add(100 * time.Millisecond))
n, err := stream.Read(buf)
require.NoError(t, err)
require.Equal(t, n, 10)

}

func TestTransportWebRTC_StreamCanCloseWhenReadActive(t *testing.T) {
Expand Down
7 changes: 7 additions & 0 deletions p2p/transport/webrtc/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,10 @@ func genUfrag(n int) string {
}
return string(b)
}

func min(a, b int) int {
if a < b {
return a
}
return b
}

0 comments on commit a33c9c3

Please sign in to comment.