Skip to content

Commit

Permalink
Initial work on the appevent module.
Browse files Browse the repository at this point in the history
* Apps now use the appcommon.Hello JSON object as their initial dial message to appserver (this contains the proc_key, and egress connection related fields).
* Apps now listens for appserver connections. This connection is used for event broadcasts (from appserver -> app).
* app.NewClient() now requires appevent.Subscriptions as input. This is used for subscribing to events from appserver.
  • Loading branch information
evanlinjin committed May 22, 2020
1 parent a7a056c commit d21c1d0
Show file tree
Hide file tree
Showing 33 changed files with 718 additions and 239 deletions.
2 changes: 1 addition & 1 deletion cmd/apps/helloworld/helloworld.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ const (
var log = logrus.New()

func main() {
appC := app.NewClient()
appC := app.NewClient(nil)
defer appC.Close()

if _, err := buildinfo.Get().WriteTo(log.Writer()); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/apps/skychat/chat.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ var (
)

func main() {
appC = app.NewClient()
appC = app.NewClient(nil)
defer appC.Close()

if _, err := buildinfo.Get().WriteTo(os.Stdout); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/apps/skysocks-client/skysocks-client.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func dialServer(appCl *app.Client, pk cipher.PubKey) (net.Conn, error) {
}

func main() {
appC := app.NewClient()
appC := app.NewClient(nil)
defer appC.Close()

skysocks.Log = log
Expand Down
2 changes: 1 addition & 1 deletion cmd/apps/skysocks/skysocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ const (
var log = logrus.New()

func main() {
appC := app.NewClient()
appC := app.NewClient(nil)
defer appC.Close()

skysocks.Log = log
Expand Down
2 changes: 1 addition & 1 deletion cmd/apps/vpn-client/vpn-client.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func main() {
log.WithError(err).Fatalln("Invalid VPN server pub key")
}

appClient := app.NewClient()
appClient := app.NewClient(nil)
defer appClient.Close()

log.Infof("Connecting to VPN server %s", serverPK.String())
Expand Down
2 changes: 1 addition & 1 deletion cmd/apps/vpn-server/vpn-server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ var (
)

func main() {
appClient := app.NewClient()
appClient := app.NewClient(nil)
defer appClient.Close()

osSigs := make(chan os.Signal, 2)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/SkycoinProject/skywire-mainnet
go 1.13

require (
github.com/SkycoinProject/dmsg v0.1.1-0.20200420091742-8c1a3d828a49
github.com/SkycoinProject/dmsg v0.1.1-0.20200521071628-c247b328a193
github.com/SkycoinProject/skycoin v0.27.0
github.com/SkycoinProject/yamux v0.0.0-20191213015001-a36efeefbf6a
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/SkycoinProject/dmsg v0.0.0-20200306152741-acee74fa4514/go.mod h1:DzykXMLlx6Fx0fGjZsCIRas/MIvxW8DZpmDA6f2nCRk=
github.com/SkycoinProject/dmsg v0.1.1-0.20200420091742-8c1a3d828a49 h1:rYqmvSRA+rq6LTne/Ge34T0i4yjSHSwkhk0ER6relWU=
github.com/SkycoinProject/dmsg v0.1.1-0.20200420091742-8c1a3d828a49/go.mod h1:MiX+UG/6fl3g+9rS13/fq7BwUQ2eOlg1yOBOnNf6J6A=
github.com/SkycoinProject/dmsg v0.1.1-0.20200521071628-c247b328a193 h1:xaw9OMf97LLz2KmogFs1WbB8xb7p+8tnZkLTo5dg7fY=
github.com/SkycoinProject/dmsg v0.1.1-0.20200521071628-c247b328a193/go.mod h1:MiX+UG/6fl3g+9rS13/fq7BwUQ2eOlg1yOBOnNf6J6A=
github.com/SkycoinProject/skycoin v0.26.0/go.mod h1:xqPLOKh5B6GBZlGA7B5IJfQmCy7mwimD9NlqxR3gMXo=
github.com/SkycoinProject/skycoin v0.27.0 h1:N3IHxj8ossHOcsxLYOYugT+OaELLncYHJHxbbYLPPmY=
github.com/SkycoinProject/skycoin v0.27.0/go.mod h1:xqPLOKh5B6GBZlGA7B5IJfQmCy7mwimD9NlqxR3gMXo=
Expand Down
64 changes: 64 additions & 0 deletions pkg/app/appcommon/hello.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package appcommon

import (
"encoding/binary"
"encoding/json"
"fmt"
"io"
)

// Hello represents the first JSON object that an app sends the visor.
type Hello struct {
ProcKey ProcKey `json:"proc_key"` // proc key
EgressNet string `json:"egress_net,omitempty"` // network which hosts the RPCEgressGateway of the app
EgressAddr string `json:"egress_addr,omitempty"` // address which hosts the RPCEgressGateway of the app
EventSubs map[string]bool `json:"event_subs,omitempty"` // event subscriptions
}

// String implements fmt.Stringer
func (h *Hello) String() string {
j, err := json.Marshal(h)
if err != nil {
panic(err) // should never happen
}
return string(j)
}

func ReadHello(r io.Reader) (Hello, error) {
sizeRaw := make([]byte, 2)
if _, err := io.ReadFull(r, sizeRaw); err != nil {
return Hello{}, fmt.Errorf("failed to read hello size prefix: %w", err)
}
size := binary.BigEndian.Uint16(sizeRaw)

helloRaw := make([]byte, size)
if _, err := io.ReadFull(r, helloRaw); err != nil {
return Hello{}, fmt.Errorf("failed to read hello data: %w", err)
}

var hello Hello
if err := json.Unmarshal(helloRaw, &hello); err != nil {
return Hello{}, fmt.Errorf("failed to unmarshal hellp data: %w", err)
}

return hello, nil
}

func WriteHello(w io.Writer, hello Hello) error {
helloRaw, err := json.Marshal(hello)
if err != nil {
panic(err) // should never happen
}

raw := make([]byte, 2+len(helloRaw))
size := len(helloRaw)
binary.BigEndian.PutUint16(raw[:2], uint16(size))
if n := copy(raw[2:], helloRaw); n != size {
panic("hello write does not add up")
}

if _, err := w.Write(raw); err != nil {
return fmt.Errorf("failed to write hello data: %w", err)
}
return nil
}
85 changes: 85 additions & 0 deletions pkg/app/appevent/broadcaster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package appevent

import (
"context"
"sync"

"github.com/sirupsen/logrus"
)

// Broadcaster combines multiple event rpc clients.
type Broadcaster struct {
log logrus.FieldLogger
clients map[RPCClient]chan error
closed bool
mx sync.Mutex
}

// NewBroadcaster instantiates a Broadcaster
func NewBroadcaster(log logrus.FieldLogger) *Broadcaster {
return &Broadcaster{
log: log,
clients: make(map[RPCClient]chan error),
closed: false,
}
}

// AddClient adds an RPCClient.
func (mc *Broadcaster) AddClient(c RPCClient) {
mc.mx.Lock()
if !mc.closed {
mc.clients[c] = make(chan error, 1)
}
mc.mx.Unlock()
}

// Broadcast broadcasts an event to all subscribed channels of all rpc gateways.
func (mc *Broadcaster) Broadcast(ctx context.Context, e *Event) error {
mc.mx.Lock()
defer mc.mx.Unlock()

if mc.closed {
return ErrSubscriptionsClosed
}

if len(mc.clients) == 0 {
return nil
}

for c, errCh := range mc.clients {
go func(c RPCClient, ch chan error) {
ch <- c.Notify(ctx, e)
}(c, errCh)
}

// delete inactive clients
for c, errCh := range mc.clients {
if err := <-errCh; err != nil {
mc.log.
WithError(err).
WithField("hello", c.Hello().String()).
Info("Deleting egress RPC client.")
close(errCh)
delete(mc.clients, c)
}
}

return nil
}

// Close implements io.Closer
func (mc *Broadcaster) Close() error {
mc.mx.Lock()
defer mc.mx.Unlock()

if mc.closed {
return ErrSubscriptionsClosed
}
mc.closed = true

for c, errCh := range mc.clients {
close(errCh)
delete(mc.clients, c)
}
return nil
}
32 changes: 32 additions & 0 deletions pkg/app/appevent/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package appevent

// Supported TCP events.
const (
TCPDial = "tcp_dial"
TCPClose = "tcp_close"
)

// Supported dmsg events.
const (
DmsgSessionDial = "dmsg_session_dial"
DmsgSessionClose = "dmsg_session_close"
)

// typeAssociations contains event associations.
// key: event type
// value: a list of event types that also triggers if the 'key' event is triggered.
var typeAssociations = map[string][]string{
DmsgSessionDial: {TCPDial},
DmsgSessionClose: {TCPClose},
}

// AllAssociatedTypes obtains a list of associated event types (inclusive of the inputted type).
func AllAssociatedTypes(eventType string) []string {
return append(typeAssociations[eventType], eventType)
}

// Event represents an event that is to be broadcasted.
type Event struct {
Type string
Data []byte
}
111 changes: 111 additions & 0 deletions pkg/app/appevent/rpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package appevent

import (
"context"
"fmt"
"io"
"net/rpc"

"github.com/SkycoinProject/skycoin/src/util/logging"
"github.com/sirupsen/logrus"

"github.com/SkycoinProject/skywire-mainnet/pkg/app/appcommon"
"github.com/SkycoinProject/skywire-mainnet/pkg/util/rpcutil"
)

// RPCGateway represents the RPC gateway for visor -> app communication.
// This is for sending visor events which the app is subscribed to.
type RPCGateway struct {
log logrus.FieldLogger
subs *Subscriptions
}

// NewRPCGateway returns a new RPCGateway.
func NewRPCGateway(log logrus.FieldLogger, subs *Subscriptions) *RPCGateway {
if log == nil {
log = logging.MustGetLogger("app_rpc_egress_gateway")
}
if subs == nil {
panic("'subs' input cannot be nil")
}
return &RPCGateway{log: log, subs: subs}
}

// Notify notifies the app about events.
func (g *RPCGateway) Notify(e *Event, _ *struct{}) (err error) {
defer rpcutil.LogCall(g.log, "Notify", e)(nil, &err)
return PushToSubscriptions(g.subs, e)
}

//go:generate mockery -name RPCEgressClient -case underscore -inpkg

// RPCClient describes the RPC client interface that communicates the NewRPCGateway.
type RPCClient interface {
io.Closer
Notify(ctx context.Context, e *Event) error
Hello() *appcommon.Hello
}

// NewRPCClient constructs a new 'rpcClient'.
func NewRPCClient(hello *appcommon.Hello) (RPCClient, error) {
if hello.EgressNet == "" || hello.EgressAddr == "" {
return &emptyRPCClient{hello: hello}, nil
}

rpcC, err := rpc.Dial(hello.EgressNet, hello.EgressAddr)
if err != nil {
return nil, fmt.Errorf("failed to dial RPC: %w", err)
}
return &rpcClient{rpcC: rpcC, hello: hello}, nil
}

type rpcClient struct {
rpcC *rpc.Client
hello *appcommon.Hello
}

// Notify sends a notify to the rpc gateway.
func (c *rpcClient) Notify(ctx context.Context, e *Event) error {
// only send subscribed event
ok := false
for _, t := range AllAssociatedTypes(e.Type) {
if ok = c.hello.EventSubs[t]; ok {
break
}
}
if !ok {
return nil
}

call := c.rpcC.Go(c.formatMethod("Notify"), e, nil, nil)
select {
case <-call.Done:
return call.Error
case <-ctx.Done():
return ctx.Err()
}
}

// Hello returns the internal hello object.
func (c *rpcClient) Hello() *appcommon.Hello {
return c.hello
}

// Close implements io.Closer
func (c *rpcClient) Close() error {
return c.rpcC.Close()
}

// formatMethod formats complete RPC method signature.
func (c *rpcClient) formatMethod(method string) string {
const methodFmt = "%s.%s"
return fmt.Sprintf(methodFmt, c.hello.ProcKey.String(), method)
}

type emptyRPCClient struct {
hello *appcommon.Hello
}

func (c *emptyRPCClient) Notify(_ context.Context, _ *Event) error { return nil }
func (c *emptyRPCClient) Hello() *appcommon.Hello { return c.hello }
func (c *emptyRPCClient) Close() error { return nil }
Loading

0 comments on commit d21c1d0

Please sign in to comment.