Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add "system socket" API #154

Merged
merged 2 commits into from
Aug 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cap/host/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ import (

type Host api.Host

func (h Host) Resolve(ctx context.Context) error {
return capnp.Client(h).Resolve(ctx)
}

func (h Host) AddRef() Host {
return Host(capnp.Client(h).AddRef())
}
Expand Down
37 changes: 37 additions & 0 deletions guest/system/boot.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package system

import (
"context"
"io"
"runtime"

local "github.com/libp2p/go-libp2p/core/host"

"capnproto.org/go/capnp/v3"
"capnproto.org/go/capnp/v3/rpc"
)

type Dialer interface {
DialRPC(context.Context, local.Host) (*rpc.Conn, error)
}

func Bootstrap[T ~capnp.ClientKind](ctx context.Context) T {
conn, err := FDSockDialer{}.DialRPC(ctx)
if err != nil {
return failure[T](err)
}
runtime.SetFinalizer(conn, func(c io.Closer) error {
return c.Close()
})

client := conn.Bootstrap(ctx)
if err := client.Resolve(ctx); err != nil {
return failure[T](err)
}

return T(client)
}

func failure[T ~capnp.ClientKind](err error) T {
return T(capnp.ErrorClient(err))
}
72 changes: 22 additions & 50 deletions guest/system/system.go
Original file line number Diff line number Diff line change
@@ -1,82 +1,54 @@
package system

/*
* The contents of this file will be moved to the ww repository
*/

import (
"context"
"io"
"net"
"os"
"runtime"
"syscall"

"capnproto.org/go/capnp/v3"
"capnproto.org/go/capnp/v3/rpc"
"github.com/wetware/pkg/system"
"golang.org/x/exp/slog"
)

const (
// file descriptor for pre-openned TCP socket
// file descriptor for first pre-openned file descriptor.
PREOPENED_FD = 3
)

// Boot bootstraps and resolves the Capnp client attached
// to the other end of the pre-openned TCP connection.
// capnp.Client will be capnp.ErrorClient if an error ocurred.
func Boot[T ~capnp.ClientKind](ctx context.Context) (T, capnp.ReleaseFunc) {
var closers []io.Closer
release := func() {
for i := range closers {
// call in reverse order, similar to defer
_ = closers[len(closers)-i-1].Close()
}
// FDSockDialer binds to a pre-opened file descriptor (usually a TCP socket),
// and provides an *rcp.Conn to the host.
type FDSockDialer struct{}

func (s FDSockDialer) DialRPC(context.Context) (*rpc.Conn, error) {
f := os.NewFile(uintptr(PREOPENED_FD), "")
if err := syscall.SetNonblock(PREOPENED_FD, false); err != nil {
return nil, err
}

l, err := preopenedListener(&closers)
// Make sure we eventually release the file descriptor.
runtime.SetFinalizer(f, func(c io.Closer) error {
return c.Close()
})

l, err := net.FileListener(f)
if err != nil {
defer release()
return failure[T](err)
return nil, err
}
closers = append(closers, l)
defer l.Close()

tcpConn, err := l.Accept()
raw, err := l.Accept()
if err != nil {
defer release()
return failure[T](err)
return nil, err
}
closers = append(closers, tcpConn)

conn := rpc.NewConn(rpc.NewStreamTransport(tcpConn), &rpc.Options{
conn := rpc.NewConn(rpc.NewStreamTransport(raw), &rpc.Options{
ErrorReporter: system.ErrorReporter{
Logger: slog.Default().WithGroup("guest"),
},
})
closers = append(closers, conn)

client := conn.Bootstrap(ctx)
return T(client), release
}

func failure[T ~capnp.ClientKind](err error) (T, capnp.ReleaseFunc) {
return T(capnp.ErrorClient(err)), func() {}
}

// return the a TCP listener from pre-opened tcp connection by using the fd
func preopenedListener(closers *[]io.Closer) (net.Listener, error) {
f := os.NewFile(uintptr(PREOPENED_FD), "")

if err := syscall.SetNonblock(PREOPENED_FD, false); err != nil {
return nil, err
}
*closers = append(*closers, f)

l, err := net.FileListener(f)
if err != nil {
return nil, err
}
*closers = append(*closers, l)

return l, err
return conn, nil
}
4 changes: 2 additions & 2 deletions rom/internal/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
var ctx = context.Background()

func main() {
host, release := system.Boot[host.Host](ctx)
defer release()
host := system.Bootstrap[host.Host](ctx)
defer host.Release()

view, release := host.View(ctx)
defer release()
Expand Down
Binary file modified rom/internal/main.wasm
Binary file not shown.
44 changes: 44 additions & 0 deletions system/netsock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package system

import (
"context"
"net"

"capnproto.org/go/capnp/v3"
"capnproto.org/go/capnp/v3/rpc"
"github.com/wetware/pkg/util/log"
)

// NetSock is a system socket that uses the host's IP stack.
type NetSock struct {
Addr net.Addr
Logger log.Logger
BootstrapClient capnp.Client

conn *rpc.Conn
}

func (sock *NetSock) Close(context.Context) error {
sock.BootstrapClient.Release()

return sock.conn.Close()
}

func (sock *NetSock) dial(ctx context.Context) error {
raw, err := dial(ctx, sock.Addr)
if err != nil {
return err
}

sock.conn = rpc.NewConn(rpc.NewStreamTransport(raw), &rpc.Options{
ErrorReporter: ErrorReporter{Logger: sock.Logger},
BootstrapClient: sock.BootstrapClient,
})

return nil
}

func dial(ctx context.Context, addr net.Addr) (net.Conn, error) {
dialer := net.Dialer{}
return dialer.DialContext(ctx, addr.Network(), addr.String())
}
60 changes: 13 additions & 47 deletions system/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"time"

"capnproto.org/go/capnp/v3"
"capnproto.org/go/capnp/v3/rpc"
"github.com/jpillora/backoff"
"github.com/stealthrocket/wazergo"
"github.com/tetratelabs/wazero"
Expand Down Expand Up @@ -76,16 +75,16 @@ func (wetware[T]) Instantiate(ctx context.Context, r wazero.Runtime, t T) (api.C
}

// module for wetware Host
var module wazergo.HostModule[*Socket] = functions{
var module wazergo.HostModule[*NetSock] = functions{
// TODO(soon): socket exports
// "answer": F0((*Module).Answer),
// "double": F1((*Module).Double),
// "foo": F0((*NetSock).Foo),
// "bar": F1((*NetSock).Bar),
}

// Instantiate the system host module. If instantiation fails, the
// returned context is expired, and the ctx.Err() method returns the
// offending error.
func Instantiate[T ~capnp.ClientKind](ctx context.Context, r wazero.Runtime, t T) (*wazergo.ModuleInstance[*Socket], context.Context, error) {
func Instantiate[T ~capnp.ClientKind](ctx context.Context, r wazero.Runtime, t T) (*wazergo.ModuleInstance[*NetSock], context.Context, error) {
l, err := net.Listen("tcp", ":0") // TODO: localhost?
if err != nil {
return nil, ctx, fmt.Errorf("net: listen: %w", err)
Expand Down Expand Up @@ -115,41 +114,41 @@ func Instantiate[T ~capnp.ClientKind](ctx context.Context, r wazero.Runtime, t T

}

type Option = wazergo.Option[*Socket]
type Option = wazergo.Option[*NetSock]

func logger(log log.Logger) Option {
return wazergo.OptionFunc(func(h *Socket) {
return wazergo.OptionFunc(func(h *NetSock) {
h.Logger = log
})
}

func transport(addr net.Addr) Option {
return wazergo.OptionFunc(func(h *Socket) {
return wazergo.OptionFunc(func(h *NetSock) {
h.Addr = addr
})
}

func bootstrap[T ~capnp.ClientKind](t T) Option {
return wazergo.OptionFunc(func(h *Socket) {
return wazergo.OptionFunc(func(h *NetSock) {
h.BootstrapClient = capnp.Client(t)
})
}

// The `functions` type impements `Module[*Module]`, providing the
// module name, map of exported functions, and the ability to create
// instances of the module type
type functions wazergo.Functions[*Socket]
type functions wazergo.Functions[*NetSock]

func (f functions) Name() string {
return "ww"
}

func (f functions) Functions() wazergo.Functions[*Socket] {
return (wazergo.Functions[*Socket])(f)
func (f functions) Functions() wazergo.Functions[*NetSock] {
return (wazergo.Functions[*NetSock])(f)
}

func (f functions) Instantiate(ctx context.Context, opts ...Option) (out *Socket, err error) {
wazergo.Configure(new(Socket), append(opts, wazergo.OptionFunc(func(h *Socket) {
func (f functions) Instantiate(ctx context.Context, opts ...Option) (out *NetSock, err error) {
wazergo.Configure(new(NetSock), append(opts, wazergo.OptionFunc(func(h *NetSock) {
var b = backoff.Backoff{
Min: time.Millisecond * 1,
Max: time.Minute,
Expand Down Expand Up @@ -178,36 +177,3 @@ func (f functions) Instantiate(ctx context.Context, opts ...Option) (out *Socket

return
}

type Socket struct {
Addr net.Addr
Logger log.Logger
BootstrapClient capnp.Client

conn *rpc.Conn
}

func (sock *Socket) Close(context.Context) error {
sock.BootstrapClient.Release()

return sock.conn.Close()
}

func (sock *Socket) dial(ctx context.Context) error {
raw, err := dial(ctx, sock.Addr)
if err != nil {
return err
}

sock.conn = rpc.NewConn(rpc.NewStreamTransport(raw), &rpc.Options{
ErrorReporter: ErrorReporter{Logger: sock.Logger},
BootstrapClient: sock.BootstrapClient,
})

return nil
}

func dial(ctx context.Context, addr net.Addr) (net.Conn, error) {
dialer := net.Dialer{}
return dialer.DialContext(ctx, addr.Network(), addr.String())
}