diff --git a/go.mod b/go.mod index 6e5057e..fdddfad 100644 --- a/go.mod +++ b/go.mod @@ -7,14 +7,12 @@ require ( github.com/ipfs/go-log/v2 v2.5.0 github.com/libp2p/go-libp2p-core v0.14.0 github.com/multiformats/go-multiaddr v0.4.1 - github.com/stretchr/testify v1.7.0 go.uber.org/atomic v1.7.0 google.golang.org/grpc v1.31.1 ) require ( github.com/btcsuite/btcd v0.20.1-beta // indirect - github.com/davecgh/go-spew v1.1.1 // indirect github.com/gogo/protobuf v1.3.1 // indirect github.com/ipfs/go-cid v0.0.7 // indirect github.com/libp2p/go-buffer-pool v0.0.2 // indirect @@ -28,12 +26,10 @@ require ( github.com/multiformats/go-multibase v0.0.3 // indirect github.com/multiformats/go-multihash v0.0.14 // indirect github.com/multiformats/go-varint v0.0.6 // indirect - github.com/pmezard/go-difflib v1.0.0 // indirect github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect go.uber.org/multierr v1.6.0 // indirect go.uber.org/zap v1.19.1 // indirect golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550 // indirect golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c // indirect - gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect ) diff --git a/go.sum b/go.sum index b3df2d1..0d872bb 100644 --- a/go.sum +++ b/go.sum @@ -45,10 +45,8 @@ github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlT github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4= -github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/libp2p/go-buffer-pool v0.0.2 h1:QNK2iAFa8gjAe1SPz6mHSMuCcjs+X1wlHzeOSqcmlfs= github.com/libp2p/go-buffer-pool v0.0.2/go.mod h1:MvaB6xw5vOrDl8rYZGLFdKAuk/hRoRZd1Vi32+RXyFM= @@ -168,7 +166,6 @@ google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQ google.golang.org/grpc v1.31.1 h1:SfXqXS5hkufcdZ/mHtYCh53P2b+92WQq/DZcKLgsFRs= google.golang.org/grpc v1.31.1/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= diff --git a/suites/mux/muxer_suite.go b/suites/mux/muxer_suite.go deleted file mode 100644 index 9bb8d28..0000000 --- a/suites/mux/muxer_suite.go +++ /dev/null @@ -1,650 +0,0 @@ -package test - -import ( - "bytes" - "context" - crand "crypto/rand" - "fmt" - "io" - mrand "math/rand" - "net" - "reflect" - "runtime" - "runtime/debug" - "strings" - "sync" - "testing" - "time" - - "github.com/libp2p/go-libp2p-core/network" - "github.com/libp2p/go-libp2p-core/peer" - "github.com/libp2p/go-libp2p-testing/ci" - - "github.com/stretchr/testify/require" -) - -var randomness []byte -var Subtests map[string]TransportTest - -func init() { - // read 1MB of randomness - randomness = make([]byte, 1<<20) - if _, err := crand.Read(randomness); err != nil { - panic(err) - } - - Subtests = make(map[string]TransportTest) - for _, f := range subtests { - Subtests[getFunctionName(f)] = f - } -} - -func getFunctionName(i interface{}) string { - return runtime.FuncForPC(reflect.ValueOf(i).Pointer()).Name() -} - -type peerScope struct { - mx sync.Mutex - memory int -} - -func (p *peerScope) ReserveMemory(size int, _ uint8) error { - p.mx.Lock() - p.memory += size - p.mx.Unlock() - return nil -} - -func (p *peerScope) ReleaseMemory(size int) { - p.mx.Lock() - defer p.mx.Unlock() - if p.memory < size { - panic(fmt.Sprintf("tried to release too much memory: %d (current: %d)", size, p.memory)) - } - p.memory -= size -} - -// Check checks that we don't have any more reserved memory. -func (p *peerScope) Check(t *testing.T) { - p.mx.Lock() - defer p.mx.Unlock() - require.Zero(t, p.memory, "expected all reserved memory to have been released") -} - -func (p *peerScope) Stat() network.ScopeStat { return network.ScopeStat{} } -func (p *peerScope) BeginSpan() (network.ResourceScopeSpan, error) { return nil, nil } -func (p *peerScope) Peer() peer.ID { panic("implement me") } - -var _ network.PeerScope = &peerScope{} - -type Options struct { - tr network.Multiplexer - connNum int - streamNum int - msgNum int - msgMin int - msgMax int -} - -func randBuf(size int) []byte { - n := len(randomness) - size - if size < 1 { - panic(fmt.Errorf("requested too large buffer (%d). max is %d", size, len(randomness))) - } - - start := mrand.Intn(n) - return randomness[start : start+size] -} - -func checkErr(t *testing.T, err error) { - if err != nil { - debug.PrintStack() - t.Fatal(err) - } -} - -func echoStream(s network.MuxedStream) { - defer s.Close() - io.Copy(s, s) // echo everything -} - -func GoServe(t *testing.T, tr network.Multiplexer, l net.Listener) (done func()) { - closed := make(chan struct{}, 1) - - go func() { - for { - c1, err := l.Accept() - if err != nil { - select { - case <-closed: - return // closed naturally. - default: - checkErr(t, err) - } - } - - sc1, err := tr.NewConn(c1, true, nil) - checkErr(t, err) - go func() { - for { - str, err := sc1.AcceptStream() - if err != nil { - break - } - go echoStream(str) - } - }() - } - }() - - return func() { - closed <- struct{}{} - } -} - -func SubtestSimpleWrite(t *testing.T, tr network.Multiplexer) { - l, err := net.Listen("tcp", "localhost:0") - checkErr(t, err) - done := GoServe(t, tr, l) - defer done() - - nc1, err := net.Dial("tcp", l.Addr().String()) - checkErr(t, err) - defer nc1.Close() - - scope := &peerScope{} - c1, err := tr.NewConn(nc1, false, scope) - checkErr(t, err) - defer func() { - c1.Close() - scope.Check(t) - }() - - // serve the outgoing conn, because some muxers assume - // that we _always_ call serve. (this is an error?) - go c1.AcceptStream() - - s1, err := c1.OpenStream(context.Background()) - checkErr(t, err) - defer s1.Close() - - buf1 := randBuf(4096) - _, err = s1.Write(buf1) - checkErr(t, err) - - buf2 := make([]byte, len(buf1)) - _, err = io.ReadFull(s1, buf2) - checkErr(t, err) - - require.Equal(t, buf1, buf2) -} - -func SubtestStress(t *testing.T, opt Options) { - msgsize := 1 << 11 - errs := make(chan error) // dont block anything. - - rateLimitN := 5000 // max of 5k funcs, because -race has 8k max. - rateLimitChan := make(chan struct{}, rateLimitN) - for i := 0; i < rateLimitN; i++ { - rateLimitChan <- struct{}{} - } - - rateLimit := func(f func()) { - <-rateLimitChan - f() - rateLimitChan <- struct{}{} - } - - writeStream := func(s network.MuxedStream, bufs chan<- []byte) { - for i := 0; i < opt.msgNum; i++ { - buf := randBuf(msgsize) - bufs <- buf - if _, err := s.Write(buf); err != nil { - errs <- fmt.Errorf("s.Write(buf): %s", err) - continue - } - } - } - - readStream := func(s network.MuxedStream, bufs <-chan []byte) { - buf2 := make([]byte, msgsize) - for buf1 := range bufs { - if _, err := io.ReadFull(s, buf2); err != nil { - errs <- fmt.Errorf("io.ReadFull(s, buf2): %s", err) - continue - } - if !bytes.Equal(buf1, buf2) { - errs <- fmt.Errorf("buffers not equal (%x != %x)", buf1[:3], buf2[:3]) - } - } - } - - openStreamAndRW := func(c network.MuxedConn) { - s, err := c.OpenStream(context.Background()) - if err != nil { - errs <- fmt.Errorf("failed to create NewStream: %s", err) - return - } - - bufs := make(chan []byte, opt.msgNum) - go func() { - writeStream(s, bufs) - close(bufs) - }() - - readStream(s, bufs) - s.Close() - } - - openConnAndRW := func() { - l, err := net.Listen("tcp", "localhost:0") - checkErr(t, err) - done := GoServe(t, opt.tr, l) - defer done() - - nla := l.Addr() - nc, err := net.Dial(nla.Network(), nla.String()) - checkErr(t, err) - if err != nil { - t.Fatal(fmt.Errorf("net.Dial(%s, %s): %s", nla.Network(), nla.String(), err)) - return - } - - scope := &peerScope{} - c, err := opt.tr.NewConn(nc, false, scope) - if err != nil { - t.Fatal(fmt.Errorf("a.AddConn(%s <--> %s): %s", nc.LocalAddr(), nc.RemoteAddr(), err)) - return - } - - // serve the outgoing conn, because some muxers assume - // that we _always_ call serve. (this is an error?) - go func() { - for { - str, err := c.AcceptStream() - if err != nil { - break - } - go echoStream(str) - } - }() - - var wg sync.WaitGroup - for i := 0; i < opt.streamNum; i++ { - wg.Add(1) - go rateLimit(func() { - defer wg.Done() - openStreamAndRW(c) - }) - } - wg.Wait() - c.Close() - scope.Check(t) - } - - openConnsAndRW := func() { - var wg sync.WaitGroup - for i := 0; i < opt.connNum; i++ { - wg.Add(1) - go rateLimit(func() { - defer wg.Done() - openConnAndRW() - }) - } - wg.Wait() - } - - go func() { - openConnsAndRW() - close(errs) // done - }() - - for err := range errs { - t.Error(err) - } - -} - -func tcpPipe(t *testing.T) (net.Conn, net.Conn) { - list, err := net.Listen("tcp", "0.0.0.0:0") - if err != nil { - t.Fatal(err) - } - - con1, err := net.Dial("tcp", list.Addr().String()) - if err != nil { - t.Fatal(err) - } - - con2, err := list.Accept() - if err != nil { - t.Fatal(err) - } - - return con1, con2 -} - -func SubtestStreamOpenStress(t *testing.T, tr network.Multiplexer) { - wg := new(sync.WaitGroup) - - a, b := tcpPipe(t) - defer a.Close() - defer b.Close() - - defer wg.Wait() - - wg.Add(1) - count := 10000 - workers := 5 - go func() { - defer wg.Done() - muxa, err := tr.NewConn(a, true, nil) - if err != nil { - t.Error(err) - return - } - stress := func() { - defer wg.Done() - for i := 0; i < count; i++ { - s, err := muxa.OpenStream(context.Background()) - if err != nil { - t.Error(err) - return - } - err = s.CloseWrite() - if err != nil { - t.Error(err) - } - n, err := s.Read([]byte{0}) - if n != 0 { - t.Error("expected to read no bytes") - } - if err != io.EOF { - t.Errorf("expected an EOF, got %s", err) - } - } - } - - for i := 0; i < workers; i++ { - wg.Add(1) - go stress() - } - }() - - scope := &peerScope{} - muxb, err := tr.NewConn(b, false, scope) - if err != nil { - t.Fatal(err) - } - defer func() { - muxb.Close() - scope.Check(t) - }() - - time.Sleep(time.Millisecond * 50) - - wg.Add(1) - recv := make(chan struct{}, count*workers) - go func() { - defer wg.Done() - for i := 0; i < count*workers; i++ { - str, err := muxb.AcceptStream() - if err != nil { - break - } - wg.Add(1) - go func() { - defer wg.Done() - str.Close() - select { - case recv <- struct{}{}: - default: - t.Error("too many stream") - } - }() - } - }() - - timeout := time.Second * 10 - if ci.IsRunning() { - timeout *= 10 - } - - limit := time.After(timeout) - for i := 0; i < count*workers; i++ { - select { - case <-recv: - case <-limit: - t.Fatal("timed out receiving streams") - } - } - - wg.Wait() -} - -func SubtestStreamReset(t *testing.T, tr network.Multiplexer) { - wg := new(sync.WaitGroup) - defer wg.Wait() - - a, b := tcpPipe(t) - defer a.Close() - defer b.Close() - - wg.Add(1) - scopea := &peerScope{} - muxa, err := tr.NewConn(a, true, scopea) - if err != nil { - t.Error(err) - return - } - defer func() { - muxa.Close() - scopea.Check(t) - }() - - go func() { - defer wg.Done() - s, err := muxa.OpenStream(context.Background()) - if err != nil { - t.Error(err) - return - } - time.Sleep(time.Millisecond * 50) - - _, err = s.Write([]byte("foo")) - if err != network.ErrReset { - t.Error("should have been stream reset") - } - s.Close() - }() - - scopeb := &peerScope{} - muxb, err := tr.NewConn(b, false, scopeb) - if err != nil { - t.Fatal(err) - } - defer func() { - muxb.Close() - scopeb.Check(t) - }() - - str, err := muxb.AcceptStream() - checkErr(t, err) - str.Reset() - - wg.Wait() -} - -// check that Close also closes the underlying net.Conn -func SubtestWriteAfterClose(t *testing.T, tr network.Multiplexer) { - a, b := tcpPipe(t) - - scopea := &peerScope{} - muxa, err := tr.NewConn(a, true, scopea) - checkErr(t, err) - - scopeb := &peerScope{} - muxb, err := tr.NewConn(b, false, scopeb) - checkErr(t, err) - - checkErr(t, muxa.Close()) - scopea.Check(t) - checkErr(t, muxb.Close()) - scopeb.Check(t) - - // make sure the underlying net.Conn was closed - if _, err := a.Write([]byte("foobar")); err == nil || !strings.Contains(err.Error(), "use of closed network connection") { - t.Fatal("write should have failed") - } - if _, err := b.Write([]byte("foobar")); err == nil || !strings.Contains(err.Error(), "use of closed network connection") { - t.Fatal("write should have failed") - } -} - -func SubtestStreamLeftOpen(t *testing.T, tr network.Multiplexer) { - a, b := tcpPipe(t) - - const numStreams = 10 - const dataLen = 50 * 1024 - - scopea := &peerScope{} - muxa, err := tr.NewConn(a, true, scopea) - checkErr(t, err) - - scopeb := &peerScope{} - muxb, err := tr.NewConn(b, false, scopeb) - checkErr(t, err) - - var wg sync.WaitGroup - wg.Add(1 + numStreams) - go func() { - defer wg.Done() - for i := 0; i < numStreams; i++ { - stra, err := muxa.OpenStream(context.Background()) - checkErr(t, err) - go func() { - defer wg.Done() - _, err = stra.Write(randBuf(dataLen)) - checkErr(t, err) - // do NOT close or reset the stream - }() - } - }() - - wg.Add(1 + numStreams) - go func() { - defer wg.Done() - for i := 0; i < numStreams; i++ { - str, err := muxb.AcceptStream() - checkErr(t, err) - go func() { - defer wg.Done() - _, err = io.ReadFull(str, make([]byte, dataLen)) - checkErr(t, err) - }() - } - }() - - // Now we have a bunch of open streams. - // Make sure that their memory is returned when we close the connection. - wg.Wait() - - muxa.Close() - scopea.Check(t) - muxb.Close() - scopeb.Check(t) -} - -func SubtestStress1Conn1Stream1Msg(t *testing.T, tr network.Multiplexer) { - SubtestStress(t, Options{ - tr: tr, - connNum: 1, - streamNum: 1, - msgNum: 1, - msgMax: 100, - msgMin: 100, - }) -} - -func SubtestStress1Conn1Stream100Msg(t *testing.T, tr network.Multiplexer) { - SubtestStress(t, Options{ - tr: tr, - connNum: 1, - streamNum: 1, - msgNum: 100, - msgMax: 100, - msgMin: 100, - }) -} - -func SubtestStress1Conn100Stream100Msg(t *testing.T, tr network.Multiplexer) { - SubtestStress(t, Options{ - tr: tr, - connNum: 1, - streamNum: 100, - msgNum: 100, - msgMax: 100, - msgMin: 100, - }) -} - -func SubtestStress10Conn10Stream50Msg(t *testing.T, tr network.Multiplexer) { - SubtestStress(t, Options{ - tr: tr, - connNum: 10, - streamNum: 10, - msgNum: 50, - msgMax: 100, - msgMin: 100, - }) -} - -func SubtestStress1Conn1000Stream10Msg(t *testing.T, tr network.Multiplexer) { - SubtestStress(t, Options{ - tr: tr, - connNum: 1, - streamNum: 1000, - msgNum: 10, - msgMax: 100, - msgMin: 100, - }) -} - -func SubtestStress1Conn100Stream100Msg10MB(t *testing.T, tr network.Multiplexer) { - SubtestStress(t, Options{ - tr: tr, - connNum: 1, - streamNum: 100, - msgNum: 100, - msgMax: 10000, - msgMin: 1000, - }) -} - -// Subtests are all the subtests run by SubtestAll -var subtests = []TransportTest{ - SubtestSimpleWrite, - SubtestWriteAfterClose, - SubtestStress1Conn1Stream1Msg, - SubtestStress1Conn1Stream100Msg, - SubtestStress1Conn100Stream100Msg, - SubtestStress10Conn10Stream50Msg, - SubtestStress1Conn1000Stream10Msg, - SubtestStress1Conn100Stream100Msg10MB, - SubtestStreamOpenStress, - SubtestStreamReset, - SubtestStreamLeftOpen, -} - -// SubtestAll runs all the stream multiplexer tests against the target -// transport. -func SubtestAll(t *testing.T, tr network.Multiplexer) { - for name, f := range Subtests { - t.Run(name, func(t *testing.T) { - f(t, tr) - }) - } -} - -// TransportTest is a stream multiplex transport test case -type TransportTest func(t *testing.T, tr network.Multiplexer) diff --git a/suites/sec/transport_suite.go b/suites/sec/transport_suite.go deleted file mode 100644 index d4dadcb..0000000 --- a/suites/sec/transport_suite.go +++ /dev/null @@ -1,373 +0,0 @@ -package tsec - -import ( - "bytes" - "context" - "io" - "net" - "sync" - "testing" - "time" - - "math/rand" - - "github.com/libp2p/go-libp2p-core/peer" - "github.com/libp2p/go-libp2p-core/sec" -) - -var Subtests = map[string]func(t *testing.T, at, bt sec.SecureTransport, ap, bp peer.ID){ - "RW": SubtestRW, - "Keys": SubtestKeys, - "WrongPeer": SubtestWrongPeer, - "Stream": SubtestStream, - "CancelHandshakeInbound": SubtestCancelHandshakeInbound, - "CancelHandshakeOutbound": SubtestCancelHandshakeOutbound, -} - -var TestMessage = []byte("hello world!") -var TestStreamLen int64 = 1024 * 1024 -var TestSeed int64 = 1812 - -func SubtestAll(t *testing.T, at, bt sec.SecureTransport, ap, bp peer.ID) { - for n, f := range Subtests { - t.Run(n, func(t *testing.T) { - f(t, at, bt, ap, bp) - f(t, bt, at, bp, ap) - }) - } -} - -func randStream() io.Reader { - return &io.LimitedReader{ - R: rand.New(rand.NewSource(TestSeed)), - N: TestStreamLen, - } -} - -func testWriteSustain(t *testing.T, c sec.SecureConn) { - source := randStream() - n := int64(0) - for { - coppied, err := io.CopyN(c, source, int64(rand.Intn(8000))) - n += coppied - - switch err { - case io.EOF: - if n != TestStreamLen { - t.Fatal("incorrect random stream length") - } - return - case nil: - default: - t.Fatal(err) - } - } -} - -func testReadSustain(t *testing.T, c sec.SecureConn) { - expected := randStream() - total := 0 - ebuf := make([]byte, 1024) - abuf := make([]byte, 1024) - for { - n, err := c.Read(abuf) - if err != nil { - t.Fatal(err) - } - total += n - _, err = io.ReadFull(expected, ebuf[:n]) - if err != nil { - t.Fatal(err) - } - if !bytes.Equal(abuf[:n], ebuf[:n]) { - t.Fatal("bytes not equal") - } - if total == int(TestStreamLen) { - return - } - } -} -func testWrite(t *testing.T, c sec.SecureConn) { - n, err := c.Write(TestMessage) - if err != nil { - t.Fatal(err) - } - if n != len(TestMessage) { - t.Errorf("wrote %d bytes, expected to write %d bytes", n, len(TestMessage)) - } -} - -func testRead(t *testing.T, c sec.SecureConn) { - buf := make([]byte, 100) - n, err := c.Read(buf) - if err != nil { - t.Fatal(err) - } - if n != len(TestMessage) { - t.Errorf("wrote %d bytes, expected to write %d bytes", n, len(TestMessage)) - } - if !bytes.Equal(buf[:n], TestMessage) { - t.Errorf("received bad test message: %s", string(buf[:n])) - } -} - -func testWriteFail(t *testing.T, c sec.SecureConn) { - n, err := c.Write(TestMessage) - if n != 0 || err == nil { - t.Error("shouldn't have been able to write to a closed conn") - } -} - -func testReadFail(t *testing.T, c sec.SecureConn) { - buf := make([]byte, len(TestMessage)) - n, err := c.Read(buf) - if n != 0 || err == nil { - t.Error("shouldn't have been able to write to a closed conn") - } -} - -func testEOF(t *testing.T, c sec.SecureConn) { - buf := make([]byte, 100) - n, err := c.Read(buf) - if n != 0 { - t.Errorf("didn't expect to read any bytes, read: %d", n) - } - if err != io.EOF { - t.Errorf("expected read to fail with EOF, got: %s", err) - } -} - -func SubtestRW(t *testing.T, at, bt sec.SecureTransport, ap, bp peer.ID) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - a, b := net.Pipe() - - var wg sync.WaitGroup - wg.Add(2) - go func() { - defer wg.Done() - c, err := at.SecureInbound(ctx, a, "") - if err != nil { - a.Close() - t.Error(err) - return - } - - if c.LocalPeer() != ap { - t.Errorf("expected local peer %s, got %s", ap, c.LocalPeer()) - } - testWrite(t, c) - testRead(t, c) - c.Close() - testWriteFail(t, c) - testReadFail(t, c) - }() - - go func() { - defer wg.Done() - c, err := bt.SecureOutbound(ctx, b, ap) - if err != nil { - b.Close() - t.Error(err) - return - } - - if c.RemotePeer() != ap { - t.Errorf("expected remote peer %s, got %s", ap, c.RemotePeer()) - } - if c.LocalPeer() != bp { - t.Errorf("expected local peer %s, got %s", bp, c.LocalPeer()) - } - testRead(t, c) - testWrite(t, c) - testEOF(t, c) - testWriteFail(t, c) - c.Close() - }() - wg.Wait() -} - -func SubtestKeys(t *testing.T, at, bt sec.SecureTransport, ap, bp peer.ID) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - a, b := net.Pipe() - - var wg sync.WaitGroup - wg.Add(2) - go func() { - defer wg.Done() - c, err := at.SecureInbound(ctx, a, "") - if err != nil { - a.Close() - t.Error(err) - return - } - defer c.Close() - - if c.RemotePeer() != bp { - t.Errorf("expected remote peer %s, got remote peer %s", bp, c.RemotePeer()) - } - if c.LocalPeer() != ap { - t.Errorf("expected local peer %s, got local peer %s", ap, c.LocalPeer()) - } - if !c.LocalPeer().MatchesPrivateKey(c.LocalPrivateKey()) { - t.Error("local private key mismatch") - } - if !c.RemotePeer().MatchesPublicKey(c.RemotePublicKey()) { - t.Error("local private key mismatch") - } - }() - - go func() { - defer wg.Done() - c, err := bt.SecureOutbound(ctx, b, ap) - if err != nil { - b.Close() - t.Error(err) - return - } - defer c.Close() - - if c.RemotePeer() != ap { - t.Errorf("expected remote peer %s, got remote peer %s", ap, c.RemotePeer()) - } - if c.LocalPeer() != bp { - t.Errorf("expected local peer %s, got local peer %s", bp, c.LocalPeer()) - } - if !c.LocalPeer().MatchesPrivateKey(c.LocalPrivateKey()) { - t.Error("local private key mismatch") - } - if !c.RemotePeer().MatchesPublicKey(c.RemotePublicKey()) { - t.Error("local private key mismatch") - } - c.Close() - }() - wg.Wait() -} - -func SubtestWrongPeer(t *testing.T, at, bt sec.SecureTransport, ap, bp peer.ID) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - a, b := net.Pipe() - - var wg sync.WaitGroup - wg.Add(2) - go func() { - defer wg.Done() - defer a.Close() - if _, err := at.SecureInbound(ctx, a, ""); err == nil { - t.Error("connection should have failed") - } - }() - - go func() { - defer wg.Done() - defer b.Close() - if _, err := bt.SecureOutbound(ctx, b, bp); err == nil { - t.Error("connection should have failed") - } - }() - wg.Wait() -} - -func SubtestCancelHandshakeOutbound(t *testing.T, at, bt sec.SecureTransport, ap, bp peer.ID) { - ctx, cancel := context.WithCancel(context.Background()) - a, b := net.Pipe() - defer a.Close() - defer b.Close() - - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - if _, err := at.SecureOutbound(ctx, a, ap); err == nil { - t.Error("connection should have failed") - } - }() - time.Sleep(time.Millisecond) - cancel() - wg.Add(1) - go func() { - defer wg.Done() - if _, err := bt.SecureInbound(ctx, b, ""); err == nil { - t.Error("connection should have failed") - } - }() - - wg.Wait() - -} - -func SubtestCancelHandshakeInbound(t *testing.T, at, bt sec.SecureTransport, ap, bp peer.ID) { - ctx, cancel := context.WithCancel(context.Background()) - a, b := net.Pipe() - defer a.Close() - defer b.Close() - - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - if _, err := at.SecureInbound(ctx, a, ""); err == nil { - t.Error("connection should have failed") - } - }() - time.Sleep(time.Millisecond) - cancel() - wg.Add(1) - go func() { - defer wg.Done() - if _, err := bt.SecureOutbound(ctx, b, bp); err == nil { - t.Error("connection should have failed") - } - }() - - wg.Wait() - -} - -func SubtestStream(t *testing.T, at, bt sec.SecureTransport, ap, bp peer.ID) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - a, b := net.Pipe() - - defer a.Close() - defer b.Close() - - var wg sync.WaitGroup - wg.Add(2) - go func() { - defer wg.Done() - - c, err := at.SecureInbound(ctx, a, "") - if err != nil { - t.Error(err) - return - } - - var swg sync.WaitGroup - swg.Add(2) - go func() { - defer swg.Done() - testWriteSustain(t, c) - }() - go func() { - defer swg.Done() - testReadSustain(t, c) - }() - swg.Wait() - c.Close() - }() - - go func() { - defer wg.Done() - c, err := bt.SecureOutbound(ctx, b, ap) - if err != nil { - t.Error(err) - return - } - io.Copy(c, c) - c.Close() - }() - wg.Wait() -} diff --git a/suites/transport/stream_suite.go b/suites/transport/stream_suite.go deleted file mode 100644 index e7770bf..0000000 --- a/suites/transport/stream_suite.go +++ /dev/null @@ -1,448 +0,0 @@ -package ttransport - -import ( - "bytes" - "context" - "fmt" - "io" - "io/ioutil" - "os" - "strconv" - "sync" - "testing" - "time" - - crand "crypto/rand" - mrand "math/rand" - - "github.com/libp2p/go-libp2p-core/network" - "github.com/libp2p/go-libp2p-core/peer" - "github.com/libp2p/go-libp2p-core/transport" - "github.com/libp2p/go-libp2p-testing/race" - - ma "github.com/multiformats/go-multiaddr" -) - -var randomness []byte - -var StressTestTimeout = 1 * time.Minute - -func init() { - // read 1MB of randomness - randomness = make([]byte, 1<<20) - if _, err := crand.Read(randomness); err != nil { - panic(err) - } - - if timeout := os.Getenv("TEST_STRESS_TIMEOUT_MS"); timeout != "" { - if v, err := strconv.ParseInt(timeout, 10, 32); err == nil { - StressTestTimeout = time.Duration(v) * time.Millisecond - } - } -} - -type Options struct { - ConnNum int - StreamNum int - MsgNum int - MsgMin int - MsgMax int -} - -func fullClose(t *testing.T, s network.MuxedStream) { - if err := s.CloseWrite(); err != nil { - t.Error(err) - s.Reset() - return - } - b, err := ioutil.ReadAll(s) - if err != nil { - t.Error(err) - } - if len(b) != 0 { - t.Error("expected to be done reading") - } - if err := s.Close(); err != nil { - t.Error(err) - } -} - -func randBuf(size int) []byte { - n := len(randomness) - size - if size < 1 { - panic(fmt.Errorf("requested too large buffer (%d). max is %d", size, len(randomness))) - } - - start := mrand.Intn(n) - return randomness[start : start+size] -} - -func echoStream(t *testing.T, s network.MuxedStream) { - // echo everything - if _, err := io.Copy(s, s); err != nil { - t.Error(err) - } -} - -func echo(t *testing.T, c transport.CapableConn) { - var wg sync.WaitGroup - defer wg.Wait() - for { - str, err := c.AcceptStream() - if err != nil { - break - } - wg.Add(1) - go func() { - defer wg.Done() - defer str.Close() - echoStream(t, str) - }() - } -} - -func serve(t *testing.T, l transport.Listener) { - var wg sync.WaitGroup - defer wg.Wait() - - for { - c, err := l.Accept() - if err != nil { - return - } - defer c.Close() - - wg.Add(1) - go func() { - defer wg.Done() - echo(t, c) - }() - } -} - -func SubtestStress(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID, opt Options) { - msgsize := 1 << 11 - - rateLimitN := 5000 // max of 5k funcs, because -race has 8k max. - rateLimitChan := make(chan struct{}, rateLimitN) - for i := 0; i < rateLimitN; i++ { - rateLimitChan <- struct{}{} - } - - rateLimit := func(f func()) { - <-rateLimitChan - f() - rateLimitChan <- struct{}{} - } - - writeStream := func(s network.MuxedStream, bufs chan<- []byte) { - for i := 0; i < opt.MsgNum; i++ { - buf := randBuf(msgsize) - bufs <- buf - if _, err := s.Write(buf); err != nil { - t.Errorf("s.Write(buf): %s", err) - return - } - } - } - - readStream := func(s network.MuxedStream, bufs <-chan []byte) { - buf2 := make([]byte, msgsize) - i := 0 - for buf1 := range bufs { - i++ - - if _, err := io.ReadFull(s, buf2); err != nil { - t.Errorf("io.ReadFull(s, buf2): %s", err) - return - } - if !bytes.Equal(buf1, buf2) { - t.Errorf("buffers not equal (%x != %x)", buf1[:3], buf2[:3]) - return - } - } - } - - openStreamAndRW := func(c network.MuxedConn) { - s, err := c.OpenStream(context.Background()) - if err != nil { - t.Errorf("failed to create NewStream: %s", err) - return - } - - bufs := make(chan []byte, opt.MsgNum) - go func() { - writeStream(s, bufs) - close(bufs) - }() - - readStream(s, bufs) - fullClose(t, s) - } - - openConnAndRW := func() { - var wg sync.WaitGroup - defer wg.Wait() - - l, err := ta.Listen(maddr) - if err != nil { - t.Error(err) - return - } - defer l.Close() - - wg.Add(1) - go func() { - defer wg.Done() - serve(t, l) - }() - - c, err := tb.Dial(context.Background(), l.Multiaddr(), peerA) - if err != nil { - t.Error(err) - return - } - defer c.Close() - - // serve the outgoing conn, because some muxers assume - // that we _always_ call serve. (this is an error?) - wg.Add(1) - go func() { - defer wg.Done() - echo(t, c) - }() - - var openWg sync.WaitGroup - for i := 0; i < opt.StreamNum; i++ { - openWg.Add(1) - go rateLimit(func() { - defer openWg.Done() - openStreamAndRW(c) - }) - } - openWg.Wait() - } - - var wg sync.WaitGroup - defer wg.Wait() - for i := 0; i < opt.ConnNum; i++ { - wg.Add(1) - go rateLimit(func() { - defer wg.Done() - openConnAndRW() - }) - } -} - -func SubtestStreamOpenStress(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) { - l, err := ta.Listen(maddr) - if err != nil { - t.Fatal(err) - } - defer l.Close() - - count := 10000 - workers := 5 - - if race.WithRace() { - // the race detector can only deal with 8128 simultaneous goroutines, so let's make sure we don't go overboard. - count = 1000 - } - - var ( - connA, connB transport.CapableConn - ) - - accepted := make(chan error, 1) - go func() { - var err error - connA, err = l.Accept() - accepted <- err - }() - connB, err = tb.Dial(context.Background(), l.Multiaddr(), peerA) - if err != nil { - t.Fatal(err) - } - err = <-accepted - if err != nil { - t.Fatal(err) - } - - defer func() { - if connA != nil { - connA.Close() - } - if connB != nil { - connB.Close() - } - }() - - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - for j := 0; j < workers; j++ { - wg.Add(1) - go func() { - defer wg.Done() - for i := 0; i < count; i++ { - s, err := connA.OpenStream(context.Background()) - if err != nil { - t.Error(err) - return - } - wg.Add(1) - go func() { - defer wg.Done() - fullClose(t, s) - }() - } - }() - } - }() - - wg.Add(1) - go func() { - defer wg.Done() - for i := 0; i < count*workers; i++ { - str, err := connB.AcceptStream() - if err != nil { - break - } - wg.Add(1) - go func() { - defer wg.Done() - fullClose(t, str) - }() - } - }() - - timeout := time.After(StressTestTimeout) - done := make(chan struct{}) - - go func() { - wg.Wait() - close(done) - }() - - select { - case <-timeout: - t.Fatal("timed out receiving streams") - case <-done: - } -} - -func SubtestStreamReset(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) { - var wg sync.WaitGroup - defer wg.Wait() - - l, err := ta.Listen(maddr) - if err != nil { - t.Fatal(err) - } - defer l.Close() - - wg.Add(1) - go func() { - defer wg.Done() - - muxa, err := l.Accept() - if err != nil { - t.Error(err) - return - } - defer muxa.Close() - - s, err := muxa.OpenStream(context.Background()) - if err != nil { - t.Error(err) - return - } - defer s.Close() - - // Some transports won't open the stream until we write. That's - // fine. - _, _ = s.Write([]byte("foo")) - - time.Sleep(time.Millisecond * 50) - - _, err = s.Write([]byte("bar")) - if err == nil { - t.Error("should have failed to write") - } - - }() - - muxb, err := tb.Dial(context.Background(), l.Multiaddr(), peerA) - if err != nil { - t.Fatal(err) - } - defer muxb.Close() - - str, err := muxb.AcceptStream() - if err != nil { - t.Error(err) - return - } - str.Reset() -} - -func SubtestStress1Conn1Stream1Msg(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) { - SubtestStress(t, ta, tb, maddr, peerA, Options{ - ConnNum: 1, - StreamNum: 1, - MsgNum: 1, - MsgMax: 100, - MsgMin: 100, - }) -} - -func SubtestStress1Conn1Stream100Msg(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) { - SubtestStress(t, ta, tb, maddr, peerA, Options{ - ConnNum: 1, - StreamNum: 1, - MsgNum: 100, - MsgMax: 100, - MsgMin: 100, - }) -} - -func SubtestStress1Conn100Stream100Msg(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) { - SubtestStress(t, ta, tb, maddr, peerA, Options{ - ConnNum: 1, - StreamNum: 100, - MsgNum: 100, - MsgMax: 100, - MsgMin: 100, - }) -} - -func SubtestStress50Conn10Stream50Msg(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) { - SubtestStress(t, ta, tb, maddr, peerA, Options{ - ConnNum: 50, - StreamNum: 10, - MsgNum: 50, - MsgMax: 100, - MsgMin: 100, - }) -} - -func SubtestStress1Conn1000Stream10Msg(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) { - SubtestStress(t, ta, tb, maddr, peerA, Options{ - ConnNum: 1, - StreamNum: 1000, - MsgNum: 10, - MsgMax: 100, - MsgMin: 100, - }) -} - -func SubtestStress1Conn100Stream100Msg10MB(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) { - SubtestStress(t, ta, tb, maddr, peerA, Options{ - ConnNum: 1, - StreamNum: 100, - MsgNum: 100, - MsgMax: 10000, - MsgMin: 1000, - }) -} diff --git a/suites/transport/transport_suite.go b/suites/transport/transport_suite.go deleted file mode 100644 index 6e6b300..0000000 --- a/suites/transport/transport_suite.go +++ /dev/null @@ -1,305 +0,0 @@ -package ttransport - -import ( - "bytes" - "context" - "fmt" - "io/ioutil" - "sync" - "testing" - - "github.com/libp2p/go-libp2p-core/peer" - "github.com/libp2p/go-libp2p-core/transport" - - ma "github.com/multiformats/go-multiaddr" -) - -var testData = []byte("this is some test data") - -func SubtestProtocols(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) { - rawIPAddr, _ := ma.NewMultiaddr("/ip4/1.2.3.4") - if ta.CanDial(rawIPAddr) || tb.CanDial(rawIPAddr) { - t.Error("nothing should be able to dial raw IP") - } - - tprotos := make(map[int]bool) - for _, p := range ta.Protocols() { - tprotos[p] = true - } - - if !ta.Proxy() { - protos := maddr.Protocols() - proto := protos[len(protos)-1] - if !tprotos[proto.Code] { - t.Errorf("transport should have reported that it supports protocol '%s' (%d)", proto.Name, proto.Code) - } - } else { - found := false - for _, proto := range maddr.Protocols() { - if tprotos[proto.Code] { - found = true - break - } - } - if !found { - t.Errorf("didn't find any matching proxy protocols in maddr: %s", maddr) - } - } -} - -func SubtestBasic(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - list, err := ta.Listen(maddr) - if err != nil { - t.Fatal(err) - } - defer list.Close() - - var ( - connA, connB transport.CapableConn - done = make(chan struct{}) - ) - defer func() { - <-done - if connA != nil { - connA.Close() - } - if connB != nil { - connB.Close() - } - }() - - go func() { - defer close(done) - var err error - connB, err = list.Accept() - if err != nil { - t.Error(err) - return - } - s, err := connB.AcceptStream() - if err != nil { - t.Error(err) - return - } - - buf, err := ioutil.ReadAll(s) - if err != nil { - t.Error(err) - return - } - - if !bytes.Equal(testData, buf) { - t.Errorf("expected %s, got %s", testData, buf) - } - - n, err := s.Write(testData) - if err != nil { - t.Error(err) - return - } - if n != len(testData) { - t.Error(err) - return - } - - err = s.Close() - if err != nil { - t.Error(err) - } - }() - - if !tb.CanDial(list.Multiaddr()) { - t.Error("CanDial should have returned true") - } - - connA, err = tb.Dial(ctx, list.Multiaddr(), peerA) - if err != nil { - t.Fatal(err) - } - - s, err := connA.OpenStream(context.Background()) - if err != nil { - t.Fatal(err) - } - - n, err := s.Write(testData) - if err != nil { - t.Fatal(err) - return - } - - if n != len(testData) { - t.Fatalf("failed to write enough data (a->b)") - return - } - - if err = s.CloseWrite(); err != nil { - t.Fatal(err) - return - } - - buf, err := ioutil.ReadAll(s) - if err != nil { - t.Fatal(err) - return - } - if !bytes.Equal(testData, buf) { - t.Errorf("expected %s, got %s", testData, buf) - } - - if err = s.Close(); err != nil { - t.Fatal(err) - return - } -} - -func SubtestPingPong(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) { - streams := 100 - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - list, err := ta.Listen(maddr) - if err != nil { - t.Fatal(err) - } - defer list.Close() - - var ( - connA, connB transport.CapableConn - ) - defer func() { - if connA != nil { - connA.Close() - } - if connB != nil { - connB.Close() - } - }() - - var wg sync.WaitGroup - - wg.Add(1) - go func() { - defer wg.Done() - var err error - connA, err = list.Accept() - if err != nil { - t.Error(err) - return - } - - var sWg sync.WaitGroup - for i := 0; i < streams; i++ { - s, err := connA.AcceptStream() - if err != nil { - t.Error(err) - return - } - - sWg.Add(1) - go func() { - defer sWg.Done() - - data, err := ioutil.ReadAll(s) - if err != nil { - s.Reset() - t.Error(err) - return - } - if !bytes.HasPrefix(data, testData) { - t.Errorf("expected %q to have prefix %q", string(data), string(testData)) - } - - n, err := s.Write(data) - if err != nil { - s.Reset() - t.Error(err) - return - } - - if n != len(data) { - s.Reset() - t.Error(err) - return - } - s.Close() - }() - } - sWg.Wait() - }() - - if !tb.CanDial(list.Multiaddr()) { - t.Error("CanDial should have returned true") - } - - connB, err = tb.Dial(ctx, list.Multiaddr(), peerA) - if err != nil { - t.Fatal(err) - } - - for i := 0; i < streams; i++ { - s, err := connB.OpenStream(context.Background()) - if err != nil { - t.Error(err) - continue - } - - wg.Add(1) - go func(i int) { - defer wg.Done() - data := []byte(fmt.Sprintf("%s - %d", testData, i)) - n, err := s.Write(data) - if err != nil { - s.Reset() - t.Error(err) - return - } - - if n != len(data) { - s.Reset() - t.Error("failed to write enough data (a->b)") - return - } - if err = s.CloseWrite(); err != nil { - t.Error(err) - return - } - - ret, err := ioutil.ReadAll(s) - if err != nil { - s.Reset() - t.Error(err) - return - } - if !bytes.Equal(data, ret) { - t.Errorf("expected %q, got %q", string(data), string(ret)) - } - - if err = s.Close(); err != nil { - t.Error(err) - return - } - }(i) - } - wg.Wait() -} - -func SubtestCancel(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) { - list, err := ta.Listen(maddr) - if err != nil { - t.Fatal(err) - } - defer list.Close() - - ctx, cancel := context.WithCancel(context.Background()) - cancel() - c, err := tb.Dial(ctx, list.Multiaddr(), peerA) - if err == nil { - c.Close() - t.Fatal("dial should have failed") - } -} diff --git a/suites/transport/utils_suite.go b/suites/transport/utils_suite.go deleted file mode 100644 index 1d520ff..0000000 --- a/suites/transport/utils_suite.go +++ /dev/null @@ -1,45 +0,0 @@ -package ttransport - -import ( - "reflect" - "runtime" - "testing" - - "github.com/libp2p/go-libp2p-core/peer" - "github.com/libp2p/go-libp2p-core/transport" - - ma "github.com/multiformats/go-multiaddr" -) - -var Subtests = []func(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID){ - SubtestProtocols, - SubtestBasic, - SubtestCancel, - SubtestPingPong, - - // Stolen from the stream muxer test suite. - SubtestStress1Conn1Stream1Msg, - SubtestStress1Conn1Stream100Msg, - SubtestStress1Conn100Stream100Msg, - SubtestStress50Conn10Stream50Msg, - SubtestStress1Conn1000Stream10Msg, - SubtestStress1Conn100Stream100Msg10MB, - SubtestStreamOpenStress, - SubtestStreamReset, -} - -func getFunctionName(i interface{}) string { - return runtime.FuncForPC(reflect.ValueOf(i).Pointer()).Name() -} - -func SubtestTransport(t *testing.T, ta, tb transport.Transport, addr string, peerA peer.ID) { - maddr, err := ma.NewMultiaddr(addr) - if err != nil { - t.Fatal(err) - } - for _, f := range Subtests { - t.Run(getFunctionName(f), func(t *testing.T) { - f(t, ta, tb, maddr, peerA) - }) - } -}