From 1820d44ffb0c9748799d93e29f7d13fbc2bea3ad Mon Sep 17 00:00:00 2001 From: ersonp Date: Wed, 2 Nov 2022 14:32:05 +0530 Subject: [PATCH 01/30] Fix Latency in network stats The latency in networkStats is stored as uint32 miliseconds but while converting it to time.Duration, it requires input of nanoseconds. Before we were passing in uint32 miliseconds instead of nanoseconds which caused the time.duration to be as ns and not ms. To fix this we multiply the latencyMs with a miliseconds with a nanosecond integer count to convert our latencyMs from milisecond to nanosecond. --- pkg/router/network_stats.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/router/network_stats.go b/pkg/router/network_stats.go index b3fec7225..f954ec2a1 100644 --- a/pkg/router/network_stats.go +++ b/pkg/router/network_stats.go @@ -31,8 +31,8 @@ func (s *networkStats) SetLatency(latency uint32) { func (s *networkStats) Latency() time.Duration { latencyMs := atomic.LoadUint32(&s.latency) - - return time.Duration(latencyMs) + // the latency is store in uint32 of millisecond but time.Duration takes nanosecond + return time.Duration(latencyMs * uint32(time.Millisecond.Nanoseconds())) } func (s *networkStats) SetUploadSpeed(speed uint32) { From af45c902941db30845794a55f3b22ef489b9e0b5 Mon Sep 17 00:00:00 2001 From: ersonp Date: Wed, 2 Nov 2022 14:45:52 +0530 Subject: [PATCH 02/30] Add visor ping subcommand This commit adds the subcommand ping to the command visor. It takes one argument that is a remote PK. This is the first version of the ping where we are pinging the skychat app on the skynet and retrieving the latency of the connection. --- cmd/skywire-cli/commands/visor/ping.go | 30 ++++++++++++++++++ pkg/visor/api.go | 44 ++++++++++++++++++++++++++ pkg/visor/rpc.go | 8 +++++ pkg/visor/rpc_client.go | 12 +++++++ 4 files changed, 94 insertions(+) create mode 100644 cmd/skywire-cli/commands/visor/ping.go diff --git a/cmd/skywire-cli/commands/visor/ping.go b/cmd/skywire-cli/commands/visor/ping.go new file mode 100644 index 000000000..f4fd153fd --- /dev/null +++ b/cmd/skywire-cli/commands/visor/ping.go @@ -0,0 +1,30 @@ +// Package clivisor cmd/skywire-cli/commands/visor/ping.go +package clivisor + +import ( + "fmt" + + "github.com/spf13/cobra" + + clirpc "github.com/skycoin/skywire/cmd/skywire-cli/commands/rpc" + "github.com/skycoin/skywire/cmd/skywire-cli/internal" +) + +func init() { + RootCmd.AddCommand(testCmd) +} + +var testCmd = &cobra.Command{ + Use: "ping ", + Short: "Return routing rule by route ID key", + Long: "\n Return routing rule by route ID key", + Args: cobra.MinimumNArgs(1), + Run: func(cmd *cobra.Command, args []string) { + pk := internal.ParsePK(cmd.Flags(), "pk", args[0]) + + latency, err := clirpc.Client(cmd.Flags()).TestRouting(pk) + internal.Catch(cmd.Flags(), err) + internal.PrintOutput(cmd.Flags(), latency, fmt.Sprintf(latency+"\n")) + + }, +} diff --git a/pkg/visor/api.go b/pkg/visor/api.go index 53f46d5b5..a1f37f8bd 100644 --- a/pkg/visor/api.go +++ b/pkg/visor/api.go @@ -6,6 +6,7 @@ import ( "encoding/hex" "errors" "fmt" + "net" "net/http" "os" "os/exec" @@ -21,6 +22,7 @@ import ( "github.com/skycoin/skywire-utilities/pkg/cipher" "github.com/skycoin/skywire-utilities/pkg/logging" "github.com/skycoin/skywire-utilities/pkg/netutil" + "github.com/skycoin/skywire/pkg/app/appnet" "github.com/skycoin/skywire/pkg/app/appserver" "github.com/skycoin/skywire/pkg/routing" "github.com/skycoin/skywire/pkg/servicedisc" @@ -77,6 +79,7 @@ type API interface { RoutingRule(key routing.RouteID) (routing.Rule, error) SaveRoutingRule(rule routing.Rule) error RemoveRoutingRule(key routing.RouteID) error + TestRouting(pk cipher.PubKey) (string, error) RouteGroups() ([]RouteGroupInfo, error) @@ -795,6 +798,47 @@ func (v *Visor) RoutingRules() ([]routing.Rule, error) { return v.router.Rules(), nil } +// TestRouting implements API. +func (v *Visor) TestRouting(pk cipher.PubKey) (string, error) { + addr := appnet.Addr{ + Net: appnet.TypeSkynet, + PubKey: pk, + Port: 1, + } + + var err error + var conn net.Conn + + ctx := context.TODO() + var r = netutil.NewRetrier(v.log, 2*time.Second, netutil.DefaultMaxBackoff, 5, 2) + err = r.Do(ctx, func() error { + conn, err = appnet.Dial(addr) + return err + }) + if err != nil { + return "", err + } + + defer func() { + err = conn.Close() + }() + + skywireConn, isSkywireConn := conn.(*appnet.SkywireConn) + if !isSkywireConn { + return "", fmt.Errorf("Can't get such info from this conn") + } + + var latency time.Duration + err = r.Do(ctx, func() error { + latency = skywireConn.Latency() + if latency.String() != "0s" { + return nil + } + return fmt.Errorf("unable to retrieve latency") + }) + return fmt.Sprint(latency), nil +} + // RoutingRule implements API. func (v *Visor) RoutingRule(key routing.RouteID) (routing.Rule, error) { return v.router.Rule(key) diff --git a/pkg/visor/rpc.go b/pkg/visor/rpc.go index 6f365f279..47a278df0 100644 --- a/pkg/visor/rpc.go +++ b/pkg/visor/rpc.go @@ -493,6 +493,14 @@ func (r *RPC) RoutingRules(_ *struct{}, out *[]routing.Rule) (err error) { return err } +// TestRouting obtains all routing rules of the RoutingTable. +func (r *RPC) TestRouting(pk *cipher.PubKey, out *string) (err error) { + defer rpcutil.LogCall(r.log, "TestRouting", pk)(out, &err) + + *out, err = r.visor.TestRouting(*pk) + return err +} + // RoutingRule obtains a routing rule of given RouteID. func (r *RPC) RoutingRule(key *routing.RouteID, rule *routing.Rule) (err error) { defer rpcutil.LogCall(r.log, "RoutingRule", key)(rule, &err) diff --git a/pkg/visor/rpc_client.go b/pkg/visor/rpc_client.go index e7d33162d..56a302f31 100644 --- a/pkg/visor/rpc_client.go +++ b/pkg/visor/rpc_client.go @@ -353,6 +353,13 @@ func (rc *rpcClient) RoutingRules() ([]routing.Rule, error) { return entries, err } +// TestRouting calls RoutingRules. +func (rc *rpcClient) TestRouting(pk cipher.PubKey) (string, error) { + var latency string + err := rc.Call("TestRouting", &pk, &latency) + return latency, err +} + // RoutingRule calls RoutingRule. func (rc *rpcClient) RoutingRule(key routing.RouteID) (routing.Rule, error) { var rule routing.Rule @@ -950,6 +957,11 @@ func (mc *mockRPCClient) RoutingRules() ([]routing.Rule, error) { return mc.rt.AllRules(), nil } +// RoutingRules implements API. +func (mc *mockRPCClient) TestRouting(_ cipher.PubKey) (string, error) { + return "", nil +} + // RoutingRule implements API. func (mc *mockRPCClient) RoutingRule(key routing.RouteID) (routing.Rule, error) { return mc.rt.Rule(key) From d942ec88fbb62f52b3846e787d8b2e826f9c1798 Mon Sep 17 00:00:00 2001 From: ersonp Date: Wed, 2 Nov 2022 15:26:22 +0530 Subject: [PATCH 03/30] Fix summarie in proc after the Latency change The json of summary is expected to have the latency in ms but we were sending it in ns after the recent change. So we convert it back to ms while creating the summary. --- pkg/app/appserver/proc.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/app/appserver/proc.go b/pkg/app/appserver/proc.go index 3f10d673d..cff2aa959 100644 --- a/pkg/app/appserver/proc.go +++ b/pkg/app/appserver/proc.go @@ -426,10 +426,10 @@ func (p *Proc) ConnectionsSummary() []ConnectionSummary { }) return true } - summaries = append(summaries, ConnectionSummary{ - IsAlive: skywireConn.IsAlive(), - Latency: skywireConn.Latency(), + IsAlive: skywireConn.IsAlive(), + // Latency in summary is expected to be in ms and not ns so we change the base to ms + Latency: time.Duration(skywireConn.Latency().Milliseconds()), UploadSpeed: skywireConn.UploadSpeed(), DownloadSpeed: skywireConn.DownloadSpeed(), BandwidthSent: skywireConn.BandwidthSent(), From c6443691866cd6bd54881a3a0b3554107a414134 Mon Sep 17 00:00:00 2001 From: ersonp Date: Mon, 7 Nov 2022 11:23:55 +0530 Subject: [PATCH 04/30] test --- pkg/skyenv/values.go | 3 +++ pkg/visor/init.go | 31 ++++++++++++++++++++++++++++++- 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/pkg/skyenv/values.go b/pkg/skyenv/values.go index 6b92596b2..81e3a4393 100644 --- a/pkg/skyenv/values.go +++ b/pkg/skyenv/values.go @@ -61,6 +61,9 @@ const ( SkychatPort uint16 = 1 SkychatAddr = ":8001" + PingTestName = "pingtest" + PingTestPort uint16 = 2 + SkysocksName = "skysocks" SkysocksPort uint16 = 3 diff --git a/pkg/visor/init.go b/pkg/visor/init.go index fdcb737e8..33c562a77 100644 --- a/pkg/visor/init.go +++ b/pkg/visor/init.go @@ -118,6 +118,8 @@ var ( dmsgHTTP vinit.Module // Dmsg trackers module dmsgTrackers vinit.Module + // Ping module + ping vinit.Module // visor that groups all modules together vis vinit.Module ) @@ -155,8 +157,9 @@ func registerModules(logger *logging.MasterLogger) { trs = maker("transport_setup", initTransportSetup, &dmsgC, &tr) tm = vinit.MakeModule("transports", vinit.DoNothing, logger, &sc, &sudphC, &dmsgCtrl, &dmsgHTTPLogServer, &dmsgTrackers) pvs = maker("public_visor", initPublicVisor, &tr, &ar, &disc, &stcprC) + ping = maker("ping", initPing, &dmsgC, &tm) vis = vinit.MakeModule("visor", vinit.DoNothing, logger, &ebc, &ar, &disc, &pty, - &tr, &rt, &launch, &cli, &hvs, &ut, &pv, &pvs, &trs, &stcpC, &stcprC) + &tr, &rt, &launch, &cli, &hvs, &ut, &pv, &pvs, &trs, &stcpC, &stcprC, &ping) hv = maker("hypervisor", initHypervisor, &vis) } @@ -557,6 +560,32 @@ func initTransportSetup(ctx context.Context, v *Visor, log *logging.Logger) erro return nil } +func initPing(ctx context.Context, v *Visor, log *logging.Logger) error { + // ctx, cancel := context.WithCancel(ctx) + // To remove the block set by NewTransportListener if dmsg is not initialized + // go func() { + // ts, err := ts.NewTransportListener(ctx, v.conf, v.dmsgC, v.tpM, v.MasterLogger()) + // if err != nil { + // log.Warn(err) + // cancel() + // } + // select { + // case <-ctx.Done(): + // default: + // go ts.Serve(ctx) + // } + // }() + + // waiting for at least one transport to initialize + <-v.tpM.Ready() + + // v.pushCloseStack("transport_setup.rpc", func() error { + // cancel() + // return nil + // }) + return nil +} + // getRouteSetupHooks aka autotransport func getRouteSetupHooks(ctx context.Context, v *Visor, log *logging.Logger) []router.RouteSetupHook { retrier := netutil.NewRetrier(log, time.Second, time.Second*20, 3, 1.3) From d442f4bd0e74525a391a0aba28951ede8faf13b7 Mon Sep 17 00:00:00 2001 From: ersonp Date: Fri, 2 Dec 2022 14:17:57 +0530 Subject: [PATCH 05/30] Add ping app module --- pkg/skyenv/values.go | 3 ++ pkg/visor/init.go | 74 ++++++++++++++++++++++++++++++++------------ 2 files changed, 58 insertions(+), 19 deletions(-) diff --git a/pkg/skyenv/values.go b/pkg/skyenv/values.go index 81e3a4393..19e55a8c0 100644 --- a/pkg/skyenv/values.go +++ b/pkg/skyenv/values.go @@ -77,6 +77,9 @@ const ( VPNClientName = "vpn-client" // TODO(darkrengarius): this one's not needed for the app to run but lack of it causes errors VPNClientPort uint16 = 43 + + SkyPingName = "sky-ping" + SkyPingPort uint16 = 48 ) // RPC constants. diff --git a/pkg/visor/init.go b/pkg/visor/init.go index 33c562a77..0050faafa 100644 --- a/pkg/visor/init.go +++ b/pkg/visor/init.go @@ -32,11 +32,13 @@ import ( "github.com/skycoin/skywire/internal/vpn" "github.com/skycoin/skywire/pkg/app/appdisc" "github.com/skycoin/skywire/pkg/app/appevent" + "github.com/skycoin/skywire/pkg/app/appnet" "github.com/skycoin/skywire/pkg/app/appserver" "github.com/skycoin/skywire/pkg/app/launcher" "github.com/skycoin/skywire/pkg/dmsgc" "github.com/skycoin/skywire/pkg/routefinder/rfclient" "github.com/skycoin/skywire/pkg/router" + "github.com/skycoin/skywire/pkg/routing" "github.com/skycoin/skywire/pkg/servicedisc" "github.com/skycoin/skywire/pkg/setup/setupclient" "github.com/skycoin/skywire/pkg/skyenv" @@ -561,31 +563,65 @@ func initTransportSetup(ctx context.Context, v *Visor, log *logging.Logger) erro } func initPing(ctx context.Context, v *Visor, log *logging.Logger) error { - // ctx, cancel := context.WithCancel(ctx) - // To remove the block set by NewTransportListener if dmsg is not initialized - // go func() { - // ts, err := ts.NewTransportListener(ctx, v.conf, v.dmsgC, v.tpM, v.MasterLogger()) - // if err != nil { - // log.Warn(err) - // cancel() - // } - // select { - // case <-ctx.Done(): - // default: - // go ts.Serve(ctx) - // } - // }() - + ctx, cancel := context.WithCancel(ctx) // waiting for at least one transport to initialize <-v.tpM.Ready() - // v.pushCloseStack("transport_setup.rpc", func() error { - // cancel() - // return nil - // }) + connApp := appnet.Addr{ + Net: appnet.TypeSkynet, + PubKey: v.conf.PK, + Port: routing.Port(skyenv.SkyPingPort), + } + l, err := appnet.ListenContext(ctx, connApp) + if err != nil { + cancel() + return err + } + + v.pushCloseStack("skywire_proxy", func() error { + cancel() + if cErr := l.Close(); cErr != nil { + log.WithError(cErr).Error("Error closing listener.") + } + return nil + }) + + go func() { + for { + log.Debug("Accepting sky proxy conn...") + conn, err := l.Accept() + if err != nil { + log.WithError(err).Error("Failed to accept conn") + return + } + log.Debug("Accepted sky proxy conn") + log.Error("11111111111111111111111111111111111111111111111111") + + log.Debug("Wrapping conn...") + wrappedConn, err := appnet.WrapConn(conn) + if err != nil { + log.WithError(err).Error("Failed to wrap conn") + return + } + + rAddr := wrappedConn.RemoteAddr().(appnet.Addr) + log.Debugf("Accepted sky proxy conn on %s from %s", wrappedConn.LocalAddr(), rAddr.PubKey) + go handleServerConn(log, wrappedConn) + } + }() return nil } +func handleServerConn(log *logging.Logger, remoteConn net.Conn) { + buf := make([]byte, 32*1024) + n, err := remoteConn.Read(buf) + if err != nil { + log.WithError(err).Error("Failed to read packet") + return + } + log.Debugf("Received: %s", buf[:n]) +} + // getRouteSetupHooks aka autotransport func getRouteSetupHooks(ctx context.Context, v *Visor, log *logging.Logger) []router.RouteSetupHook { retrier := netutil.NewRetrier(log, time.Second, time.Second*20, 3, 1.3) From 03ca1bf277aa734774962f9c6010c58e7a5e56bd Mon Sep 17 00:00:00 2001 From: ersonp Date: Fri, 2 Dec 2022 14:20:41 +0530 Subject: [PATCH 06/30] Add ping method to networker interface --- pkg/app/appnet/dmsg_networker.go | 7 +++++ pkg/app/appnet/mock_networker.go | 45 +++++++++++++++++++++++++++-- pkg/app/appnet/networker.go | 14 ++++++++- pkg/app/appnet/skywire_networker.go | 28 ++++++++++++++++++ 4 files changed, 91 insertions(+), 3 deletions(-) diff --git a/pkg/app/appnet/dmsg_networker.go b/pkg/app/appnet/dmsg_networker.go index 792d2b50c..c7cdeec81 100644 --- a/pkg/app/appnet/dmsg_networker.go +++ b/pkg/app/appnet/dmsg_networker.go @@ -6,6 +6,8 @@ import ( "net" "github.com/skycoin/dmsg/pkg/dmsg" + + "github.com/skycoin/skywire-utilities/pkg/cipher" ) // DmsgNetworker implements `Networker` for dmsg network. @@ -25,6 +27,11 @@ func (n *DmsgNetworker) Dial(addr Addr) (net.Conn, error) { return n.DialContext(context.Background(), addr) } +// Ping dials remote `addr` via dmsg network. +func (n *DmsgNetworker) Ping(pk cipher.PubKey, addr Addr) (net.Conn, error) { + return n.DialContext(context.Background(), addr) +} + // DialContext dials remote `addr` via dmsg network with context. func (n *DmsgNetworker) DialContext(ctx context.Context, addr Addr) (net.Conn, error) { remote := dmsg.Addr{ diff --git a/pkg/app/appnet/mock_networker.go b/pkg/app/appnet/mock_networker.go index 2258fc855..3c04b9ef0 100644 --- a/pkg/app/appnet/mock_networker.go +++ b/pkg/app/appnet/mock_networker.go @@ -1,12 +1,15 @@ -// Code generated by mockery v1.0.0. DO NOT EDIT. +// Code generated by mockery v2.14.1. DO NOT EDIT. package appnet import ( context "context" - net "net" + + cipher "github.com/skycoin/skywire-utilities/pkg/cipher" mock "github.com/stretchr/testify/mock" + + net "net" ) // MockNetworker is an autogenerated mock type for the Networker type @@ -105,3 +108,41 @@ func (_m *MockNetworker) ListenContext(ctx context.Context, addr Addr) (net.List return r0, r1 } + +// Ping provides a mock function with given fields: pk, addr +func (_m *MockNetworker) Ping(pk cipher.PubKey, addr Addr) (net.Conn, error) { + ret := _m.Called(pk, addr) + + var r0 net.Conn + if rf, ok := ret.Get(0).(func(cipher.PubKey, Addr) net.Conn); ok { + r0 = rf(pk, addr) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(net.Conn) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(cipher.PubKey, Addr) error); ok { + r1 = rf(pk, addr) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +type mockConstructorTestingTNewMockNetworker interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockNetworker creates a new instance of MockNetworker. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockNetworker(t mockConstructorTestingTNewMockNetworker) *MockNetworker { + mock := &MockNetworker{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pkg/app/appnet/networker.go b/pkg/app/appnet/networker.go index 19267ce90..5b803ec4e 100644 --- a/pkg/app/appnet/networker.go +++ b/pkg/app/appnet/networker.go @@ -6,9 +6,11 @@ import ( "errors" "net" "sync" + + "github.com/skycoin/skywire-utilities/pkg/cipher" ) -//go:generate mockery -name Networker -case underscore -inpkg +//go:generate mockery --name Networker --case underscore --inpackage var ( // ErrNoSuchNetworker is being returned when there's no suitable networker. @@ -63,6 +65,7 @@ func ClearNetworkers() { // Networker defines basic network operations, such as Dial/Listen. type Networker interface { Dial(addr Addr) (net.Conn, error) + Ping(pk cipher.PubKey, addr Addr) (net.Conn, error) DialContext(ctx context.Context, addr Addr) (net.Conn, error) Listen(addr Addr) (net.Listener, error) ListenContext(ctx context.Context, addr Addr) (net.Listener, error) @@ -73,6 +76,15 @@ func Dial(addr Addr) (net.Conn, error) { return DialContext(context.Background(), addr) } +// Ping dials the remote `addr`. +func Ping(pk cipher.PubKey, addr Addr) (net.Conn, error) { + n, err := ResolveNetworker(addr.Net) + if err != nil { + return nil, err + } + return n.Ping(pk, addr) +} + // DialContext dials the remote `addr` with the context. func DialContext(ctx context.Context, addr Addr) (net.Conn, error) { n, err := ResolveNetworker(addr.Net) diff --git a/pkg/app/appnet/skywire_networker.go b/pkg/app/appnet/skywire_networker.go index dd609dfad..292ad2156 100644 --- a/pkg/app/appnet/skywire_networker.go +++ b/pkg/app/appnet/skywire_networker.go @@ -12,6 +12,7 @@ import ( "github.com/sirupsen/logrus" + "github.com/skycoin/skywire-utilities/pkg/cipher" "github.com/skycoin/skywire-utilities/pkg/netutil" "github.com/skycoin/skywire/pkg/router" "github.com/skycoin/skywire/pkg/routing" @@ -70,6 +71,33 @@ func (r *SkywireNetworker) DialContext(ctx context.Context, addr Addr) (conn net }, nil } +// Ping dials remote `addr` via `skynet` with context. +func (r *SkywireNetworker) Ping(pk cipher.PubKey, addr Addr) (net.Conn, error) { + ctx := context.Background() + localPort, freePort, err := r.porter.ReserveEphemeral(ctx, nil) + if err != nil { + return nil, err + } + + // ensure ports are freed on error. + defer func() { + if err != nil { + freePort() + } + }() + r.log.Error("11111111111111111111111") + conn, err := r.r.PingRoute(ctx, pk, routing.Port(localPort), addr.Port, router.DefaultDialOptions()) + if err != nil { + return nil, err + } + + return &SkywireConn{ + Conn: conn, + nrg: conn.(*router.NoiseRouteGroup), + freePort: freePort, + }, nil +} + // Listen starts listening on local `addr` in the skynet. func (r *SkywireNetworker) Listen(addr Addr) (net.Listener, error) { return r.ListenContext(context.Background(), addr) From 3a9fae0a4365c81b7a48bf66c9c6c22157befab0 Mon Sep 17 00:00:00 2001 From: ersonp Date: Fri, 2 Dec 2022 14:21:18 +0530 Subject: [PATCH 07/30] Add PingRoute method to router --- pkg/router/mock_router.go | 46 ++++++++++++- pkg/router/route_group.go | 1 + pkg/router/router.go | 134 ++++++++++++++++++++++++++++++++++++-- 3 files changed, 172 insertions(+), 9 deletions(-) diff --git a/pkg/router/mock_router.go b/pkg/router/mock_router.go index 0f681a374..12f03af4f 100644 --- a/pkg/router/mock_router.go +++ b/pkg/router/mock_router.go @@ -1,14 +1,16 @@ -// Code generated by mockery v1.0.0. DO NOT EDIT. +// Code generated by mockery v2.14.1. DO NOT EDIT. package router import ( context "context" - net "net" + + cipher "github.com/skycoin/skywire-utilities/pkg/cipher" mock "github.com/stretchr/testify/mock" - cipher "github.com/skycoin/skywire-utilities/pkg/cipher" + net "net" + routing "github.com/skycoin/skywire/pkg/routing" ) @@ -96,6 +98,29 @@ func (_m *MockRouter) IntroduceRules(rules routing.EdgeRules) error { return r0 } +// PingRoute provides a mock function with given fields: ctx, rPK, lPort, rPort, opts +func (_m *MockRouter) PingRoute(ctx context.Context, rPK cipher.PubKey, lPort routing.Port, rPort routing.Port, opts *DialOptions) (net.Conn, error) { + ret := _m.Called(ctx, rPK, lPort, rPort, opts) + + var r0 net.Conn + if rf, ok := ret.Get(0).(func(context.Context, cipher.PubKey, routing.Port, routing.Port, *DialOptions) net.Conn); ok { + r0 = rf(ctx, rPK, lPort, rPort, opts) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(net.Conn) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, cipher.PubKey, routing.Port, routing.Port, *DialOptions) error); ok { + r1 = rf(ctx, rPK, lPort, rPort, opts) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // ReserveKeys provides a mock function with given fields: n func (_m *MockRouter) ReserveKeys(n int) ([]routing.RouteID, error) { ret := _m.Called(n) @@ -233,3 +258,18 @@ func (_m *MockRouter) SetupIsTrusted(_a0 cipher.PubKey) bool { return r0 } + +type mockConstructorTestingTNewMockRouter interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockRouter creates a new instance of MockRouter. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockRouter(t mockConstructorTestingTNewMockRouter) *MockRouter { + mock := &MockRouter{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pkg/router/route_group.go b/pkg/router/route_group.go index 9b8293e9b..b8e134fb7 100644 --- a/pkg/router/route_group.go +++ b/pkg/router/route_group.go @@ -650,6 +650,7 @@ func (rg *RouteGroup) handlePacket(packet routing.Packet) error { return rg.handleDataPacket(packet) case routing.HandshakePacket: rg.handshakeProcessedOnce.Do(func() { + rg.logger.Warnf("HandshakePacket here") // first packet is handshake packet, so we're communicating with the new visor rg.encrypt = true if packet.Payload()[0] == 0 { diff --git a/pkg/router/router.go b/pkg/router/router.go index a4911935d..7a45829ef 100644 --- a/pkg/router/router.go +++ b/pkg/router/router.go @@ -25,7 +25,7 @@ import ( "github.com/skycoin/skywire/pkg/transport/network" ) -//go:generate mockery -name Router -case underscore -inpkg +//go:generate mockery --name Router --case underscore --inpackage const ( // DefaultRouteKeepAlive is the default expiration interval for routes @@ -126,6 +126,7 @@ type Router interface { // - Save to routing.Table and internal RouteGroup map. // - Return RouteGroup if successful. DialRoutes(ctx context.Context, rPK cipher.PubKey, lPort, rPort routing.Port, opts *DialOptions) (net.Conn, error) + PingRoute(ctx context.Context, rPK cipher.PubKey, lPort, rPort routing.Port, opts *DialOptions) (net.Conn, error) // AcceptRoutes should block until we receive an AddRules packet from SetupNode // that contains ConsumeRule(s) or ForwardRule(s). @@ -292,7 +293,7 @@ func (r *router) DialRoutes( Initiator: true, } - nrg, err := r.saveRouteGroupRules(rules, nsConf) + nrg, err := r.saveRouteGroupRules(rules, nsConf, "one") if err != nil { return nil, fmt.Errorf("saveRouteGroupRules: %w", err) } @@ -304,6 +305,89 @@ func (r *router) DialRoutes( return nrg, nil } +// PingRoute dials to a given visor of 'rPK'. +// 'lPort'/'rPort' specifies the local/remote ports respectively. +// A nil 'opts' input results in a value of '1' for all DialOptions fields. +// A single call to DialRoutes should perform the following: +// - Find routes via RouteFinder (in one call). +// - Setup routes via SetupNode (in one call). +// - Save to routing.Table and internal RouteGroup map. +// - Return RouteGroup if successful. +func (r *router) PingRoute( + ctx context.Context, + rPK cipher.PubKey, + lPort, rPort routing.Port, + opts *DialOptions, +) (net.Conn, error) { + + if rPK.Null() { + err := ErrRemoteEmptyPK + r.logger.WithError(err).Error("Failed to dial routes.") + return nil, fmt.Errorf("failed to dial routes: %w", err) + } + + r.logger.Errorf("lPort :%v", lPort) + r.logger.Errorf("rPort :%v", rPort) + lPK := r.conf.PubKey + forwardDesc := routing.NewRouteDescriptor(lPK, lPK, lPort, rPort) + + r.routeSetupHookMu.Lock() + defer r.routeSetupHookMu.Unlock() + if len(r.routeSetupHooks) != 0 { + for _, rsf := range r.routeSetupHooks { + if err := rsf(rPK, r.tm); err != nil { + return nil, err + } + } + } + + // check if transports are available + ok := r.checkIfTransportAvailable() + if !ok { + return nil, ErrNoTransportFound + } + forwardPath, reversePath, err := r.fetchPingRoute(lPK, rPK, opts) + if err != nil { + return nil, fmt.Errorf("route finder: %w", err) + } + + req := routing.BidirectionalRoute{ + Desc: forwardDesc, + KeepAlive: DefaultRouteKeepAlive, + Forward: forwardPath, + Reverse: reversePath, + } + + rules, err := r.conf.RouteGroupDialer.Dial(ctx, r.logger, r.dmsgC, r.conf.SetupNodes, req) + if err != nil { + r.logger.WithError(err).Error("Error dialing route group") + return nil, err + } + + if err := r.SaveRoutingRules(rules.Forward, rules.Reverse); err != nil { + r.logger.WithError(err).Error("Error saving routing rules") + return nil, err + } + + nsConf := noise.Config{ + LocalPK: r.conf.PubKey, + LocalSK: r.conf.SecKey, + RemotePK: rPK, + Initiator: true, + } + + nrg, err := r.saveRouteGroupRules(rules, nsConf, "two") + if err != nil { + return nil, fmt.Errorf("saveRouteGroupRules: %w", err) + } + + nrg.rg.startOffServiceLoops() + + r.logger.Debugf("Created new routes to %s on port %d", lPK, lPort) + + return nrg, nil +} + // AcceptRoutes should block until we receive an AddRules packet from SetupNode // that contains ConsumeRule(s) or ForwardRule(s). // Then the following should happen: @@ -343,7 +427,7 @@ func (r *router) AcceptRoutes(ctx context.Context) (net.Conn, error) { Initiator: false, } - nrg, err := r.saveRouteGroupRules(rules, nsConf) + nrg, err := r.saveRouteGroupRules(rules, nsConf, "three") if err != nil { return nil, fmt.Errorf("saveRouteGroupRules: %w", err) } @@ -413,7 +497,7 @@ func (r *router) serveSetup() { } } -func (r *router) saveRouteGroupRules(rules routing.EdgeRules, nsConf noise.Config) (*NoiseRouteGroup, error) { +func (r *router) saveRouteGroupRules(rules routing.EdgeRules, nsConf noise.Config, test string) (*NoiseRouteGroup, error) { r.logger.Debugf("Saving route group rules with desc: %s", &rules.Desc) // When route group is wrapped with noise, it's put into `nrgs`. but before that, @@ -433,7 +517,7 @@ func (r *router) saveRouteGroupRules(rules routing.EdgeRules, nsConf noise.Confi // we need to close currently existing wrapped rg if there's one nrg, ok := r.rgsNs[rules.Desc] - r.logger.Debugf("Creating new route group rule with desc: %s", &rules.Desc) + r.logger.Warnf("Creating new route group rule with desc: %s", &rules.Desc) rg := NewRouteGroup(DefaultRouteGroupConfig(), r.rt, rules.Desc, r.mLogger) rg.appendRules(rules.Forward, rules.Reverse, r.tm.Transport(rules.Forward.NextTransportID())) // we put raw rg so it can be accessible to the router when handshake packets come in @@ -494,7 +578,7 @@ func (r *router) saveRouteGroupRules(rules routing.EdgeRules, nsConf noise.Confi // wrapping rg with noise wrappedRG, err := network.EncryptConn(nsConf, rg) if err != nil { - r.logger.WithError(err).Errorf("Failed to wrap route group (%s): %v, closing...", &rules.Desc, err) + r.logger.WithError(err).Errorf("Failed to wrap route group (%s): %v, closing... %v", &rules.Desc, err, test) if err := rg.Close(); err != nil { r.logger.WithError(err).Errorf("Failed to close route group (%s): %v", &rules.Desc, err) } @@ -1006,6 +1090,44 @@ fetchRoutesAgain: return paths[forward][0], paths[backward][0], nil } +func (r *router) fetchPingRoute(src, pingKey cipher.PubKey, opts *DialOptions) (fwd, rev []routing.Hop, err error) { + // TODO: use opts + if opts == nil { + opts = DefaultDialOptions() // nolint + } + + r.logger.Debugf("Requesting new routes from %s to %s", src, src) + + timer := time.NewTimer(retryDuration) + defer timer.Stop() + + forward := [2]cipher.PubKey{src, src} + backward := [2]cipher.PubKey{src, src} + +fetchRoutesAgain: + ctx := context.Background() + + paths, err := r.conf.RouteFinder.FindRoutes(ctx, []routing.PathEdges{forward, backward}, + &rfclient.RouteOptions{MinHops: 0, MaxHops: 2}) + if err == rfclient.ErrTransportNotFound { + return nil, nil, err + } + + if err != nil { + select { + case <-timer.C: + return nil, nil, err + default: + time.Sleep(retryInterval) + goto fetchRoutesAgain + } + } + + r.logger.Debugf("Found routes Forward: %s. Reverse %s", paths[forward], paths[backward]) + + return paths[forward][0], paths[backward][0], nil +} + // SetupIsTrusted checks if setup node is trusted. func (r *router) SetupIsTrusted(sPK cipher.PubKey) bool { _, ok := r.trustedVisors[sPK] From 4f0d06498f725d8d68f8405852cbd992025fb695 Mon Sep 17 00:00:00 2001 From: ersonp Date: Fri, 2 Dec 2022 14:21:40 +0530 Subject: [PATCH 08/30] Use Ping instead of Dial in API --- pkg/visor/api.go | 35 ++++++++++++++++++++--------------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/pkg/visor/api.go b/pkg/visor/api.go index a1f37f8bd..c6bb673fb 100644 --- a/pkg/visor/api.go +++ b/pkg/visor/api.go @@ -802,8 +802,8 @@ func (v *Visor) RoutingRules() ([]routing.Rule, error) { func (v *Visor) TestRouting(pk cipher.PubKey) (string, error) { addr := appnet.Addr{ Net: appnet.TypeSkynet, - PubKey: pk, - Port: 1, + PubKey: v.conf.PK, + Port: routing.Port(skyenv.SkyPingPort), } var err error @@ -812,9 +812,10 @@ func (v *Visor) TestRouting(pk cipher.PubKey) (string, error) { ctx := context.TODO() var r = netutil.NewRetrier(v.log, 2*time.Second, netutil.DefaultMaxBackoff, 5, 2) err = r.Do(ctx, func() error { - conn, err = appnet.Dial(addr) + conn, err = appnet.Ping(pk, addr) return err }) + v.log.Error("zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz") if err != nil { return "", err } @@ -823,19 +824,23 @@ func (v *Visor) TestRouting(pk cipher.PubKey) (string, error) { err = conn.Close() }() - skywireConn, isSkywireConn := conn.(*appnet.SkywireConn) - if !isSkywireConn { - return "", fmt.Errorf("Can't get such info from this conn") - } - + // skywireConn, isSkywireConn := conn.(*appnet.SkywireConn) + // if !isSkywireConn { + // return "", fmt.Errorf("Can't get such info from this conn") + // } + // msh := "asdasdasdasdsa" + // _, err = skywireConn.Write([]byte(msh)) + // if err != nil { + // return "", err + // } var latency time.Duration - err = r.Do(ctx, func() error { - latency = skywireConn.Latency() - if latency.String() != "0s" { - return nil - } - return fmt.Errorf("unable to retrieve latency") - }) + // err = r.Do(ctx, func() error { + // latency = skywireConn.Latency() + // if latency.String() != "0s" { + // return nil + // } + // return fmt.Errorf("unable to retrieve latency") + // }) return fmt.Sprint(latency), nil } From 3105f8f96c9d92bb50a87ee811e25217cc5313d8 Mon Sep 17 00:00:00 2001 From: ersonp Date: Mon, 5 Dec 2022 18:49:46 +0530 Subject: [PATCH 09/30] Fix RulesMap in setup node This commit fixes the issue where the rule used to be the same for fwd and rev when both source and destination pk is the same. This happens becase the pk is used as a key in the RulesMap and in the above case the rule is overwritten instead of creating a new one. We fix this by changing the key to be a string of 'pk:port' instead of just the PK which makes the key unique in all cases. --- pkg/setup/node.go | 42 ++++++++++++++++++++++++++++++------------ pkg/setup/node_test.go | 2 +- pkg/setup/rules_map.go | 11 +++++------ 3 files changed, 36 insertions(+), 19 deletions(-) diff --git a/pkg/setup/node.go b/pkg/setup/node.go index aaebd3b4d..c22d55732 100644 --- a/pkg/setup/node.go +++ b/pkg/setup/node.go @@ -6,6 +6,7 @@ import ( "fmt" "net/http" "net/rpc" + "strings" "time" "github.com/sirupsen/logrus" @@ -128,8 +129,8 @@ func CreateRouteGroup(ctx context.Context, dialer network.Dialer, biRt routing.B // Generate forward and reverse routes. fwdRt, revRt := biRt.ForwardAndReverse() - srcPK := biRt.Desc.SrcPK() - dstPK := biRt.Desc.DstPK() + srcPK := biRt.Desc.Src() + dstPK := biRt.Desc.Dst() // Generate routing rules (for edge and intermediary routers) that are to be sent. // Rules are grouped by rule type [FWD, REV, INTER]. @@ -137,8 +138,8 @@ func CreateRouteGroup(ctx context.Context, dialer network.Dialer, biRt routing.B if err != nil { return routing.EdgeRules{}, err } - initEdge := routing.EdgeRules{Desc: revRt.Desc, Forward: fwdRules[srcPK][0], Reverse: revRules[srcPK][0]} - respEdge := routing.EdgeRules{Desc: fwdRt.Desc, Forward: fwdRules[dstPK][0], Reverse: revRules[dstPK][0]} + initEdge := routing.EdgeRules{Desc: revRt.Desc, Forward: fwdRules[srcPK.String()][0], Reverse: revRules[srcPK.String()][0]} + respEdge := routing.EdgeRules{Desc: fwdRt.Desc, Forward: fwdRules[dstPK.String()][0], Reverse: revRules[dstPK.String()][0]} log.Infof("Generated routing rules:\nInitiating edge: %v\nResponding edge: %v\nIntermediaries: %v", initEdge.String(), respEdge.String(), interRules.String()) @@ -202,8 +203,8 @@ func GenerateRules(idR IDReserver, routes []routing.Route) (fwdRules, revRules, } desc := route.Desc - srcPK := desc.SrcPK() - dstPK := desc.DstPK() + srcAddr := desc.Src() + dstAddr := desc.Dst() srcPort := desc.SrcPort() dstPort := desc.DstPort() @@ -215,19 +216,30 @@ func GenerateRules(idR IDReserver, routes []routing.Route) (fwdRules, revRules, return nil, nil, nil, ErrNoKey } + var port routing.Port + if desc.DstPK() == hop.From { + port = dstPort + } + if desc.SrcPK() == hop.From { + port = srcPort + } + addr := routing.Addr{ + PubKey: hop.From, + Port: port, + } if i == 0 { - rule := routing.ForwardRule(route.KeepAlive, rID, nxtRID, hop.TpID, srcPK, dstPK, srcPort, dstPort) - fwdRules[hop.From] = append(fwdRules[hop.From], rule) + rule := routing.ForwardRule(route.KeepAlive, rID, nxtRID, hop.TpID, srcAddr.PubKey, dstAddr.PubKey, srcPort, dstPort) + fwdRules[addr.String()] = append(fwdRules[addr.String()], rule) } else { rule := routing.IntermediaryForwardRule(route.KeepAlive, rID, nxtRID, hop.TpID) - interRules[hop.From] = append(interRules[hop.From], rule) + interRules[addr.String()] = append(interRules[addr.String()], rule) } rID = nxtRID } - rule := routing.ConsumeRule(route.KeepAlive, rID, srcPK, dstPK, srcPort, dstPort) - revRules[dstPK] = append(revRules[dstPK], rule) + rule := routing.ConsumeRule(route.KeepAlive, rID, srcAddr.PubKey, dstAddr.PubKey, srcPort, dstPort) + revRules[dstAddr.String()] = append(revRules[dstAddr.String()], rule) } return fwdRules, revRules, interRules, nil @@ -248,7 +260,13 @@ func BroadcastIntermediaryRules(ctx context.Context, log logrus.FieldLogger, rtI errCh := make(chan error, len(interRules)) defer close(errCh) - for pk, rules := range interRules { + for addr, rules := range interRules { + var pk cipher.PubKey + stringPK := strings.Split(addr, ":") + err = pk.Set(stringPK[0]) + if err != nil { + return err + } go func(pk cipher.PubKey, rules []routing.Rule) { _, err := rtIDR.Client(pk).AddIntermediaryRules(ctx, rules) if err != nil { diff --git a/pkg/setup/node_test.go b/pkg/setup/node_test.go index a0f03ca37..dbb3000fd 100644 --- a/pkg/setup/node_test.go +++ b/pkg/setup/node_test.go @@ -349,7 +349,7 @@ func randPKs(n int) []cipher.PubKey { func randRulesMap(pks []cipher.PubKey) RulesMap { rules := make(RulesMap, len(pks)) for _, pk := range pks { - rules[pk] = randIntermediaryRules(2) + rules[pk.String()] = randIntermediaryRules(2) } return rules } diff --git a/pkg/setup/rules_map.go b/pkg/setup/rules_map.go index 54f11fbca..c3fa59828 100644 --- a/pkg/setup/rules_map.go +++ b/pkg/setup/rules_map.go @@ -4,24 +4,23 @@ package setup import ( "encoding/json" - "github.com/skycoin/skywire-utilities/pkg/cipher" "github.com/skycoin/skywire/pkg/routing" ) -// RulesMap associates a slice of rules to a visor's public key. -type RulesMap map[cipher.PubKey][]routing.Rule +// RulesMap associates a slice of rules to a visor's string of `public key:port“. +type RulesMap map[string][]routing.Rule // String implements fmt.Stringer func (rm RulesMap) String() string { - out := make(map[cipher.PubKey][]string, len(rm)) + out := make(map[string][]string, len(rm)) - for pk, rules := range rm { + for addr, rules := range rm { str := make([]string, len(rules)) for i, rule := range rules { str[i] = rule.String() } - out[pk] = str + out[addr] = str } jb, err := json.MarshalIndent(out, "", "\t") From eeaa8a1d81881d94d5e715a4fc44a4cee4dcc94d Mon Sep 17 00:00:00 2001 From: ersonp Date: Fri, 9 Dec 2022 12:35:36 +0530 Subject: [PATCH 10/30] Fix noise message authentication error We fix this error by fixgin the noise config in PingRoute by setting the RemotePK the same as LocalPK. --- pkg/router/router.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pkg/router/router.go b/pkg/router/router.go index 7a45829ef..242a92fa1 100644 --- a/pkg/router/router.go +++ b/pkg/router/router.go @@ -326,8 +326,6 @@ func (r *router) PingRoute( return nil, fmt.Errorf("failed to dial routes: %w", err) } - r.logger.Errorf("lPort :%v", lPort) - r.logger.Errorf("rPort :%v", rPort) lPK := r.conf.PubKey forwardDesc := routing.NewRouteDescriptor(lPK, lPK, lPort, rPort) @@ -372,7 +370,7 @@ func (r *router) PingRoute( nsConf := noise.Config{ LocalPK: r.conf.PubKey, LocalSK: r.conf.SecKey, - RemotePK: rPK, + RemotePK: r.conf.PubKey, Initiator: true, } From ed8274b57622988976c1a6775239854c6c1a81e3 Mon Sep 17 00:00:00 2001 From: ersonp Date: Fri, 9 Dec 2022 13:08:01 +0530 Subject: [PATCH 11/30] Remove test logs --- pkg/app/appnet/skywire_networker.go | 1 - pkg/router/route_group.go | 1 - pkg/router/router.go | 1 - pkg/visor/api.go | 19 +++++++++---------- pkg/visor/init.go | 1 - 5 files changed, 9 insertions(+), 14 deletions(-) diff --git a/pkg/app/appnet/skywire_networker.go b/pkg/app/appnet/skywire_networker.go index 292ad2156..ef306f605 100644 --- a/pkg/app/appnet/skywire_networker.go +++ b/pkg/app/appnet/skywire_networker.go @@ -85,7 +85,6 @@ func (r *SkywireNetworker) Ping(pk cipher.PubKey, addr Addr) (net.Conn, error) { freePort() } }() - r.log.Error("11111111111111111111111") conn, err := r.r.PingRoute(ctx, pk, routing.Port(localPort), addr.Port, router.DefaultDialOptions()) if err != nil { return nil, err diff --git a/pkg/router/route_group.go b/pkg/router/route_group.go index b8e134fb7..9b8293e9b 100644 --- a/pkg/router/route_group.go +++ b/pkg/router/route_group.go @@ -650,7 +650,6 @@ func (rg *RouteGroup) handlePacket(packet routing.Packet) error { return rg.handleDataPacket(packet) case routing.HandshakePacket: rg.handshakeProcessedOnce.Do(func() { - rg.logger.Warnf("HandshakePacket here") // first packet is handshake packet, so we're communicating with the new visor rg.encrypt = true if packet.Payload()[0] == 0 { diff --git a/pkg/router/router.go b/pkg/router/router.go index 242a92fa1..ea88b4d7c 100644 --- a/pkg/router/router.go +++ b/pkg/router/router.go @@ -515,7 +515,6 @@ func (r *router) saveRouteGroupRules(rules routing.EdgeRules, nsConf noise.Confi // we need to close currently existing wrapped rg if there's one nrg, ok := r.rgsNs[rules.Desc] - r.logger.Warnf("Creating new route group rule with desc: %s", &rules.Desc) rg := NewRouteGroup(DefaultRouteGroupConfig(), r.rt, rules.Desc, r.mLogger) rg.appendRules(rules.Forward, rules.Reverse, r.tm.Transport(rules.Forward.NextTransportID())) // we put raw rg so it can be accessible to the router when handshake packets come in diff --git a/pkg/visor/api.go b/pkg/visor/api.go index c6bb673fb..ae5469105 100644 --- a/pkg/visor/api.go +++ b/pkg/visor/api.go @@ -815,7 +815,6 @@ func (v *Visor) TestRouting(pk cipher.PubKey) (string, error) { conn, err = appnet.Ping(pk, addr) return err }) - v.log.Error("zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz") if err != nil { return "", err } @@ -824,15 +823,15 @@ func (v *Visor) TestRouting(pk cipher.PubKey) (string, error) { err = conn.Close() }() - // skywireConn, isSkywireConn := conn.(*appnet.SkywireConn) - // if !isSkywireConn { - // return "", fmt.Errorf("Can't get such info from this conn") - // } - // msh := "asdasdasdasdsa" - // _, err = skywireConn.Write([]byte(msh)) - // if err != nil { - // return "", err - // } + skywireConn, isSkywireConn := conn.(*appnet.SkywireConn) + if !isSkywireConn { + return "", fmt.Errorf("Can't get such info from this conn") + } + msh := "asdasdasdasdsa" + _, err = skywireConn.Write([]byte(msh)) + if err != nil { + return "", err + } var latency time.Duration // err = r.Do(ctx, func() error { // latency = skywireConn.Latency() diff --git a/pkg/visor/init.go b/pkg/visor/init.go index 0050faafa..34c6bc5fe 100644 --- a/pkg/visor/init.go +++ b/pkg/visor/init.go @@ -595,7 +595,6 @@ func initPing(ctx context.Context, v *Visor, log *logging.Logger) error { return } log.Debug("Accepted sky proxy conn") - log.Error("11111111111111111111111111111111111111111111111111") log.Debug("Wrapping conn...") wrappedConn, err := appnet.WrapConn(conn) From 5bb48a7b670c90e9ff2f7fa16a9494bf45040468 Mon Sep 17 00:00:00 2001 From: ersonp Date: Fri, 9 Dec 2022 13:13:20 +0530 Subject: [PATCH 12/30] Revert test changes --- pkg/router/router.go | 10 +++++----- pkg/visor/init.go | 1 + 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/pkg/router/router.go b/pkg/router/router.go index ea88b4d7c..af6f86cde 100644 --- a/pkg/router/router.go +++ b/pkg/router/router.go @@ -293,7 +293,7 @@ func (r *router) DialRoutes( Initiator: true, } - nrg, err := r.saveRouteGroupRules(rules, nsConf, "one") + nrg, err := r.saveRouteGroupRules(rules, nsConf) if err != nil { return nil, fmt.Errorf("saveRouteGroupRules: %w", err) } @@ -374,7 +374,7 @@ func (r *router) PingRoute( Initiator: true, } - nrg, err := r.saveRouteGroupRules(rules, nsConf, "two") + nrg, err := r.saveRouteGroupRules(rules, nsConf) if err != nil { return nil, fmt.Errorf("saveRouteGroupRules: %w", err) } @@ -425,7 +425,7 @@ func (r *router) AcceptRoutes(ctx context.Context) (net.Conn, error) { Initiator: false, } - nrg, err := r.saveRouteGroupRules(rules, nsConf, "three") + nrg, err := r.saveRouteGroupRules(rules, nsConf) if err != nil { return nil, fmt.Errorf("saveRouteGroupRules: %w", err) } @@ -495,7 +495,7 @@ func (r *router) serveSetup() { } } -func (r *router) saveRouteGroupRules(rules routing.EdgeRules, nsConf noise.Config, test string) (*NoiseRouteGroup, error) { +func (r *router) saveRouteGroupRules(rules routing.EdgeRules, nsConf noise.Config) (*NoiseRouteGroup, error) { r.logger.Debugf("Saving route group rules with desc: %s", &rules.Desc) // When route group is wrapped with noise, it's put into `nrgs`. but before that, @@ -575,7 +575,7 @@ func (r *router) saveRouteGroupRules(rules routing.EdgeRules, nsConf noise.Confi // wrapping rg with noise wrappedRG, err := network.EncryptConn(nsConf, rg) if err != nil { - r.logger.WithError(err).Errorf("Failed to wrap route group (%s): %v, closing... %v", &rules.Desc, err, test) + r.logger.WithError(err).Errorf("Failed to wrap route group (%s): %v, closing...", &rules.Desc, err) if err := rg.Close(); err != nil { r.logger.WithError(err).Errorf("Failed to close route group (%s): %v", &rules.Desc, err) } diff --git a/pkg/visor/init.go b/pkg/visor/init.go index 34c6bc5fe..9c6a083d0 100644 --- a/pkg/visor/init.go +++ b/pkg/visor/init.go @@ -572,6 +572,7 @@ func initPing(ctx context.Context, v *Visor, log *logging.Logger) error { PubKey: v.conf.PK, Port: routing.Port(skyenv.SkyPingPort), } + l, err := appnet.ListenContext(ctx, connApp) if err != nil { cancel() From 697ce71f116be70b85f36ab67d6479f667ab6110 Mon Sep 17 00:00:00 2001 From: ersonp Date: Fri, 9 Dec 2022 13:13:36 +0530 Subject: [PATCH 13/30] Update Ping usage in dmsg network --- pkg/app/appnet/dmsg_networker.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/app/appnet/dmsg_networker.go b/pkg/app/appnet/dmsg_networker.go index c7cdeec81..b572002ef 100644 --- a/pkg/app/appnet/dmsg_networker.go +++ b/pkg/app/appnet/dmsg_networker.go @@ -3,6 +3,7 @@ package appnet import ( "context" + "fmt" "net" "github.com/skycoin/dmsg/pkg/dmsg" @@ -29,7 +30,7 @@ func (n *DmsgNetworker) Dial(addr Addr) (net.Conn, error) { // Ping dials remote `addr` via dmsg network. func (n *DmsgNetworker) Ping(pk cipher.PubKey, addr Addr) (net.Conn, error) { - return n.DialContext(context.Background(), addr) + return nil, fmt.Errorf("Ping not available on dmsg network") } // DialContext dials remote `addr` via dmsg network with context. From 9115937f2b65d6592df54bc139f124199b46aa6d Mon Sep 17 00:00:00 2001 From: ersonp Date: Fri, 9 Dec 2022 18:39:45 +0530 Subject: [PATCH 14/30] Check if pk to be ping is in the route hops --- pkg/router/router.go | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/pkg/router/router.go b/pkg/router/router.go index af6f86cde..12bfeb484 100644 --- a/pkg/router/router.go +++ b/pkg/router/router.go @@ -1120,6 +1120,32 @@ fetchRoutesAgain: } } + var hopTo, hopFrom bool + // check if the remote pk is present in both the hops + // [ + // { + // "TpID":"", + // "From":"", + // "To":"" + // }, + // { + // "TpID":"", + // "From":"", + // "To":"" + // } + // ] + for _, hop := range paths[forward][0] { + if hop.To == pingKey { + hopTo = true + } + if hop.From == pingKey { + hopFrom = true + } + } + if !hopTo && hopFrom { + return nil, nil, fmt.Errorf("Unable to fetch route with a hop from %v", pingKey) + } + r.logger.Debugf("Found routes Forward: %s. Reverse %s", paths[forward], paths[backward]) return paths[forward][0], paths[backward][0], nil From 41555a6c87d2ef7195ac0f63e7e60cbedb27deb3 Mon Sep 17 00:00:00 2001 From: ersonp Date: Mon, 12 Dec 2022 21:21:32 +0530 Subject: [PATCH 15/30] Split TestRouting into DialPing, Ping and StopPing --- cmd/skywire-cli/commands/visor/ping.go | 6 ++- pkg/visor/api.go | 57 +++++++++++++++++--------- pkg/visor/init.go | 6 +-- pkg/visor/rpc.go | 30 ++++++++++---- pkg/visor/rpc_client.go | 44 ++++++++++++++------ pkg/visor/visor.go | 9 ++++ 6 files changed, 109 insertions(+), 43 deletions(-) diff --git a/cmd/skywire-cli/commands/visor/ping.go b/cmd/skywire-cli/commands/visor/ping.go index f4fd153fd..ff61c0d43 100644 --- a/cmd/skywire-cli/commands/visor/ping.go +++ b/cmd/skywire-cli/commands/visor/ping.go @@ -22,9 +22,13 @@ var testCmd = &cobra.Command{ Run: func(cmd *cobra.Command, args []string) { pk := internal.ParsePK(cmd.Flags(), "pk", args[0]) - latency, err := clirpc.Client(cmd.Flags()).TestRouting(pk) + err := clirpc.Client(cmd.Flags()).DialPing(pk) + internal.Catch(cmd.Flags(), err) + latency, err := clirpc.Client(cmd.Flags()).Ping(pk) internal.Catch(cmd.Flags(), err) internal.PrintOutput(cmd.Flags(), latency, fmt.Sprintf(latency+"\n")) + err = clirpc.Client(cmd.Flags()).StopPing(pk) + internal.Catch(cmd.Flags(), err) }, } diff --git a/pkg/visor/api.go b/pkg/visor/api.go index ae5469105..fc91e3de3 100644 --- a/pkg/visor/api.go +++ b/pkg/visor/api.go @@ -79,7 +79,6 @@ type API interface { RoutingRule(key routing.RouteID) (routing.Rule, error) SaveRoutingRule(rule routing.Rule) error RemoveRoutingRule(key routing.RouteID) error - TestRouting(pk cipher.PubKey) (string, error) RouteGroups() ([]RouteGroupInfo, error) @@ -96,6 +95,10 @@ type API interface { SetLogRotationInterval(visorconfig.Duration) error IsDMSGClientReady() (bool, error) + + DialPing(pk cipher.PubKey) error + Ping(pk cipher.PubKey) (string, error) + StopPing(pk cipher.PubKey) error } // HealthCheckable resource returns its health status as an integer @@ -798,8 +801,8 @@ func (v *Visor) RoutingRules() ([]routing.Rule, error) { return v.router.Rules(), nil } -// TestRouting implements API. -func (v *Visor) TestRouting(pk cipher.PubKey) (string, error) { +// DialPing implements API. +func (v *Visor) DialPing(pk cipher.PubKey) error { addr := appnet.Addr{ Net: appnet.TypeSkynet, PubKey: v.conf.PK, @@ -816,31 +819,47 @@ func (v *Visor) TestRouting(pk cipher.PubKey) (string, error) { return err }) if err != nil { - return "", err + return err } - defer func() { - err = conn.Close() - }() - skywireConn, isSkywireConn := conn.(*appnet.SkywireConn) if !isSkywireConn { - return "", fmt.Errorf("Can't get such info from this conn") + return fmt.Errorf("Can't get such info from this conn") } + + v.connMx.Lock() + v.pingConns[pk] = ping{ + conn: skywireConn, + } + v.connMx.Unlock() + return nil +} + +// Ping implements API. +func (v *Visor) Ping(pk cipher.PubKey) (string, error) { + v.connMx.Lock() + defer v.connMx.Unlock() + skywireConn := v.pingConns[pk].conn msh := "asdasdasdasdsa" - _, err = skywireConn.Write([]byte(msh)) + _, err := skywireConn.Write([]byte(msh)) if err != nil { return "", err } - var latency time.Duration - // err = r.Do(ctx, func() error { - // latency = skywireConn.Latency() - // if latency.String() != "0s" { - // return nil - // } - // return fmt.Errorf("unable to retrieve latency") - // }) - return fmt.Sprint(latency), nil + return "test", nil +} + +// StopPing implements API. +func (v *Visor) StopPing(pk cipher.PubKey) error { + v.connMx.Lock() + defer v.connMx.Unlock() + + skywireConn := v.pingConns[pk].conn + err := skywireConn.Close() + if err != nil { + return err + } + delete(v.pingConns, pk) + return nil } // RoutingRule implements API. diff --git a/pkg/visor/init.go b/pkg/visor/init.go index 9c6a083d0..6198bbe65 100644 --- a/pkg/visor/init.go +++ b/pkg/visor/init.go @@ -121,7 +121,7 @@ var ( // Dmsg trackers module dmsgTrackers vinit.Module // Ping module - ping vinit.Module + pi vinit.Module // visor that groups all modules together vis vinit.Module ) @@ -159,9 +159,9 @@ func registerModules(logger *logging.MasterLogger) { trs = maker("transport_setup", initTransportSetup, &dmsgC, &tr) tm = vinit.MakeModule("transports", vinit.DoNothing, logger, &sc, &sudphC, &dmsgCtrl, &dmsgHTTPLogServer, &dmsgTrackers) pvs = maker("public_visor", initPublicVisor, &tr, &ar, &disc, &stcprC) - ping = maker("ping", initPing, &dmsgC, &tm) + pi = maker("ping", initPing, &dmsgC, &tm) vis = vinit.MakeModule("visor", vinit.DoNothing, logger, &ebc, &ar, &disc, &pty, - &tr, &rt, &launch, &cli, &hvs, &ut, &pv, &pvs, &trs, &stcpC, &stcprC, &ping) + &tr, &rt, &launch, &cli, &hvs, &ut, &pv, &pvs, &trs, &stcpC, &stcprC, &pi) hv = maker("hypervisor", initHypervisor, &vis) } diff --git a/pkg/visor/rpc.go b/pkg/visor/rpc.go index 47a278df0..8c9caa12f 100644 --- a/pkg/visor/rpc.go +++ b/pkg/visor/rpc.go @@ -493,14 +493,6 @@ func (r *RPC) RoutingRules(_ *struct{}, out *[]routing.Rule) (err error) { return err } -// TestRouting obtains all routing rules of the RoutingTable. -func (r *RPC) TestRouting(pk *cipher.PubKey, out *string) (err error) { - defer rpcutil.LogCall(r.log, "TestRouting", pk)(out, &err) - - *out, err = r.visor.TestRouting(*pk) - return err -} - // RoutingRule obtains a routing rule of given RouteID. func (r *RPC) RoutingRule(key *routing.RouteID, rule *routing.Rule) (err error) { defer rpcutil.LogCall(r.log, "RoutingRule", key)(rule, &err) @@ -636,3 +628,25 @@ func (r *RPC) IsDMSGClientReady(_ *struct{}, out *bool) (err error) { *out = status return err } + +// DialPing dials to the ping module using the provided pk as a hop. +func (r *RPC) DialPing(pk *cipher.PubKey, _ *struct{}) (err error) { + defer rpcutil.LogCall(r.log, "DialPing", pk)(nil, &err) + + return r.visor.DialPing(*pk) +} + +// Ping pings the connected route via DialPing. +func (r *RPC) Ping(pk *cipher.PubKey, out *string) (err error) { + defer rpcutil.LogCall(r.log, "Ping", pk)(out, &err) + + *out, err = r.visor.Ping(*pk) + return err +} + +// StopPing stops the ping conn. +func (r *RPC) StopPing(pk *cipher.PubKey, _ *struct{}) (err error) { + defer rpcutil.LogCall(r.log, "StopPing", pk)(nil, &err) + + return r.visor.StopPing(*pk) +} diff --git a/pkg/visor/rpc_client.go b/pkg/visor/rpc_client.go index 56a302f31..e30726400 100644 --- a/pkg/visor/rpc_client.go +++ b/pkg/visor/rpc_client.go @@ -353,13 +353,6 @@ func (rc *rpcClient) RoutingRules() ([]routing.Rule, error) { return entries, err } -// TestRouting calls RoutingRules. -func (rc *rpcClient) TestRouting(pk cipher.PubKey) (string, error) { - var latency string - err := rc.Call("TestRouting", &pk, &latency) - return latency, err -} - // RoutingRule calls RoutingRule. func (rc *rpcClient) RoutingRule(key routing.RouteID) (routing.Rule, error) { var rule routing.Rule @@ -470,6 +463,23 @@ func (rc *rpcClient) IsDMSGClientReady() (bool, error) { return out, err } +// DialPing calls DialPing. +func (rc *rpcClient) DialPing(pk cipher.PubKey) error { + return rc.Call("DialPing", &pk, &struct{}{}) +} + +// Ping calls Ping. +func (rc *rpcClient) Ping(pk cipher.PubKey) (string, error) { + var latency string + err := rc.Call("Ping", &pk, &latency) + return latency, err +} + +// StopPing calls StopPing. +func (rc *rpcClient) StopPing(pk cipher.PubKey) error { + return rc.Call("StopPing", &pk, &struct{}{}) +} + // MockRPCClient mocks API. type mockRPCClient struct { startedAt time.Time @@ -957,11 +967,6 @@ func (mc *mockRPCClient) RoutingRules() ([]routing.Rule, error) { return mc.rt.AllRules(), nil } -// RoutingRules implements API. -func (mc *mockRPCClient) TestRouting(_ cipher.PubKey) (string, error) { - return "", nil -} - // RoutingRule implements API. func (mc *mockRPCClient) RoutingRule(key routing.RouteID) (routing.Rule, error) { return mc.rt.Rule(key) @@ -1062,3 +1067,18 @@ func (mc *mockRPCClient) RemoteVisors() ([]string, error) { func (mc *mockRPCClient) IsDMSGClientReady() (bool, error) { return false, nil } + +// DialPing implements API. +func (mc *mockRPCClient) DialPing(_ cipher.PubKey) error { + return nil +} + +// Ping implements API. +func (mc *mockRPCClient) Ping(_ cipher.PubKey) (string, error) { + return "", nil +} + +// StopPing implements API. +func (mc *mockRPCClient) StopPing(_ cipher.PubKey) error { + return nil +} diff --git a/pkg/visor/visor.go b/pkg/visor/visor.go index ef77220ef..a0fc82e3d 100644 --- a/pkg/visor/visor.go +++ b/pkg/visor/visor.go @@ -5,6 +5,7 @@ import ( "context" "errors" "fmt" + "net" "net/http" "sync" "time" @@ -93,6 +94,14 @@ type Visor struct { autoPeerIP string // autoPeerCmd is the command string used to return the public key of the hypervisor remoteVisors map[cipher.PubKey]Conn // remote hypervisors the visor is attempting to connect to connectedHypervisors map[cipher.PubKey]bool // remote hypervisors the visor is currently connected to + + pingConns map[cipher.PubKey]ping + connMx *sync.RWMutex +} + +type ping struct { + conn net.Conn + latency chan string } // todo: consider moving module closing to the module system From 6c3612bb04755eb2c1586fd8445cf3d16af7f20c Mon Sep 17 00:00:00 2001 From: ersonp Date: Mon, 12 Dec 2022 21:21:56 +0530 Subject: [PATCH 16/30] Linting --- cmd/skywire-cli/commands/visor/route.go | 20 ++++++------ cmd/skywire-cli/commands/visor/transports.go | 32 ++++++++++---------- 2 files changed, 26 insertions(+), 26 deletions(-) diff --git a/cmd/skywire-cli/commands/visor/route.go b/cmd/skywire-cli/commands/visor/route.go index 6cadff2c6..c5f92d0ad 100644 --- a/cmd/skywire-cli/commands/visor/route.go +++ b/cmd/skywire-cli/commands/visor/route.go @@ -23,7 +23,7 @@ import ( var routeCmd = &cobra.Command{ Use: "route", Short: "View and set rules", - Long: "\n View and set routing rules", + Long: "\n View and set routing rules", } func init() { @@ -39,7 +39,7 @@ func init() { var lsRulesCmd = &cobra.Command{ Use: "ls-rules", Short: "List routing rules", - Long: "\n List routing rules", + Long: "\n List routing rules", Run: func(cmd *cobra.Command, _ []string) { rules, err := clirpc.Client(cmd.Flags()).RoutingRules() internal.Catch(cmd.Flags(), err) @@ -51,8 +51,8 @@ var lsRulesCmd = &cobra.Command{ var ruleCmd = &cobra.Command{ Use: "rule ", Short: "Return routing rule by route ID key", - Long: "\n Return routing rule by route ID key", - Args: cobra.MinimumNArgs(1), + Long: "\n Return routing rule by route ID key", + Args: cobra.MinimumNArgs(1), Run: func(cmd *cobra.Command, args []string) { id, err := strconv.ParseUint(args[0], 10, 32) internal.Catch(cmd.Flags(), err) @@ -71,7 +71,7 @@ func init() { var rmRuleCmd = &cobra.Command{ Use: "rm-rule ", Short: "Remove routing rule", - Long: "\n Remove routing rule", + Long: "\n Remove routing rule", //Args: cobra.MinimumNArgs(1), Run: func(cmd *cobra.Command, args []string) { //TODO @@ -91,7 +91,7 @@ var rmRuleCmd = &cobra.Command{ var addRuleCmd = &cobra.Command{ Use: "add-rule ( app | fwd | intfwd )", Short: "Add routing rule", - Long: "\n Add routing rule", + Long: "\n Add routing rule", } var keepAlive time.Duration @@ -122,9 +122,9 @@ func init() { } var addAppRuleCmd = &cobra.Command{ - Use: "app \\\n \\\n \\\n \\\n \\\n \\\n || ", + Use: "app \\\n \\\n \\\n \\\n \\\n \\\n || ", Short: "Add app/consume routing rule", - Long: "\n Add app/consume routing rule", + Long: "\n Add app/consume routing rule", Args: func(_ *cobra.Command, args []string) error { if rID == "" && lPK == "" && lPt == "" && rPK == "" && rPt == "" { if len(args) > 0 { @@ -233,7 +233,7 @@ func init() { } var addFwdRuleCmd = &cobra.Command{ - Use: "fwd \\\n \\\n \\\n \\\n \\\n \\\n \\\n \\\n || ", + Use: "fwd \\\n \\\n \\\n \\\n \\\n \\\n \\\n \\\n || ", Short: "Add forward routing rule", Long: "\n Add forward routing rule", Args: func(_ *cobra.Command, args []string) error { @@ -287,7 +287,7 @@ func init() { } var addIntFwdRuleCmd = &cobra.Command{ - Use: "intfwd \\\n \\\n \\\n \\\n || ", + Use: "intfwd \\\n \\\n \\\n \\\n || ", Short: "Add intermediary forward routing rule", Long: "\n Add intermediary forward routing rule", Args: func(_ *cobra.Command, args []string) error { diff --git a/cmd/skywire-cli/commands/visor/transports.go b/cmd/skywire-cli/commands/visor/transports.go index 233e17494..ad3d15c5c 100644 --- a/cmd/skywire-cli/commands/visor/transports.go +++ b/cmd/skywire-cli/commands/visor/transports.go @@ -56,9 +56,9 @@ var tpCmd = &cobra.Command{ } var lsTypesCmd = &cobra.Command{ - Use: "type", - Short: "Transport types used by the local visor", - Long: "\n Transport types used by the local visor", + Use: "type", + Short: "Transport types used by the local visor", + Long: "\n Transport types used by the local visor", DisableFlagsInUseLine: true, Run: func(cmd *cobra.Command, _ []string) { types, err := clirpc.Client(cmd.Flags()).TransportTypes() @@ -76,7 +76,7 @@ func init() { var lsTpCmd = &cobra.Command{ Use: "ls", Short: "Available transports", - Long: "\n Available transports\n\n displays transports of the local visor", + Long: "\n Available transports\n\n displays transports of the local visor", Run: func(cmd *cobra.Command, _ []string) { var pks cipher.PubKeys if filterPubKeys != nil { @@ -93,9 +93,9 @@ func init() { } var idCmd = &cobra.Command{ - Use: "id (-i) ", - Short: "Transport summary by id", - Long: "\n Transport summary by id", + Use: "id (-i) ", + Short: "Transport summary by id", + Long: "\n Transport summary by id", DisableFlagsInUseLine: true, Run: func(cmd *cobra.Command, args []string) { @@ -119,9 +119,9 @@ func init() { } var addTpCmd = &cobra.Command{ - Use: "add (-p) ", - Short: "Add a transport", - Long: "\n Add a transport\n \n If the transport type is unspecified,\n the visor will attempt to establish a transport\n in the following order: skywire-tcp, stcpr, sudph, dmsg", + Use: "add (-p) ", + Short: "Add a transport", + Long: "\n Add a transport\n \n If the transport type is unspecified,\n the visor will attempt to establish a transport\n in the following order: skywire-tcp, stcpr, sudph, dmsg", Args: cobra.MinimumNArgs(1), DisableFlagsInUseLine: true, Run: func(cmd *cobra.Command, args []string) { @@ -176,9 +176,9 @@ func init() { } var rmTpCmd = &cobra.Command{ - Use: "rm ( -a || -i ) ", - Short: "Remove transport(s) by id", - Long: "\n Remove transport(s) by id", + Use: "rm ( -a || -i ) ", + Short: "Remove transport(s) by id", + Long: "\n Remove transport(s) by id", DisableFlagsInUseLine: true, Run: func(cmd *cobra.Command, args []string) { //TODO @@ -257,9 +257,9 @@ func init() { } var discTpCmd = &cobra.Command{ - Use: "disc (--id= || --pk=)", - Short: "Discover remote transport(s)", - Long: "\n Discover remote transport(s) by ID or public key", + Use: "disc (--id= || --pk=)", + Short: "Discover remote transport(s)", + Long: "\n Discover remote transport(s) by ID or public key", DisableFlagsInUseLine: true, Args: func(_ *cobra.Command, _ []string) error { if tpID == "" && tpPK == "" { From 164354d9ce5fdd9bc482a8430a726c33fbaa20d2 Mon Sep 17 00:00:00 2001 From: ersonp Date: Mon, 12 Dec 2022 22:12:08 +0530 Subject: [PATCH 17/30] Add ping latency logic --- pkg/visor/api.go | 31 +++++++++++++++++++++---------- pkg/visor/init.go | 14 ++++++++++++-- pkg/visor/ping.go | 20 ++++++++++++++++++++ pkg/visor/visor.go | 12 ++++-------- 4 files changed, 57 insertions(+), 20 deletions(-) create mode 100644 pkg/visor/ping.go diff --git a/pkg/visor/api.go b/pkg/visor/api.go index fc91e3de3..9d23dfbd3 100644 --- a/pkg/visor/api.go +++ b/pkg/visor/api.go @@ -4,6 +4,7 @@ package visor import ( "context" "encoding/hex" + "encoding/json" "errors" "fmt" "net" @@ -827,31 +828,41 @@ func (v *Visor) DialPing(pk cipher.PubKey) error { return fmt.Errorf("Can't get such info from this conn") } - v.connMx.Lock() + v.pingConnMx.Lock() v.pingConns[pk] = ping{ - conn: skywireConn, + conn: skywireConn, + latency: make(chan string), } - v.connMx.Unlock() + v.pingConnMx.Unlock() return nil } // Ping implements API. func (v *Visor) Ping(pk cipher.PubKey) (string, error) { - v.connMx.Lock() - defer v.connMx.Unlock() + v.pingConnMx.Lock() + defer v.pingConnMx.Unlock() + skywireConn := v.pingConns[pk].conn - msh := "asdasdasdasdsa" - _, err := skywireConn.Write([]byte(msh)) + msg := PingMsg{ + Timestamp: time.Now(), + PingPk: pk, + } + b, err := json.Marshal(msg) + if err != nil { + return "", err + } + _, err = skywireConn.Write(b) if err != nil { return "", err } - return "test", nil + latency := <-v.pingConns[pk].latency + return latency, nil } // StopPing implements API. func (v *Visor) StopPing(pk cipher.PubKey) error { - v.connMx.Lock() - defer v.connMx.Unlock() + v.pingConnMx.Lock() + defer v.pingConnMx.Unlock() skywireConn := v.pingConns[pk].conn err := skywireConn.Close() diff --git a/pkg/visor/init.go b/pkg/visor/init.go index 6198bbe65..30c35b8ab 100644 --- a/pkg/visor/init.go +++ b/pkg/visor/init.go @@ -606,19 +606,29 @@ func initPing(ctx context.Context, v *Visor, log *logging.Logger) error { rAddr := wrappedConn.RemoteAddr().(appnet.Addr) log.Debugf("Accepted sky proxy conn on %s from %s", wrappedConn.LocalAddr(), rAddr.PubKey) - go handleServerConn(log, wrappedConn) + go handleServerConn(log, wrappedConn, v) } }() return nil } -func handleServerConn(log *logging.Logger, remoteConn net.Conn) { +func handleServerConn(log *logging.Logger, remoteConn net.Conn, v *Visor) { buf := make([]byte, 32*1024) n, err := remoteConn.Read(buf) if err != nil { log.WithError(err).Error("Failed to read packet") return } + var msg PingMsg + err = json.Unmarshal(buf[:n], &msg) + if err != nil { + log.WithError(err).Error("Failed to read packet") + return + } + now := time.Now() + diff := now.Sub(msg.Timestamp) + v.pingConns[msg.PingPk].latency <- fmt.Sprint(diff) + log.Debugf("Received: %s", buf[:n]) } diff --git a/pkg/visor/ping.go b/pkg/visor/ping.go new file mode 100644 index 000000000..5796ab85f --- /dev/null +++ b/pkg/visor/ping.go @@ -0,0 +1,20 @@ +// Package visor pkg/visor/ping.go +package visor + +import ( + "net" + "time" + + "github.com/skycoin/skywire-utilities/pkg/cipher" +) + +type ping struct { + conn net.Conn + latency chan string +} + +// PingMsg ... +type PingMsg struct { + Timestamp time.Time + PingPk cipher.PubKey +} diff --git a/pkg/visor/visor.go b/pkg/visor/visor.go index a0fc82e3d..924fd9241 100644 --- a/pkg/visor/visor.go +++ b/pkg/visor/visor.go @@ -5,7 +5,6 @@ import ( "context" "errors" "fmt" - "net" "net/http" "sync" "time" @@ -95,13 +94,8 @@ type Visor struct { remoteVisors map[cipher.PubKey]Conn // remote hypervisors the visor is attempting to connect to connectedHypervisors map[cipher.PubKey]bool // remote hypervisors the visor is currently connected to - pingConns map[cipher.PubKey]ping - connMx *sync.RWMutex -} - -type ping struct { - conn net.Conn - latency chan string + pingConns map[cipher.PubKey]ping + pingConnMx *sync.Mutex } // todo: consider moving module closing to the module system @@ -136,6 +130,8 @@ func NewVisor(ctx context.Context, conf *visorconfig.V1, restartCtx *restart.Con dtmReady: make(chan struct{}), stunReady: make(chan struct{}), connectedHypervisors: make(map[cipher.PubKey]bool), + pingConns: make(map[cipher.PubKey]ping), + pingConnMx: new(sync.Mutex), } v.isServicesHealthy.init() From 80635d0f868c8425483256f24219461e267b08aa Mon Sep 17 00:00:00 2001 From: ersonp Date: Mon, 12 Dec 2022 22:13:01 +0530 Subject: [PATCH 18/30] Add pk check --- pkg/visor/api.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/visor/api.go b/pkg/visor/api.go index 9d23dfbd3..ae7c84ac7 100644 --- a/pkg/visor/api.go +++ b/pkg/visor/api.go @@ -804,6 +804,9 @@ func (v *Visor) RoutingRules() ([]routing.Rule, error) { // DialPing implements API. func (v *Visor) DialPing(pk cipher.PubKey) error { + if pk == v.conf.PK { + return fmt.Errorf("Visor cannot ping itself") + } addr := appnet.Addr{ Net: appnet.TypeSkynet, PubKey: v.conf.PK, From 95c5701a06c64bf27f48895debfee6d7aacfc6a3 Mon Sep 17 00:00:00 2001 From: ersonp Date: Mon, 12 Dec 2022 22:34:38 +0530 Subject: [PATCH 19/30] Fix DialPing by waiting for atleast one tpm to initilize --- pkg/visor/api.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/visor/api.go b/pkg/visor/api.go index ae7c84ac7..9674c2086 100644 --- a/pkg/visor/api.go +++ b/pkg/visor/api.go @@ -807,6 +807,8 @@ func (v *Visor) DialPing(pk cipher.PubKey) error { if pk == v.conf.PK { return fmt.Errorf("Visor cannot ping itself") } + // waiting for at least one transport to initialize + <-v.tpM.Ready() addr := appnet.Addr{ Net: appnet.TypeSkynet, PubKey: v.conf.PK, From 2966e0b8017464ee3d9bb95c097b818508c8d70c Mon Sep 17 00:00:00 2001 From: ersonp Date: Mon, 12 Dec 2022 22:36:05 +0530 Subject: [PATCH 20/30] Fix handlePingConn --- pkg/visor/init.go | 40 ++++++++++++++++++++++------------------ 1 file changed, 22 insertions(+), 18 deletions(-) diff --git a/pkg/visor/init.go b/pkg/visor/init.go index 30c35b8ab..67541c702 100644 --- a/pkg/visor/init.go +++ b/pkg/visor/init.go @@ -606,30 +606,34 @@ func initPing(ctx context.Context, v *Visor, log *logging.Logger) error { rAddr := wrappedConn.RemoteAddr().(appnet.Addr) log.Debugf("Accepted sky proxy conn on %s from %s", wrappedConn.LocalAddr(), rAddr.PubKey) - go handleServerConn(log, wrappedConn, v) + go handlePingConn(log, wrappedConn, v) } }() return nil } -func handleServerConn(log *logging.Logger, remoteConn net.Conn, v *Visor) { - buf := make([]byte, 32*1024) - n, err := remoteConn.Read(buf) - if err != nil { - log.WithError(err).Error("Failed to read packet") - return - } - var msg PingMsg - err = json.Unmarshal(buf[:n], &msg) - if err != nil { - log.WithError(err).Error("Failed to read packet") - return - } - now := time.Now() - diff := now.Sub(msg.Timestamp) - v.pingConns[msg.PingPk].latency <- fmt.Sprint(diff) +func handlePingConn(log *logging.Logger, remoteConn net.Conn, v *Visor) { + for { + buf := make([]byte, 32*1024) + n, err := remoteConn.Read(buf) + if err != nil { + if !errors.Is(err, io.EOF) { + log.WithError(err).Error("Failed to read packet") + } + return + } + var msg PingMsg + err = json.Unmarshal(buf[:n], &msg) + if err != nil { + log.WithError(err).Error("Failed to unmarshal json") + return + } + now := time.Now() + diff := now.Sub(msg.Timestamp) + v.pingConns[msg.PingPk].latency <- fmt.Sprint(diff) - log.Debugf("Received: %s", buf[:n]) + log.Debugf("Received: %s", buf[:n]) + } } // getRouteSetupHooks aka autotransport From 13a32a7491395e813f2a75be1c828b42183aa9a2 Mon Sep 17 00:00:00 2001 From: ersonp Date: Mon, 12 Dec 2022 22:36:25 +0530 Subject: [PATCH 21/30] Update ping subcommand --- cmd/skywire-cli/commands/visor/ping.go | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/cmd/skywire-cli/commands/visor/ping.go b/cmd/skywire-cli/commands/visor/ping.go index ff61c0d43..a6ce3240a 100644 --- a/cmd/skywire-cli/commands/visor/ping.go +++ b/cmd/skywire-cli/commands/visor/ping.go @@ -10,23 +10,28 @@ import ( "github.com/skycoin/skywire/cmd/skywire-cli/internal" ) +var no int + func init() { - RootCmd.AddCommand(testCmd) + RootCmd.AddCommand(pingCmd) + pingCmd.Flags().IntVarP(&no, "no", "n", 1, "Number of pings") } -var testCmd = &cobra.Command{ +var pingCmd = &cobra.Command{ Use: "ping ", - Short: "Return routing rule by route ID key", - Long: "\n Return routing rule by route ID key", + Short: "Ping the visor with given pk", + Long: "\n Creates a route with the provided pk as a hop and returns latency on the conn", Args: cobra.MinimumNArgs(1), Run: func(cmd *cobra.Command, args []string) { pk := internal.ParsePK(cmd.Flags(), "pk", args[0]) err := clirpc.Client(cmd.Flags()).DialPing(pk) internal.Catch(cmd.Flags(), err) - latency, err := clirpc.Client(cmd.Flags()).Ping(pk) - internal.Catch(cmd.Flags(), err) - internal.PrintOutput(cmd.Flags(), latency, fmt.Sprintf(latency+"\n")) + for i := 1; i <= no; i++ { + latency, err := clirpc.Client(cmd.Flags()).Ping(pk) + internal.Catch(cmd.Flags(), err) + internal.PrintOutput(cmd.Flags(), latency, fmt.Sprintf(latency+"\n")) + } err = clirpc.Client(cmd.Flags()).StopPing(pk) internal.Catch(cmd.Flags(), err) From c13a0b462c7168fcd1dedd5ff4208f9737d8b66c Mon Sep 17 00:00:00 2001 From: ersonp Date: Mon, 12 Dec 2022 22:40:41 +0530 Subject: [PATCH 22/30] Fix listening on close conn error on shutdown --- pkg/app/appnet/skywire_networker.go | 4 +++- pkg/visor/init.go | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/pkg/app/appnet/skywire_networker.go b/pkg/app/appnet/skywire_networker.go index ef306f605..a56c7a049 100644 --- a/pkg/app/appnet/skywire_networker.go +++ b/pkg/app/appnet/skywire_networker.go @@ -21,6 +21,8 @@ import ( var ( // ErrPortAlreadyBound is being returned when the desired port is already bound to. ErrPortAlreadyBound = errors.New("port already bound") + // ErrConnClosed is being returned when we listen on a closed conn. + ErrConnClosed = errors.New("listening on closed connection") ) // SkywireNetworker implements `Networker` for skynet. @@ -208,7 +210,7 @@ type skywireListener struct { func (l *skywireListener) Accept() (net.Conn, error) { conn, ok := <-l.connsCh if !ok { - return nil, errors.New("listening on closed connection") + return nil, ErrConnClosed } return &SkywireConn{ diff --git a/pkg/visor/init.go b/pkg/visor/init.go index 67541c702..62851698e 100644 --- a/pkg/visor/init.go +++ b/pkg/visor/init.go @@ -592,7 +592,9 @@ func initPing(ctx context.Context, v *Visor, log *logging.Logger) error { log.Debug("Accepting sky proxy conn...") conn, err := l.Accept() if err != nil { - log.WithError(err).Error("Failed to accept conn") + if !errors.Is(err, appnet.ErrConnClosed) { + log.WithError(err).Error("Failed to accept ping conn") + } return } log.Debug("Accepted sky proxy conn") From 3e36ede76247aa01f9f2e8b5ecce142113656ac1 Mon Sep 17 00:00:00 2001 From: ersonp Date: Mon, 12 Dec 2022 22:44:44 +0530 Subject: [PATCH 23/30] Minor initilization fix --- pkg/visor/api.go | 1 + pkg/visor/init.go | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/visor/api.go b/pkg/visor/api.go index 9674c2086..5e20ea60b 100644 --- a/pkg/visor/api.go +++ b/pkg/visor/api.go @@ -809,6 +809,7 @@ func (v *Visor) DialPing(pk cipher.PubKey) error { } // waiting for at least one transport to initialize <-v.tpM.Ready() + addr := appnet.Addr{ Net: appnet.TypeSkynet, PubKey: v.conf.PK, diff --git a/pkg/visor/init.go b/pkg/visor/init.go index 62851698e..db9ace37a 100644 --- a/pkg/visor/init.go +++ b/pkg/visor/init.go @@ -157,7 +157,7 @@ func registerModules(logger *logging.MasterLogger) { ut = maker("uptime_tracker", initUptimeTracker, &dmsgHTTP) pv = maker("public_autoconnect", initPublicAutoconnect, &tr, &disc) trs = maker("transport_setup", initTransportSetup, &dmsgC, &tr) - tm = vinit.MakeModule("transports", vinit.DoNothing, logger, &sc, &sudphC, &dmsgCtrl, &dmsgHTTPLogServer, &dmsgTrackers) + tm = vinit.MakeModule("transports", vinit.DoNothing, logger, &sc, &sudphC, &dmsgCtrl, &dmsgHTTPLogServer, &dmsgTrackers, &launch) pvs = maker("public_visor", initPublicVisor, &tr, &ar, &disc, &stcprC) pi = maker("ping", initPing, &dmsgC, &tm) vis = vinit.MakeModule("visor", vinit.DoNothing, logger, &ebc, &ar, &disc, &pty, From 9df4573dc632fd9a3fe4cd4b5393e20eaecc6bc7 Mon Sep 17 00:00:00 2001 From: MohammadReza Palide Date: Tue, 20 Dec 2022 14:11:11 +0330 Subject: [PATCH 24/30] change ping command for get arbitrary data size --- cmd/skywire-cli/commands/visor/ping.go | 18 +++++--- pkg/visor/api.go | 59 +++++++++++++++--------- pkg/visor/init.go | 3 +- pkg/visor/ping.go | 1 + pkg/visor/rpc.go | 12 ++--- pkg/visor/rpc_client.go | 18 ++++---- pkg/visor/visor.go | 5 +- vendor/github.com/godbus/dbus/v5/conn.go | 2 +- 8 files changed, 68 insertions(+), 50 deletions(-) diff --git a/cmd/skywire-cli/commands/visor/ping.go b/cmd/skywire-cli/commands/visor/ping.go index a6ce3240a..be54eac55 100644 --- a/cmd/skywire-cli/commands/visor/ping.go +++ b/cmd/skywire-cli/commands/visor/ping.go @@ -8,13 +8,16 @@ import ( clirpc "github.com/skycoin/skywire/cmd/skywire-cli/commands/rpc" "github.com/skycoin/skywire/cmd/skywire-cli/internal" + "github.com/skycoin/skywire/pkg/visor" ) -var no int +var tries int +var pcktSize int func init() { RootCmd.AddCommand(pingCmd) - pingCmd.Flags().IntVarP(&no, "no", "n", 1, "Number of pings") + pingCmd.Flags().IntVarP(&tries, "tries", "t", 1, "Number of pings") + pingCmd.Flags().IntVarP(&pcktSize, "size", "s", 32, "Size of packet, in KB, default is 32KB") } var pingCmd = &cobra.Command{ @@ -24,16 +27,17 @@ var pingCmd = &cobra.Command{ Args: cobra.MinimumNArgs(1), Run: func(cmd *cobra.Command, args []string) { pk := internal.ParsePK(cmd.Flags(), "pk", args[0]) + pingConfig := visor.PingConfig{PK: pk, Tries: tries, PcktSize: pcktSize} + err := clirpc.Client(cmd.Flags()).DialPing(pingConfig) + internal.Catch(cmd.Flags(), err) - err := clirpc.Client(cmd.Flags()).DialPing(pk) + latencies, err := clirpc.Client(cmd.Flags()).Ping(pingConfig) internal.Catch(cmd.Flags(), err) - for i := 1; i <= no; i++ { - latency, err := clirpc.Client(cmd.Flags()).Ping(pk) - internal.Catch(cmd.Flags(), err) + + for _, latency := range latencies { internal.PrintOutput(cmd.Flags(), latency, fmt.Sprintf(latency+"\n")) } err = clirpc.Client(cmd.Flags()).StopPing(pk) internal.Catch(cmd.Flags(), err) - }, } diff --git a/pkg/visor/api.go b/pkg/visor/api.go index 5e20ea60b..8c71852d2 100644 --- a/pkg/visor/api.go +++ b/pkg/visor/api.go @@ -97,8 +97,8 @@ type API interface { IsDMSGClientReady() (bool, error) - DialPing(pk cipher.PubKey) error - Ping(pk cipher.PubKey) (string, error) + DialPing(config PingConfig) error + Ping(config PingConfig) ([]string, error) StopPing(pk cipher.PubKey) error } @@ -802,11 +802,19 @@ func (v *Visor) RoutingRules() ([]routing.Rule, error) { return v.router.Rules(), nil } +// PingConfig use as configuration for ping command +type PingConfig struct { + PK cipher.PubKey + Tries int + PcktSize int +} + // DialPing implements API. -func (v *Visor) DialPing(pk cipher.PubKey) error { - if pk == v.conf.PK { +func (v *Visor) DialPing(conf PingConfig) error { + if conf.PK == v.conf.PK { return fmt.Errorf("Visor cannot ping itself") } + v.pingPcktSize = conf.PcktSize // waiting for at least one transport to initialize <-v.tpM.Ready() @@ -822,7 +830,7 @@ func (v *Visor) DialPing(pk cipher.PubKey) error { ctx := context.TODO() var r = netutil.NewRetrier(v.log, 2*time.Second, netutil.DefaultMaxBackoff, 5, 2) err = r.Do(ctx, func() error { - conn, err = appnet.Ping(pk, addr) + conn, err = appnet.Ping(conf.PK, addr) return err }) if err != nil { @@ -833,9 +841,8 @@ func (v *Visor) DialPing(pk cipher.PubKey) error { if !isSkywireConn { return fmt.Errorf("Can't get such info from this conn") } - v.pingConnMx.Lock() - v.pingConns[pk] = ping{ + v.pingConns[conf.PK] = ping{ conn: skywireConn, latency: make(chan string), } @@ -844,25 +851,31 @@ func (v *Visor) DialPing(pk cipher.PubKey) error { } // Ping implements API. -func (v *Visor) Ping(pk cipher.PubKey) (string, error) { +func (v *Visor) Ping(conf PingConfig) ([]string, error) { v.pingConnMx.Lock() defer v.pingConnMx.Unlock() - - skywireConn := v.pingConns[pk].conn - msg := PingMsg{ - Timestamp: time.Now(), - PingPk: pk, - } - b, err := json.Marshal(msg) - if err != nil { - return "", err - } - _, err = skywireConn.Write(b) - if err != nil { - return "", err + latencies := []string{} + // TODO (Mohammed): Arbitrary data size not work, should solve it later + // data := make([]byte, conf.PcktSize*1024) + data := make([]byte, 2*1024) + for i := 1; i <= conf.Tries; i++ { + skywireConn := v.pingConns[conf.PK].conn + msg := PingMsg{ + Timestamp: time.Now(), + PingPk: conf.PK, + Data: data, + } + b, err := json.Marshal(msg) + if err != nil { + return latencies, err + } + _, err = skywireConn.Write(b) + if err != nil { + return latencies, err + } + latencies = append(latencies, <-v.pingConns[conf.PK].latency) } - latency := <-v.pingConns[pk].latency - return latency, nil + return latencies, nil } // StopPing implements API. diff --git a/pkg/visor/init.go b/pkg/visor/init.go index db9ace37a..76dc3b8c7 100644 --- a/pkg/visor/init.go +++ b/pkg/visor/init.go @@ -598,7 +598,6 @@ func initPing(ctx context.Context, v *Visor, log *logging.Logger) error { return } log.Debug("Accepted sky proxy conn") - log.Debug("Wrapping conn...") wrappedConn, err := appnet.WrapConn(conn) if err != nil { @@ -616,7 +615,7 @@ func initPing(ctx context.Context, v *Visor, log *logging.Logger) error { func handlePingConn(log *logging.Logger, remoteConn net.Conn, v *Visor) { for { - buf := make([]byte, 32*1024) + buf := make([]byte, (32+v.pingPcktSize)*1024) n, err := remoteConn.Read(buf) if err != nil { if !errors.Is(err, io.EOF) { diff --git a/pkg/visor/ping.go b/pkg/visor/ping.go index 5796ab85f..48f918d6e 100644 --- a/pkg/visor/ping.go +++ b/pkg/visor/ping.go @@ -17,4 +17,5 @@ type ping struct { type PingMsg struct { Timestamp time.Time PingPk cipher.PubKey + Data []byte } diff --git a/pkg/visor/rpc.go b/pkg/visor/rpc.go index 8c9caa12f..9b5b32be9 100644 --- a/pkg/visor/rpc.go +++ b/pkg/visor/rpc.go @@ -630,17 +630,17 @@ func (r *RPC) IsDMSGClientReady(_ *struct{}, out *bool) (err error) { } // DialPing dials to the ping module using the provided pk as a hop. -func (r *RPC) DialPing(pk *cipher.PubKey, _ *struct{}) (err error) { - defer rpcutil.LogCall(r.log, "DialPing", pk)(nil, &err) +func (r *RPC) DialPing(conf PingConfig, _ *struct{}) (err error) { + defer rpcutil.LogCall(r.log, "DialPing", conf)(nil, &err) - return r.visor.DialPing(*pk) + return r.visor.DialPing(conf) } // Ping pings the connected route via DialPing. -func (r *RPC) Ping(pk *cipher.PubKey, out *string) (err error) { - defer rpcutil.LogCall(r.log, "Ping", pk)(out, &err) +func (r *RPC) Ping(conf PingConfig, out *[]string) (err error) { + defer rpcutil.LogCall(r.log, "Ping", conf)(out, &err) - *out, err = r.visor.Ping(*pk) + *out, err = r.visor.Ping(conf) return err } diff --git a/pkg/visor/rpc_client.go b/pkg/visor/rpc_client.go index e30726400..200e7ae02 100644 --- a/pkg/visor/rpc_client.go +++ b/pkg/visor/rpc_client.go @@ -464,15 +464,15 @@ func (rc *rpcClient) IsDMSGClientReady() (bool, error) { } // DialPing calls DialPing. -func (rc *rpcClient) DialPing(pk cipher.PubKey) error { - return rc.Call("DialPing", &pk, &struct{}{}) +func (rc *rpcClient) DialPing(conf PingConfig) error { + return rc.Call("DialPing", &conf, &struct{}{}) } // Ping calls Ping. -func (rc *rpcClient) Ping(pk cipher.PubKey) (string, error) { - var latency string - err := rc.Call("Ping", &pk, &latency) - return latency, err +func (rc *rpcClient) Ping(conf PingConfig) ([]string, error) { + var latencies []string + err := rc.Call("Ping", &conf, &latencies) + return latencies, err } // StopPing calls StopPing. @@ -1069,13 +1069,13 @@ func (mc *mockRPCClient) IsDMSGClientReady() (bool, error) { } // DialPing implements API. -func (mc *mockRPCClient) DialPing(_ cipher.PubKey) error { +func (mc *mockRPCClient) DialPing(_ PingConfig) error { return nil } // Ping implements API. -func (mc *mockRPCClient) Ping(_ cipher.PubKey) (string, error) { - return "", nil +func (mc *mockRPCClient) Ping(_ PingConfig) ([]string, error) { + return []string{}, nil } // StopPing implements API. diff --git a/pkg/visor/visor.go b/pkg/visor/visor.go index 924fd9241..80e446cae 100644 --- a/pkg/visor/visor.go +++ b/pkg/visor/visor.go @@ -94,8 +94,9 @@ type Visor struct { remoteVisors map[cipher.PubKey]Conn // remote hypervisors the visor is attempting to connect to connectedHypervisors map[cipher.PubKey]bool // remote hypervisors the visor is currently connected to - pingConns map[cipher.PubKey]ping - pingConnMx *sync.Mutex + pingConns map[cipher.PubKey]ping + pingConnMx *sync.Mutex + pingPcktSize int } // todo: consider moving module closing to the module system diff --git a/vendor/github.com/godbus/dbus/v5/conn.go b/vendor/github.com/godbus/dbus/v5/conn.go index 69978ea26..8080f3e91 100644 --- a/vendor/github.com/godbus/dbus/v5/conn.go +++ b/vendor/github.com/godbus/dbus/v5/conn.go @@ -432,7 +432,7 @@ func (conn *Conn) inWorker() { case TypeSignal: conn.handleSignal(sequence, msg) case TypeMethodCall: - go conn.handleCall(msg) +conn.handleCall(msg) } } From 99fc314644a59e9eddf0b2427cae1aaf7a6de6a6 Mon Sep 17 00:00:00 2001 From: MohammadReza Palide Date: Tue, 20 Dec 2022 14:32:26 +0330 Subject: [PATCH 25/30] fix rpcClient errors in new way --- cmd/skywire-cli/commands/visor/ping.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/cmd/skywire-cli/commands/visor/ping.go b/cmd/skywire-cli/commands/visor/ping.go index be54eac55..a7f09c2db 100644 --- a/cmd/skywire-cli/commands/visor/ping.go +++ b/cmd/skywire-cli/commands/visor/ping.go @@ -3,6 +3,7 @@ package clivisor import ( "fmt" + "os" "github.com/spf13/cobra" @@ -28,16 +29,20 @@ var pingCmd = &cobra.Command{ Run: func(cmd *cobra.Command, args []string) { pk := internal.ParsePK(cmd.Flags(), "pk", args[0]) pingConfig := visor.PingConfig{PK: pk, Tries: tries, PcktSize: pcktSize} - err := clirpc.Client(cmd.Flags()).DialPing(pingConfig) + rpcClient, err := clirpc.Client(cmd.Flags()) + if err != nil { + os.Exit(1) + } + err = rpcClient.DialPing(pingConfig) internal.Catch(cmd.Flags(), err) - latencies, err := clirpc.Client(cmd.Flags()).Ping(pingConfig) + latencies, err := rpcClient.Ping(pingConfig) internal.Catch(cmd.Flags(), err) for _, latency := range latencies { internal.PrintOutput(cmd.Flags(), latency, fmt.Sprintf(latency+"\n")) } - err = clirpc.Client(cmd.Flags()).StopPing(pk) + err = rpcClient.StopPing(pk) internal.Catch(cmd.Flags(), err) }, } From 3c70520b3cc82c4b51f5901bbeb24edb74925a03 Mon Sep 17 00:00:00 2001 From: MohammadReza Palide Date: Wed, 21 Dec 2022 02:21:34 +0330 Subject: [PATCH 26/30] add `skywire-cli visor test` command --- cmd/skywire-cli/commands/visor/ping.go | 23 +++++++- pkg/visor/api.go | 75 ++++++++++++++++++++++++-- pkg/visor/init.go | 2 +- pkg/visor/ping.go | 2 +- pkg/visor/rpc.go | 10 +++- pkg/visor/rpc_client.go | 20 +++++-- 6 files changed, 120 insertions(+), 12 deletions(-) diff --git a/cmd/skywire-cli/commands/visor/ping.go b/cmd/skywire-cli/commands/visor/ping.go index a7f09c2db..4038b184f 100644 --- a/cmd/skywire-cli/commands/visor/ping.go +++ b/cmd/skywire-cli/commands/visor/ping.go @@ -19,6 +19,9 @@ func init() { RootCmd.AddCommand(pingCmd) pingCmd.Flags().IntVarP(&tries, "tries", "t", 1, "Number of pings") pingCmd.Flags().IntVarP(&pcktSize, "size", "s", 32, "Size of packet, in KB, default is 32KB") + RootCmd.AddCommand(testCmd) + testCmd.Flags().IntVarP(&tries, "tries", "t", 1, "Number of tests per public visors") + testCmd.Flags().IntVarP(&pcktSize, "size", "s", 32, "Size of packet, in KB, default is 32KB") } var pingCmd = &cobra.Command{ @@ -40,9 +43,27 @@ var pingCmd = &cobra.Command{ internal.Catch(cmd.Flags(), err) for _, latency := range latencies { - internal.PrintOutput(cmd.Flags(), latency, fmt.Sprintf(latency+"\n")) + internal.PrintOutput(cmd.Flags(), fmt.Sprint(latency), fmt.Sprintf(fmt.Sprint(latency)+"\n")) } err = rpcClient.StopPing(pk) internal.Catch(cmd.Flags(), err) }, } + +var testCmd = &cobra.Command{ + Use: "test", + Short: "Test the visor with public visors on network", + Long: "\n Creates a route with public visors as a hop and returns latency on the conn", + Run: func(cmd *cobra.Command, args []string) { + pingConfig := visor.PingConfig{Tries: tries, PcktSize: pcktSize} + rpcClient, err := clirpc.Client(cmd.Flags()) + if err != nil { + os.Exit(1) + } + results, err := rpcClient.TestVisor(pingConfig) + internal.Catch(cmd.Flags(), err) + for i, result := range results { + internal.PrintOutput(cmd.Flags(), result, fmt.Sprintf("Test No. %d\nPK: %s\nMax: %s\nMin: %s\nMean: %s\n\n", i+1, result.PK, result.Max, result.Min, result.Mean)) + } + }, +} diff --git a/pkg/visor/api.go b/pkg/visor/api.go index 61dc2c1bf..c960fe06d 100644 --- a/pkg/visor/api.go +++ b/pkg/visor/api.go @@ -100,8 +100,10 @@ type API interface { IsDMSGClientReady() (bool, error) DialPing(config PingConfig) error - Ping(config PingConfig) ([]string, error) + Ping(config PingConfig) ([]time.Duration, error) StopPing(pk cipher.PubKey) error + + TestVisor(config PingConfig) ([]TestResult, error) } // HealthCheckable resource returns its health status as an integer @@ -713,6 +715,27 @@ func (v *Visor) VPNServers(version, country string) ([]servicedisc.Service, erro return vpnServers, nil } +// PublicVisors gets available public public visors from service discovery URL +func (v *Visor) PublicVisors(version, country string) ([]servicedisc.Service, error) { + log := logging.MustGetLogger("public_visors") + vLog := logging.NewMasterLogger() + vLog.SetLevel(logrus.InfoLevel) + + sdClient := servicedisc.NewClient(log, vLog, servicedisc.Config{ + Type: servicedisc.ServiceTypeVisor, + PK: v.conf.PK, + SK: v.conf.SK, + DiscAddr: v.conf.Launcher.ServiceDisc, + DisplayNodeIP: v.conf.Launcher.DisplayNodeIP, + }, &http.Client{Timeout: time.Duration(20) * time.Second}, "") + publicVisors, err := sdClient.Services(context.Background(), 0, version, country) + if err != nil { + v.log.Error("Error getting public vpn servers: ", err) + return nil, err + } + return publicVisors, nil +} + // RemoteVisors return list of connected remote visors func (v *Visor) RemoteVisors() ([]string, error) { var visors []string @@ -937,17 +960,17 @@ func (v *Visor) DialPing(conf PingConfig) error { v.pingConnMx.Lock() v.pingConns[conf.PK] = ping{ conn: skywireConn, - latency: make(chan string), + latency: make(chan time.Duration), } v.pingConnMx.Unlock() return nil } // Ping implements API. -func (v *Visor) Ping(conf PingConfig) ([]string, error) { +func (v *Visor) Ping(conf PingConfig) ([]time.Duration, error) { v.pingConnMx.Lock() defer v.pingConnMx.Unlock() - latencies := []string{} + latencies := []time.Duration{} // TODO (Mohammed): Arbitrary data size not work, should solve it later // data := make([]byte, conf.PcktSize*1024) data := make([]byte, 2*1024) @@ -985,6 +1008,50 @@ func (v *Visor) StopPing(pk cipher.PubKey) error { return nil } +// TestResult type of test result +type TestResult struct { + PK string + Max string + Min string + Mean string +} + +// TestVisor trying to test visor +func (v *Visor) TestVisor(conf PingConfig) ([]TestResult, error) { + result := []TestResult{} + publicVisors, err := v.PublicVisors("", "") + if err != nil { + return result, err + } + for _, publicVisor := range publicVisors { + conf.PK = publicVisor.Addr.PubKey() + err := v.DialPing(conf) + if err != nil { + return result, err + } + latencies, err := v.Ping(conf) + if err != nil { + v.StopPing(conf.PK) //nolint + return result, err + } + var max, min, mean, sumLatency time.Duration + min = time.Duration(10000000000) + for _, latency := range latencies { + if latency > max { + max = latency + } + if latency < min { + min = latency + } + sumLatency += latency + } + mean = sumLatency / time.Duration(len(latencies)) + result = append(result, TestResult{PK: conf.PK.String(), Max: fmt.Sprint(max), Min: fmt.Sprint(min), Mean: fmt.Sprint(mean)}) + v.StopPing(conf.PK) //nolint + } + return result, nil +} + // RoutingRule implements API. func (v *Visor) RoutingRule(key routing.RouteID) (routing.Rule, error) { return v.router.Rule(key) diff --git a/pkg/visor/init.go b/pkg/visor/init.go index b5126b576..c56cb7250 100644 --- a/pkg/visor/init.go +++ b/pkg/visor/init.go @@ -630,7 +630,7 @@ func handlePingConn(log *logging.Logger, remoteConn net.Conn, v *Visor) { } now := time.Now() diff := now.Sub(msg.Timestamp) - v.pingConns[msg.PingPk].latency <- fmt.Sprint(diff) + v.pingConns[msg.PingPk].latency <- diff log.Debugf("Received: %s", buf[:n]) } diff --git a/pkg/visor/ping.go b/pkg/visor/ping.go index 48f918d6e..ba5f36ca2 100644 --- a/pkg/visor/ping.go +++ b/pkg/visor/ping.go @@ -10,7 +10,7 @@ import ( type ping struct { conn net.Conn - latency chan string + latency chan time.Duration } // PingMsg ... diff --git a/pkg/visor/rpc.go b/pkg/visor/rpc.go index fd8f5229a..0a14976a2 100644 --- a/pkg/visor/rpc.go +++ b/pkg/visor/rpc.go @@ -644,7 +644,7 @@ func (r *RPC) DialPing(conf PingConfig, _ *struct{}) (err error) { } // Ping pings the connected route via DialPing. -func (r *RPC) Ping(conf PingConfig, out *[]string) (err error) { +func (r *RPC) Ping(conf PingConfig, out *[]time.Duration) (err error) { defer rpcutil.LogCall(r.log, "Ping", conf)(out, &err) *out, err = r.visor.Ping(conf) @@ -657,3 +657,11 @@ func (r *RPC) StopPing(pk *cipher.PubKey, _ *struct{}) (err error) { return r.visor.StopPing(*pk) } + +// TestVisor trying to test viosr by pinging to public visor. +func (r *RPC) TestVisor(conf PingConfig, out *[]TestResult) (err error) { + defer rpcutil.LogCall(r.log, "TestVisor", conf)(out, &err) + + *out, err = r.visor.TestVisor(conf) + return err +} diff --git a/pkg/visor/rpc_client.go b/pkg/visor/rpc_client.go index 14bd2c8cb..cb7f32265 100644 --- a/pkg/visor/rpc_client.go +++ b/pkg/visor/rpc_client.go @@ -483,8 +483,8 @@ func (rc *rpcClient) DialPing(conf PingConfig) error { } // Ping calls Ping. -func (rc *rpcClient) Ping(conf PingConfig) ([]string, error) { - var latencies []string +func (rc *rpcClient) Ping(conf PingConfig) ([]time.Duration, error) { + var latencies []time.Duration err := rc.Call("Ping", &conf, &latencies) return latencies, err } @@ -494,6 +494,13 @@ func (rc *rpcClient) StopPing(pk cipher.PubKey) error { return rc.Call("StopPing", &pk, &struct{}{}) } +// TestVisor calls TestVisor. +func (rc *rpcClient) TestVisor(conf PingConfig) ([]TestResult, error) { + var results []TestResult + err := rc.Call("TestVisor", &conf, &results) + return results, err +} + // MockRPCClient mocks API. type mockRPCClient struct { startedAt time.Time @@ -1108,11 +1115,16 @@ func (mc *mockRPCClient) DialPing(_ PingConfig) error { } // Ping implements API. -func (mc *mockRPCClient) Ping(_ PingConfig) ([]string, error) { - return []string{}, nil +func (mc *mockRPCClient) Ping(_ PingConfig) ([]time.Duration, error) { + return []time.Duration{}, nil } // StopPing implements API. func (mc *mockRPCClient) StopPing(_ cipher.PubKey) error { return nil } + +// TestVisor implements API. +func (mc *mockRPCClient) TestVisor(_ PingConfig) ([]TestResult, error) { + return []TestResult{}, nil +} From 2376c2beb496855b1868aedf5b6807f18bcaeaee Mon Sep 17 00:00:00 2001 From: MohammadReza Palide Date: Wed, 21 Dec 2022 02:59:20 +0330 Subject: [PATCH 27/30] improve logic --- pkg/visor/api.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/visor/api.go b/pkg/visor/api.go index c960fe06d..c681dc752 100644 --- a/pkg/visor/api.go +++ b/pkg/visor/api.go @@ -1031,7 +1031,7 @@ func (v *Visor) TestVisor(conf PingConfig) ([]TestResult, error) { } latencies, err := v.Ping(conf) if err != nil { - v.StopPing(conf.PK) //nolint + go v.StopPing(conf.PK) //nolint return result, err } var max, min, mean, sumLatency time.Duration From 02c8268d16476b3a12f1ed2c2204fe447ca77cf2 Mon Sep 17 00:00:00 2001 From: MohammadReza Palide Date: Tue, 27 Dec 2022 09:17:42 +0330 Subject: [PATCH 28/30] set limit on packet size in ping to 2KB --- pkg/visor/api.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pkg/visor/api.go b/pkg/visor/api.go index c681dc752..4c30a13cd 100644 --- a/pkg/visor/api.go +++ b/pkg/visor/api.go @@ -971,9 +971,10 @@ func (v *Visor) Ping(conf PingConfig) ([]time.Duration, error) { v.pingConnMx.Lock() defer v.pingConnMx.Unlock() latencies := []time.Duration{} - // TODO (Mohammed): Arbitrary data size not work, should solve it later - // data := make([]byte, conf.PcktSize*1024) - data := make([]byte, 2*1024) + if conf.PcktSize > 2 { + conf.PcktSize = 2 + } + data := make([]byte, conf.PcktSize*1024) for i := 1; i <= conf.Tries; i++ { skywireConn := v.pingConns[conf.PK].conn msg := PingMsg{ From fe534412c99713e68d5c45577ee318f560b06dad Mon Sep 17 00:00:00 2001 From: ersonp Date: Tue, 27 Dec 2022 12:14:58 +0530 Subject: [PATCH 29/30] Add PingSizeMsg --- pkg/visor/ping.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pkg/visor/ping.go b/pkg/visor/ping.go index ba5f36ca2..45263ef9b 100644 --- a/pkg/visor/ping.go +++ b/pkg/visor/ping.go @@ -13,9 +13,14 @@ type ping struct { latency chan time.Duration } -// PingMsg ... +// PingMsg is used to calculate the ping to a remote visor type PingMsg struct { Timestamp time.Time PingPk cipher.PubKey Data []byte } + +// PingSizeMsg contains the size of the PingMsg to be sent +type PingSizeMsg struct { + Size int +} From e62c1b128713b241d16842b7b3c149bf97f761cf Mon Sep 17 00:00:00 2001 From: ersonp Date: Tue, 27 Dec 2022 12:19:43 +0530 Subject: [PATCH 30/30] Fix ping data limit This commit fixes the issue with the limited data size of read of app conn by first sending the total size of the PingMsg in the new struct PingSizeMsg. Then in the ping module we read the size of the incoming PingMsg and read on the conn until we have received all the ping data packets then then unmarshall it. This way the ping still remains backwards compatable without needing to change the size of noise and break backwards compatability. --- pkg/visor/api.go | 27 ++++++++++++++++++++++----- pkg/visor/init.go | 25 ++++++++++++++++++++++++- 2 files changed, 46 insertions(+), 6 deletions(-) diff --git a/pkg/visor/api.go b/pkg/visor/api.go index 4c30a13cd..3ef4c081f 100644 --- a/pkg/visor/api.go +++ b/pkg/visor/api.go @@ -7,6 +7,7 @@ import ( "encoding/json" "errors" "fmt" + "io" "net" "net/http" "os" @@ -971,9 +972,6 @@ func (v *Visor) Ping(conf PingConfig) ([]time.Duration, error) { v.pingConnMx.Lock() defer v.pingConnMx.Unlock() latencies := []time.Duration{} - if conf.PcktSize > 2 { - conf.PcktSize = 2 - } data := make([]byte, conf.PcktSize*1024) for i := 1; i <= conf.Tries; i++ { skywireConn := v.pingConns[conf.PK].conn @@ -982,11 +980,30 @@ func (v *Visor) Ping(conf PingConfig) ([]time.Duration, error) { PingPk: conf.PK, Data: data, } - b, err := json.Marshal(msg) + ping, err := json.Marshal(msg) + if err != nil { + return latencies, err + } + pingSizeMsg := PingSizeMsg{ + Size: len(ping), + } + size, err := json.Marshal(pingSizeMsg) + if err != nil { + return latencies, err + } + _, err = skywireConn.Write(size) if err != nil { return latencies, err } - _, err = skywireConn.Write(b) + + buf := make([]byte, 32*1024) + _, err = skywireConn.Read(buf) + if err != nil { + if !errors.Is(err, io.EOF) { + return latencies, err + } + } + _, err = skywireConn.Write(ping) if err != nil { return latencies, err } diff --git a/pkg/visor/init.go b/pkg/visor/init.go index c56cb7250..f67264880 100644 --- a/pkg/visor/init.go +++ b/pkg/visor/init.go @@ -622,8 +622,31 @@ func handlePingConn(log *logging.Logger, remoteConn net.Conn, v *Visor) { } return } + var size PingSizeMsg + err = json.Unmarshal(buf[:n], &size) + if err != nil { + log.WithError(err).Error("Failed to unmarshal json") + return + } + + _, err = remoteConn.Write([]byte("ok")) + if err != nil { + log.WithError(err).Error("Failed to write message") + return + } + var ping []byte + for len(ping) != size.Size { + n, err = remoteConn.Read(buf) + if err != nil { + if !errors.Is(err, io.EOF) { + log.WithError(err).Error("Failed to read packet") + } + return + } + ping = append(ping, buf[:n]...) + } var msg PingMsg - err = json.Unmarshal(buf[:n], &msg) + err = json.Unmarshal(ping, &msg) if err != nil { log.WithError(err).Error("Failed to unmarshal json") return