Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement vecnet.ReadFrom on Windows #54

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion vecnet/vecnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type Buffers net.Buffers
//
// ReadFrom keeps reading until all bufs are filled or EOF is received.
//
// The pre-allocatted space used by ReadFrom is based upon slice lengths.
// The pre-allocated space used by ReadFrom is based upon slice lengths.
func (bufs Buffers) ReadFrom(r io.Reader) (int64, error) {
if conn, ok := r.(syscall.Conn); ok && readFromBuffers != nil {
return readFromBuffers(bufs, conn)
Expand Down
3 changes: 2 additions & 1 deletion vecnet/vecnet_other.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// +build !linux linux,386
//go:build (!linux && !windows) || (linux && 386)
// +build !linux,!windows linux,386

package vecnet

Expand Down
76 changes: 76 additions & 0 deletions vecnet/vecnet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
package vecnet

import (
"bytes"
"io"
"net"
"strings"
"testing"
)
Expand Down Expand Up @@ -46,3 +48,77 @@ func TestReadFromSanity(t *testing.T) {
t.Errorf("ReadFrom() = (%v, %#v), want (%v, %#v)", s1, s2, s[:10], s[10:])
}
}

func TestReadFromNetwork(t *testing.T) {
testCloser := func(closer io.Closer) {
t.Helper()
if err := closer.Close(); err != nil {
t.Error(err)
}
}

readerConn, readerAddr := newLocalListener(t)
defer testCloser(readerConn)
writerConn := dialListener(t, readerAddr)
defer testCloser(writerConn)

var (
expected = Buffers{
[]byte("ABCDEFGHIJKLMNOPQRSTUVWXYZ"),
[]byte("0123456789"),
[]byte("abcdefghijklmnopqrstuvwxyz"),
}
received = func() (received Buffers) {
received = make(Buffers, len(expected))
for i := range received {
received[i] = make([]byte, len(expected[i]))
}
return
}()
)

writeMessages(t, writerConn, expected)
if _, err := received.ReadFrom(readerConn); err != nil {
t.Fatal(err)
}

for i, got := range received {
want := expected[i]
if !bytes.Equal(got, want) {
t.Errorf("buffer %d did not match expected data"+
"\n\tgot: %s"+
"\n\twant: %s",
i, got, want,
)
}
}
}

func newLocalListener(t *testing.T) (*net.UDPConn, *net.UDPAddr) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. UDP was chosen here mainly for compat and simplicity, but could easily be changed to use Unix domain sockets instead.
    I could be wrong, but I don't think the message passing semantics differ between them (especially when local).
    In addition, (modern) Unices and Windows typically support UDS, so instead of letting the system choose a UDP port for us, we could create (and cleanup) socket files during the test. But I don't think we have to make this change, so I won't unless requested.

  2. It should be noted that pkg net (as of 1.19) implies all the MsgX methods are not implemented on Plan9 regardless of transport.

t.Helper()
addr := &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1)}
conn, err := net.ListenUDP("udp", addr)
if err != nil {
t.Fatal(err)
}
addr.Port = conn.LocalAddr().(*net.UDPAddr).Port
return conn, addr
}

func dialListener(t *testing.T, readerAddr *net.UDPAddr) *net.UDPConn {
t.Helper()
writerConn, err := net.DialUDP("udp", nil, readerAddr)
if err != nil {
t.Fatal(err)
}
return writerConn
}

func writeMessages(t *testing.T, conn *net.UDPConn, expected Buffers) {
for _, buf := range expected {
if _, _, err := conn.WriteMsgUDP(buf, nil, nil); err != nil {
t.Fatal(err)
return
}
}
}
95 changes: 95 additions & 0 deletions vecnet/vecnet_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright 2018 The gVisor Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package vecnet

import (
"io"
"syscall"

"golang.org/x/sys/windows"
)

var readFromBuffers = readFromBuffersWindows

func readFromBuffersWindows(bufs Buffers, conn syscall.Conn) (int64, error) {
rc, err := conn.SyscallConn()
if err != nil {
return 0, err
}

length := int64(0)
for _, buf := range bufs {
length += int64(len(buf))
}
for n := int64(0); n < length; {
cur, err := recvmsg(bufs, rc)
if err != nil && (cur == 0 || err != io.EOF) {
return n, err
}
n += int64(cur)

// Consume buffers to retry.
for consumed := 0; consumed < cur; {
if len(bufs[0]) <= cur-consumed {
consumed += len(bufs[0])
bufs = bufs[1:]
} else {
bufs[0] = bufs[0][cur-consumed:]
break
}
}
}
return length, nil
}

func buildWSABufs(bufs Buffers, WSABufs []windows.WSABuf) []windows.WSABuf {
for _, buf := range bufs {
if l := len(buf); l > 0 {
WSABufs = append(WSABufs, windows.WSABuf{
Len: uint32(l),
Buf: &buf[0],
})
}
}
return WSABufs
}

func recvmsg(bufs Buffers, rc syscall.RawConn) (int, error) {
var (
bytesReceived uint32
WSABufs = buildWSABufs(bufs, make([]windows.WSABuf, 0, 2))
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leaving a note that this magic 2 for the capacity is copied from vecnet_linux.go which does the same.
Not sure the original rationale for using 2 other than it being a reasonable alloc default (as opposed to 0 since we immediately append to it within buildWSABufs)

If there's a better value (typical average bufcount?), it could be used here. But this is probably fine as-is.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually probably stole the 2 from gVisor and didn't think about it further.

bufCount = len(bufs)

msg = windows.WSAMsg{
Buffers: &WSABufs[0],
BufferCount: uint32(bufCount),
}
recvmsgCallBack = func(fd uintptr) bool {
winErr := windows.WSARecvMsg(
windows.Handle(fd),
&msg, &bytesReceived,
nil, nil) // TODO: overlapped structure?
if winErr != nil {
// TODO: double check documentation for other temporary issues
// retry if err is temporary
canRetry := (winErr == windows.WSAEINTR || winErr == windows.WSAEWOULDBLOCK)
return !canRetry
}
return true
}
err = rc.Read(recvmsgCallBack)
)
return int(bytesReceived), err
}