Skip to content

Commit

Permalink
Merge pull request #154 from wetware/feat/system-sock-exports
Browse files Browse the repository at this point in the history
Add "system socket" API
  • Loading branch information
mikelsr committed Aug 13, 2023
2 parents 9317d46 + bb1b9fd commit 214ad5e
Show file tree
Hide file tree
Showing 7 changed files with 122 additions and 99 deletions.
4 changes: 4 additions & 0 deletions cap/host/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ import (

type Host api.Host

func (h Host) Resolve(ctx context.Context) error {
return capnp.Client(h).Resolve(ctx)
}

func (h Host) AddRef() Host {
return Host(capnp.Client(h).AddRef())
}
Expand Down
37 changes: 37 additions & 0 deletions guest/system/boot.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package system

import (
"context"
"io"
"runtime"

local "github.com/libp2p/go-libp2p/core/host"

"capnproto.org/go/capnp/v3"
"capnproto.org/go/capnp/v3/rpc"
)

type Dialer interface {
DialRPC(context.Context, local.Host) (*rpc.Conn, error)
}

func Bootstrap[T ~capnp.ClientKind](ctx context.Context) T {
conn, err := FDSockDialer{}.DialRPC(ctx)
if err != nil {
return failure[T](err)
}
runtime.SetFinalizer(conn, func(c io.Closer) error {
return c.Close()
})

client := conn.Bootstrap(ctx)
if err := client.Resolve(ctx); err != nil {
return failure[T](err)
}

return T(client)
}

func failure[T ~capnp.ClientKind](err error) T {
return T(capnp.ErrorClient(err))
}
72 changes: 22 additions & 50 deletions guest/system/system.go
Original file line number Diff line number Diff line change
@@ -1,82 +1,54 @@
package system

/*
* The contents of this file will be moved to the ww repository
*/

import (
"context"
"io"
"net"
"os"
"runtime"
"syscall"

"capnproto.org/go/capnp/v3"
"capnproto.org/go/capnp/v3/rpc"
"github.com/wetware/pkg/system"
"golang.org/x/exp/slog"
)

const (
// file descriptor for pre-openned TCP socket
// file descriptor for first pre-openned file descriptor.
PREOPENED_FD = 3
)

// Boot bootstraps and resolves the Capnp client attached
// to the other end of the pre-openned TCP connection.
// capnp.Client will be capnp.ErrorClient if an error ocurred.
func Boot[T ~capnp.ClientKind](ctx context.Context) (T, capnp.ReleaseFunc) {
var closers []io.Closer
release := func() {
for i := range closers {
// call in reverse order, similar to defer
_ = closers[len(closers)-i-1].Close()
}
// FDSockDialer binds to a pre-opened file descriptor (usually a TCP socket),
// and provides an *rcp.Conn to the host.
type FDSockDialer struct{}

func (s FDSockDialer) DialRPC(context.Context) (*rpc.Conn, error) {
f := os.NewFile(uintptr(PREOPENED_FD), "")
if err := syscall.SetNonblock(PREOPENED_FD, false); err != nil {
return nil, err
}

l, err := preopenedListener(&closers)
// Make sure we eventually release the file descriptor.
runtime.SetFinalizer(f, func(c io.Closer) error {
return c.Close()
})

l, err := net.FileListener(f)
if err != nil {
defer release()
return failure[T](err)
return nil, err
}
closers = append(closers, l)
defer l.Close()

tcpConn, err := l.Accept()
raw, err := l.Accept()
if err != nil {
defer release()
return failure[T](err)
return nil, err
}
closers = append(closers, tcpConn)

conn := rpc.NewConn(rpc.NewStreamTransport(tcpConn), &rpc.Options{
conn := rpc.NewConn(rpc.NewStreamTransport(raw), &rpc.Options{
ErrorReporter: system.ErrorReporter{
Logger: slog.Default().WithGroup("guest"),
},
})
closers = append(closers, conn)

client := conn.Bootstrap(ctx)
return T(client), release
}

func failure[T ~capnp.ClientKind](err error) (T, capnp.ReleaseFunc) {
return T(capnp.ErrorClient(err)), func() {}
}

// return the a TCP listener from pre-opened tcp connection by using the fd
func preopenedListener(closers *[]io.Closer) (net.Listener, error) {
f := os.NewFile(uintptr(PREOPENED_FD), "")

if err := syscall.SetNonblock(PREOPENED_FD, false); err != nil {
return nil, err
}
*closers = append(*closers, f)

l, err := net.FileListener(f)
if err != nil {
return nil, err
}
*closers = append(*closers, l)

return l, err
return conn, nil
}
4 changes: 2 additions & 2 deletions rom/internal/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
var ctx = context.Background()

func main() {
host, release := system.Boot[host.Host](ctx)
defer release()
host := system.Bootstrap[host.Host](ctx)
defer host.Release()

view, release := host.View(ctx)
defer release()
Expand Down
Binary file modified rom/internal/main.wasm
Binary file not shown.
44 changes: 44 additions & 0 deletions system/netsock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package system

import (
"context"
"net"

"capnproto.org/go/capnp/v3"
"capnproto.org/go/capnp/v3/rpc"
"github.com/wetware/pkg/util/log"
)

// NetSock is a system socket that uses the host's IP stack.
type NetSock struct {
Addr net.Addr
Logger log.Logger
BootstrapClient capnp.Client

conn *rpc.Conn
}

func (sock *NetSock) Close(context.Context) error {
sock.BootstrapClient.Release()

return sock.conn.Close()
}

func (sock *NetSock) dial(ctx context.Context) error {
raw, err := dial(ctx, sock.Addr)
if err != nil {
return err
}

sock.conn = rpc.NewConn(rpc.NewStreamTransport(raw), &rpc.Options{
ErrorReporter: ErrorReporter{Logger: sock.Logger},
BootstrapClient: sock.BootstrapClient,
})

return nil
}

func dial(ctx context.Context, addr net.Addr) (net.Conn, error) {
dialer := net.Dialer{}
return dialer.DialContext(ctx, addr.Network(), addr.String())
}
60 changes: 13 additions & 47 deletions system/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"time"

"capnproto.org/go/capnp/v3"
"capnproto.org/go/capnp/v3/rpc"
"github.com/jpillora/backoff"
"github.com/stealthrocket/wazergo"
"github.com/tetratelabs/wazero"
Expand Down Expand Up @@ -76,16 +75,16 @@ func (wetware[T]) Instantiate(ctx context.Context, r wazero.Runtime, t T) (api.C
}

// module for wetware Host
var module wazergo.HostModule[*Socket] = functions{
var module wazergo.HostModule[*NetSock] = functions{
// TODO(soon): socket exports
// "answer": F0((*Module).Answer),
// "double": F1((*Module).Double),
// "foo": F0((*NetSock).Foo),
// "bar": F1((*NetSock).Bar),
}

// Instantiate the system host module. If instantiation fails, the
// returned context is expired, and the ctx.Err() method returns the
// offending error.
func Instantiate[T ~capnp.ClientKind](ctx context.Context, r wazero.Runtime, t T) (*wazergo.ModuleInstance[*Socket], context.Context, error) {
func Instantiate[T ~capnp.ClientKind](ctx context.Context, r wazero.Runtime, t T) (*wazergo.ModuleInstance[*NetSock], context.Context, error) {
l, err := net.Listen("tcp", ":0") // TODO: localhost?
if err != nil {
return nil, ctx, fmt.Errorf("net: listen: %w", err)
Expand Down Expand Up @@ -115,41 +114,41 @@ func Instantiate[T ~capnp.ClientKind](ctx context.Context, r wazero.Runtime, t T

}

type Option = wazergo.Option[*Socket]
type Option = wazergo.Option[*NetSock]

func logger(log log.Logger) Option {
return wazergo.OptionFunc(func(h *Socket) {
return wazergo.OptionFunc(func(h *NetSock) {
h.Logger = log
})
}

func transport(addr net.Addr) Option {
return wazergo.OptionFunc(func(h *Socket) {
return wazergo.OptionFunc(func(h *NetSock) {
h.Addr = addr
})
}

func bootstrap[T ~capnp.ClientKind](t T) Option {
return wazergo.OptionFunc(func(h *Socket) {
return wazergo.OptionFunc(func(h *NetSock) {
h.BootstrapClient = capnp.Client(t)
})
}

// The `functions` type impements `Module[*Module]`, providing the
// module name, map of exported functions, and the ability to create
// instances of the module type
type functions wazergo.Functions[*Socket]
type functions wazergo.Functions[*NetSock]

func (f functions) Name() string {
return "ww"
}

func (f functions) Functions() wazergo.Functions[*Socket] {
return (wazergo.Functions[*Socket])(f)
func (f functions) Functions() wazergo.Functions[*NetSock] {
return (wazergo.Functions[*NetSock])(f)
}

func (f functions) Instantiate(ctx context.Context, opts ...Option) (out *Socket, err error) {
wazergo.Configure(new(Socket), append(opts, wazergo.OptionFunc(func(h *Socket) {
func (f functions) Instantiate(ctx context.Context, opts ...Option) (out *NetSock, err error) {
wazergo.Configure(new(NetSock), append(opts, wazergo.OptionFunc(func(h *NetSock) {
var b = backoff.Backoff{
Min: time.Millisecond * 1,
Max: time.Minute,
Expand Down Expand Up @@ -178,36 +177,3 @@ func (f functions) Instantiate(ctx context.Context, opts ...Option) (out *Socket

return
}

type Socket struct {
Addr net.Addr
Logger log.Logger
BootstrapClient capnp.Client

conn *rpc.Conn
}

func (sock *Socket) Close(context.Context) error {
sock.BootstrapClient.Release()

return sock.conn.Close()
}

func (sock *Socket) dial(ctx context.Context) error {
raw, err := dial(ctx, sock.Addr)
if err != nil {
return err
}

sock.conn = rpc.NewConn(rpc.NewStreamTransport(raw), &rpc.Options{
ErrorReporter: ErrorReporter{Logger: sock.Logger},
BootstrapClient: sock.BootstrapClient,
})

return nil
}

func dial(ctx context.Context, addr net.Addr) (net.Conn, error) {
dialer := net.Dialer{}
return dialer.DialContext(ctx, addr.Network(), addr.String())
}

0 comments on commit 214ad5e

Please sign in to comment.