diff --git a/client.go b/client.go index 013dd5a..25e81e2 100644 --- a/client.go +++ b/client.go @@ -2,15 +2,11 @@ package multistream import ( "bytes" - "crypto/rand" - "encoding/binary" "errors" "fmt" "io" "os" "runtime/debug" - "strconv" - "strings" ) // ErrNotSupported is the error returned when the muxer doesn't support @@ -34,12 +30,6 @@ func (e ErrNotSupported[T]) Is(target error) bool { // specified. var ErrNoProtocols = errors.New("no protocols specified") -const ( - tieBreakerPrefix = "select:" - initiator = "initiator" - responder = "responder" -) - // SelectProtoOrFail performs the initial multistream handshake // to inform the muxer of the protocol that will be used to communicate // on this ReadWriteCloser. It returns an error if, for example, @@ -110,84 +100,6 @@ func SelectOneOf[T StringLike](protos []T, rwc io.ReadWriteCloser) (proto T, err return proto, err } -const simOpenProtocol = "/libp2p/simultaneous-connect" - -// SelectWithSimopenOrFail performs protocol negotiation with the simultaneous open extension. -// The returned boolean indicator will be true if we should act as a server. -func SelectWithSimopenOrFail[T StringLike](protos []T, rwc io.ReadWriteCloser) (proto T, isServer bool, err error) { - defer func() { - if rerr := recover(); rerr != nil { - fmt.Fprintf(os.Stderr, "caught panic: %s\n%s\n", rerr, debug.Stack()) - err = fmt.Errorf("panic selecting protocol with simopen: %s", rerr) - } - }() - - if len(protos) == 0 { - return "", false, ErrNoProtocols - } - - werrCh := make(chan error, 1) - go func() { - var buf bytes.Buffer - if err := delitmWriteAll(&buf, []byte(ProtocolID), []byte(simOpenProtocol), []byte(protos[0])); err != nil { - werrCh <- err - return - } - - _, err := io.Copy(rwc, &buf) - werrCh <- err - }() - - if err := readMultistreamHeader(rwc); err != nil { - return "", false, err - } - - tok, err := ReadNextToken[T](rwc) - if err != nil { - return "", false, err - } - - if err = <-werrCh; err != nil { - return "", false, err - } - - switch tok { - case simOpenProtocol: - // simultaneous open - return simOpen(protos, rwc) - case "na": - // client open - proto, err := clientOpen(protos, rwc) - if err != nil { - return "", false, err - } - return proto, false, nil - default: - return "", false, fmt.Errorf("unexpected response: %s", tok) - } -} - -func clientOpen[T StringLike](protos []T, rwc io.ReadWriteCloser) (T, error) { - // check to see if we selected the pipelined protocol - tok, err := ReadNextToken[T](rwc) - if err != nil { - return "", err - } - - switch tok { - case protos[0]: - return tok, nil - case "na": - proto, err := selectProtosOrFail(protos[1:], rwc) - if _, ok := err.(ErrNotSupported[T]); ok { - return "", ErrNotSupported[T]{protos} - } - return proto, err - default: - return "", fmt.Errorf("unexpected response: %s", tok) - } -} - func selectProtosOrFail[T StringLike](protos []T, rwc io.ReadWriteCloser) (T, error) { for _, p := range protos { err := trySelect(p, rwc) @@ -202,131 +114,6 @@ func selectProtosOrFail[T StringLike](protos []T, rwc io.ReadWriteCloser) (T, er return "", ErrNotSupported[T]{protos} } -func simOpen[T StringLike](protos []T, rwc io.ReadWriteCloser) (T, bool, error) { - randBytes := make([]byte, 8) - _, err := rand.Read(randBytes) - if err != nil { - return "", false, err - } - myNonce := binary.LittleEndian.Uint64(randBytes) - - werrCh := make(chan error, 1) - go func() { - myselect := []byte(tieBreakerPrefix + strconv.FormatUint(myNonce, 10)) - err := delimWriteBuffered(rwc, myselect) - werrCh <- err - }() - - // skip exactly one protocol - // see https://github.com/multiformats/go-multistream/pull/42#discussion_r558757135 - _, err = ReadNextToken[T](rwc) - if err != nil { - return "", false, err - } - - // read the tie breaker nonce - tok, err := ReadNextToken[T](rwc) - if err != nil { - return "", false, err - } - if !strings.HasPrefix(string(tok), tieBreakerPrefix) { - return "", false, errors.New("tie breaker nonce not sent with the correct prefix") - } - - if err = <-werrCh; err != nil { - return "", false, err - } - - peerNonce, err := strconv.ParseUint(string(tok[len(tieBreakerPrefix):]), 10, 64) - if err != nil { - return "", false, err - } - - var iamserver bool - - if peerNonce == myNonce { - return "", false, errors.New("failed client selection; identical nonces") - } - iamserver = peerNonce > myNonce - - var proto T - if iamserver { - proto, err = simOpenSelectServer(protos, rwc) - } else { - proto, err = simOpenSelectClient(protos, rwc) - } - - return proto, iamserver, err -} - -func simOpenSelectServer[T StringLike](protos []T, rwc io.ReadWriteCloser) (T, error) { - werrCh := make(chan error, 1) - go func() { - err := delimWriteBuffered(rwc, []byte(responder)) - werrCh <- err - }() - - tok, err := ReadNextToken[T](rwc) - if err != nil { - return "", err - } - if tok != initiator { - return "", fmt.Errorf("unexpected response: %s", tok) - } - if err = <-werrCh; err != nil { - return "", err - } - for { - tok, err = ReadNextToken[T](rwc) - - if err == io.EOF { - return "", ErrNotSupported[T]{protos} - } - - if err != nil { - return "", err - } - - for _, p := range protos { - if tok == p { - err = delimWriteBuffered(rwc, []byte(p)) - if err != nil { - return "", err - } - - return p, nil - } - } - - err = delimWriteBuffered(rwc, []byte("na")) - if err != nil { - return "", err - } - } - -} - -func simOpenSelectClient[T StringLike](protos []T, rwc io.ReadWriteCloser) (T, error) { - werrCh := make(chan error, 1) - go func() { - err := delimWriteBuffered(rwc, []byte(initiator)) - werrCh <- err - }() - - tok, err := ReadNextToken[T](rwc) - if err != nil { - return "", err - } - if tok != responder { - return "", fmt.Errorf("unexpected response: %s", tok) - } - if err = <-werrCh; err != nil { - return "", err - } - - return selectProtosOrFail(protos, rwc) -} - func readMultistreamHeader(r io.Reader) error { tok, err := ReadNextToken[string](r) if err != nil { diff --git a/multistream_test.go b/multistream_test.go index 96570f9..199dc60 100644 --- a/multistream_test.go +++ b/multistream_test.go @@ -801,177 +801,6 @@ func TestNegotiatePeerSendsAndCloses(t *testing.T) { } } -func TestSimopenClientServer(t *testing.T) { - a, b := newPipe(t) - - mux := NewMultistreamMuxer[string]() - mux.AddHandler("/a", nil) - - done := make(chan struct{}) - go func() { - selected, _, err := mux.Negotiate(a) - if err != nil { - t.Error(err) - } - if selected != "/a" { - t.Error("incorrect protocol selected") - } - close(done) - }() - - proto, server, err := SelectWithSimopenOrFail([]string{"/a"}, b) - if err != nil { - t.Fatal(err) - } - - if proto != "/a" { - t.Fatal("wrong protocol selected") - } - - if server { - t.Fatal("expected to be client") - } - - select { - case <-time.After(time.Second): - t.Fatal("protocol negotiation didn't complete") - case <-done: - } - - verifyPipe(t, a, b) -} - -func TestSimopenClientServerFail(t *testing.T) { - a, b := newPipe(t) - - mux := NewMultistreamMuxer[string]() - mux.AddHandler("/a", nil) - - done := make(chan struct{}) - go func() { - _, _, err := mux.Negotiate(a) - if err != io.EOF { - t.Error(err) - } - close(done) - }() - - _, _, err := SelectWithSimopenOrFail([]string{"/b"}, b) - if !cmpErrNotSupport(err, ErrNotSupported[string]{[]string{"/b"}}) { - t.Fatal(err) - } - b.Close() - - select { - case <-time.After(time.Second): - t.Fatal("protocol negotiation didn't complete") - case <-done: - } -} - -func TestSimopenClientClient(t *testing.T) { - a, b := newPipe(t) - - done := make(chan bool, 1) - go func() { - proto, server, err := SelectWithSimopenOrFail([]string{"/a"}, b) - if err != nil { - t.Error(err) - } - if proto != "/a" { - t.Error("wrong protocol selected") - } - done <- server - }() - - proto, servera, err := SelectWithSimopenOrFail([]string{"/a"}, a) - if err != nil { - t.Fatal(err) - } - if proto != "/a" { - t.Fatal("wrong protocol selected") - } - - var serverb bool - select { - case <-time.After(time.Second): - t.Fatal("protocol negotiation didn't complete") - - case serverb = <-done: - } - - if servera == serverb { - t.Fatal("client selection failed") - } - - verifyPipe(t, a, b) -} - -func TestSimopenClientClient2(t *testing.T) { - a, b := newPipe(t) - - done := make(chan bool, 1) - go func() { - proto, server, err := SelectWithSimopenOrFail([]string{"/a", "/b"}, b) - if err != nil { - t.Error(err) - } - if proto != "/b" { - t.Error("wrong protocol selected") - } - done <- server - }() - - proto, servera, err := SelectWithSimopenOrFail([]string{"/b"}, a) - if err != nil { - t.Fatal(err) - } - if proto != "/b" { - t.Fatal("wrong protocol selected") - } - - var serverb bool - select { - case <-time.After(time.Second): - t.Fatal("protocol negotiation didn't complete") - - case serverb = <-done: - } - - if servera == serverb { - t.Fatal("client selection failed") - } - - verifyPipe(t, a, b) -} - -func TestSimopenClientClientFail(t *testing.T) { - a, b := newPipe(t) - - done := make(chan struct{}) - go func() { - _, _, err := SelectWithSimopenOrFail([]string{"/a"}, b) - if !cmpErrNotSupport(err, ErrNotSupported[string]{[]string{"/a"}}) { - t.Error(err) - } - b.Close() - close(done) - }() - - _, _, err := SelectWithSimopenOrFail([]string{"/b"}, a) - if !cmpErrNotSupport(err, ErrNotSupported[string]{[]string{"/b"}}) { - t.Fatal(err) - } - a.Close() - - select { - case <-time.After(time.Second): - t.Fatal("protocol negotiation didn't complete") - - case <-done: - } -} - type rwc struct { *strings.Reader } @@ -985,7 +814,7 @@ func (*rwc) Close() error { } func FuzzMultistream(f *testing.F) { - f.Add("/libp2p/simultaneous-connect") + f.Add("/multistream/1.0.0") f.Add(ProtocolID) f.Fuzz(func(t *testing.T, b string) {