diff --git a/Char b/Char new file mode 100644 index 000000000..e69de29bb diff --git a/internal/vpn/os_client_linux.go b/internal/vpn/os_client_linux.go index 95a8ce708..de7f96f28 100644 --- a/internal/vpn/os_client_linux.go +++ b/internal/vpn/os_client_linux.go @@ -46,7 +46,8 @@ func DefaultNetworkGateway() (net.IP, error) { var setupClientOnce sync.Once -func setupClientSysPrivileges() (suid int, err error) { +func setupClientSysPrivileges() (int, error) { + var err error setupClientOnce.Do(func() { var caps capability.Capabilities @@ -64,15 +65,18 @@ func setupClientSysPrivileges() (suid int, err error) { // set `CAP_NET_ADMIN` capability to needed caps sets. caps.Set(capability.CAPS|capability.BOUNDS|capability.AMBIENT, capability.CAP_NET_ADMIN) - if e := caps.Apply(capability.CAPS | capability.BOUNDS | capability.AMBIENT); e != nil { - err = fmt.Errorf("failed to apply capabilties: %w", e) + err = caps.Apply(capability.CAPS | capability.BOUNDS | capability.AMBIENT) + if err != nil { + err = fmt.Errorf("failed to apply capabilties: %w", err) + return } // let child process keep caps sets from the parent, so we may do calls to // system utilities with these caps. - if e := unix.Prctl(unix.PR_SET_KEEPCAPS, 1, 0, 0, 0); e != nil { - err = fmt.Errorf("failed to set PR_SET_KEEPCAPS: %w", e) + err = unix.Prctl(unix.PR_SET_KEEPCAPS, 1, 0, 0, 0) + if err != nil { + err = fmt.Errorf("failed to set PR_SET_KEEPCAPS: %w", err) return } }) diff --git a/pkg/servicedisc/autoconnect.go b/pkg/servicedisc/autoconnect.go new file mode 100644 index 000000000..0012d9fc7 --- /dev/null +++ b/pkg/servicedisc/autoconnect.go @@ -0,0 +1,124 @@ +package servicedisc + +import ( + "context" + "time" + + "github.com/skycoin/dmsg/cipher" + "github.com/skycoin/skycoin/src/util/logging" + + "github.com/skycoin/skywire/internal/netutil" + "github.com/skycoin/skywire/pkg/snet/directtp/tptypes" + "github.com/skycoin/skywire/pkg/transport" +) + +const ( + // PublicServiceDelay defines a delay before adding transports to public services. + PublicServiceDelay = 10 * time.Second + + fetchServicesDelay = 2 * time.Second +) + +// ConnectFn provides a way to connect to remote service +type ConnectFn func(context.Context, cipher.PubKey) error + +// Autoconnector continuously tries to connect to services +type Autoconnector interface { + Run(context.Context) error +} + +type autoconnector struct { + client *HTTPClient + maxConns int + log *logging.Logger + tm *transport.Manager +} + +// MakeConnector returns a new connector that will try to connect to at most maxConns +// services +func MakeConnector(conf Config, maxConns int, tm *transport.Manager, log *logging.Logger) Autoconnector { + connector := &autoconnector{} + connector.client = NewClient(log, conf) + connector.maxConns = maxConns + connector.log = log + connector.tm = tm + return connector +} + +// Run implements Autoconnector interface +func (a *autoconnector) Run(ctx context.Context) error { + for { + time.Sleep(PublicServiceDelay) + a.log.Infof("Fetching public visors") + addresses, err := a.fetchPubAddresses(ctx) + if err != nil { + a.log.Errorf("Cannot fetch public services: %s", err) + } + + tps := a.updateTransports() + absent := a.filterDuplicates(addresses, tps) + for _, pk := range absent { + a.log.WithField("pk", pk).Infoln("Adding transport to public visor") + logger := a.log.WithField("pk", pk).WithField("type", tptypes.STCPR) + if _, err := a.tm.SaveTransport(ctx, pk, tptypes.STCPR, transport.LabelAutomatic); err != nil { + logger.WithError(err).Warnln("Failed to add transport to public visor") + continue + } + logger.Infoln("Added transport to public visor") + } + } +} + +// Remove all inactive automatic transports and return all active +// automatic transports +func (a *autoconnector) updateTransports() []*transport.ManagedTransport { + tps := a.tm.GetTransportsByLabel(transport.LabelAutomatic) + var tpsActive []*transport.ManagedTransport + for _, tr := range tps { + if !tr.IsUp() { + a.tm.DeleteTransport(tr.Entry.ID) + } else { + tpsActive = append(tpsActive, tr) + } + } + return tpsActive +} + +func (a *autoconnector) fetchPubAddresses(ctx context.Context) ([]cipher.PubKey, error) { + retrier := netutil.NewRetrier(fetchServicesDelay, 0, 2) + var services []Service + fetch := func() (err error) { + // "return" services up from the closure + services, err = a.client.Services(ctx, a.maxConns) + if err != nil { + return err + } + return nil + } + if err := retrier.Do(fetch); err != nil { + return nil, err + } + var pks []cipher.PubKey + for _, service := range services { + pks = append(pks, service.Addr.PubKey()) + } + return pks, nil +} + +// return public keys from pks that are absent in given list of transports +func (a *autoconnector) filterDuplicates(pks []cipher.PubKey, trs []*transport.ManagedTransport) []cipher.PubKey { + var absent []cipher.PubKey + for _, pk := range pks { + found := false + for _, tr := range trs { + if tr.Entry.HasEdge(pk) { + found = true + break + } + } + if !found { + absent = append(absent, pk) + } + } + return absent +} diff --git a/pkg/servicedisc/client.go b/pkg/servicedisc/client.go index 407b3edbc..553bc7a62 100644 --- a/pkg/servicedisc/client.go +++ b/pkg/servicedisc/client.go @@ -8,7 +8,8 @@ import ( "fmt" "io/ioutil" "net/http" - "strings" + "net/url" + "strconv" "sync" "time" @@ -16,12 +17,18 @@ import ( "github.com/skycoin/dmsg/cipher" "github.com/skycoin/skywire/internal/httpauth" + nu "github.com/skycoin/skywire/internal/netutil" "github.com/skycoin/skywire/pkg/util/buildinfo" + "github.com/skycoin/skywire/pkg/util/netutil" ) -var ( - // ErrVisorUnreachable is returned when visor is unreachable. - ErrVisorUnreachable = errors.New("visor is unreachable") +// ErrVisorUnreachable is returned when visor is not reachable +var ErrVisorUnreachable = errors.New("visor is unreachable") + +const ( + updateRetryDelay = 5 * time.Second + discServiceTypeParam = "type" + discServiceQtyParam = "quantity" ) // Config configures the HTTPClient. @@ -62,14 +69,22 @@ func NewClient(log logrus.FieldLogger, conf Config) *HTTPClient { } } -func (c *HTTPClient) addr(path string, sType string) string { - addr := c.conf.DiscAddr + path - - if sType != "" { - addr += "?type=" + sType +func (c *HTTPClient) addr(path, serviceType string, quantity int) (string, error) { + addr := c.conf.DiscAddr + url, err := url.Parse(addr) + if err != nil { + return "", errors.New("invalid service discovery address in config: " + addr) } - - return addr + url.Path = path + q := url.Query() + if serviceType != "" { + q.Set(discServiceTypeParam, serviceType) + } + if quantity > 1 { + q.Set(discServiceQtyParam, strconv.Itoa(quantity)) + } + url.RawQuery = q.Encode() + return url.String(), nil } var ( @@ -98,8 +113,12 @@ func (c *HTTPClient) Auth(ctx context.Context) (*httpauth.Client, error) { } // Services calls 'GET /api/services'. -func (c *HTTPClient) Services(ctx context.Context) (out []Service, err error) { - req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.addr("/api/services", c.entry.Type), nil) +func (c *HTTPClient) Services(ctx context.Context, quantity int) (out []Service, err error) { + url, err := c.addr("/api/services", c.entry.Type, quantity) + if err != nil { + return nil, err + } + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) if err != nil { return nil, err } @@ -125,30 +144,61 @@ func (c *HTTPClient) Services(ctx context.Context) (out []Service, err error) { return nil, &hErr } err = json.NewDecoder(resp.Body).Decode(&out) - return + return out, err } -// UpdateEntry calls 'POST /api/services'. -func (c *HTTPClient) UpdateEntry(ctx context.Context) (*Service, error) { +// UpdateEntry calls 'POST /api/services', retrieves the entry +// and updates local field with the result +// if there are no ip addresses in the entry it also tries to fetch those +// from local config +func (c *HTTPClient) UpdateEntry(ctx context.Context) error { + c.entryMx.Lock() + defer c.entryMx.Unlock() + if c.conf.Type == ServiceTypeVisor && len(c.entry.LocalIPs) == 0 { + ips, err := netutil.DefaultNetworkInterfaceIPs() + if err != nil { + return err + } + c.entry.LocalIPs = make([]string, 0, len(ips)) + for _, ip := range ips { + c.entry.LocalIPs = append(c.entry.LocalIPs, ip.String()) + } + } + c.entry.Addr = NewSWAddr(c.conf.PK, c.conf.Port) // Just in case. + + entry, err := c.postEntry(ctx) + if err != nil { + return err + } + c.entry = entry + return nil +} + +// postEntry calls 'POST /api/services' and sends current service entry +// as the payload +func (c *HTTPClient) postEntry(ctx context.Context) (Service, error) { auth, err := c.Auth(ctx) if err != nil { - return nil, err + return Service{}, err } - c.entry.Addr = NewSWAddr(c.conf.PK, c.conf.Port) // Just in case. + url, err := c.addr("/api/services", "", 1) + if err != nil { + return Service{}, nil + } raw, err := json.Marshal(&c.entry) if err != nil { - return nil, err + return Service{}, err } - req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.addr("/api/services", ""), bytes.NewReader(raw)) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(raw)) if err != nil { - return nil, err + return Service{}, err } resp, err := auth.Do(req) if err != nil { - return nil, err + return Service{}, err } if resp != nil { defer func() { @@ -161,19 +211,20 @@ func (c *HTTPClient) UpdateEntry(ctx context.Context) (*Service, error) { if resp.StatusCode != http.StatusOK { respBody, err := ioutil.ReadAll(resp.Body) if err != nil { - return nil, fmt.Errorf("read response body: %w", err) + return Service{}, fmt.Errorf("read response body: %w", err) } var hErr HTTPResponse if err = json.Unmarshal(respBody, &hErr); err != nil { - return nil, err + return Service{}, err } - return nil, hErr.Error + return Service{}, hErr.Error } - err = json.NewDecoder(resp.Body).Decode(&c.entry) - return &c.entry, err + var entry Service + err = json.NewDecoder(resp.Body).Decode(&entry) + return entry, err } // DeleteEntry calls 'DELETE /api/services/{entry_addr}'. @@ -183,7 +234,12 @@ func (c *HTTPClient) DeleteEntry(ctx context.Context) (err error) { return err } - req, err := http.NewRequestWithContext(ctx, http.MethodDelete, c.addr("/api/services/"+c.entry.Addr.String(), c.entry.Type), nil) + url, err := c.addr("/api/services/"+c.entry.Addr.String(), c.entry.Type, 1) + if err != nil { + return err + } + + req, err := http.NewRequestWithContext(ctx, http.MethodDelete, url, nil) if err != nil { return err } @@ -214,51 +270,54 @@ func (c *HTTPClient) DeleteEntry(ctx context.Context) (err error) { func (c *HTTPClient) UpdateLoop(ctx context.Context, updateInterval time.Duration) { defer func() { _ = c.DeleteEntry(context.Background()) }() //nolint:errcheck - update := func() { - for { - c.entryMx.Lock() - entry, err := c.UpdateEntry(ctx) - c.entryMx.Unlock() - - if err != nil { - if strings.Contains(err.Error(), ErrVisorUnreachable.Error()) { - c.log.Errorf("Unable to register visor as public trusted as it's unreachable from WAN") - return - } - - c.log.WithError(err).Warn("Failed to update service entry in discovery. Retrying...") - time.Sleep(time.Second * 10) // TODO(evanlinjin): Exponential backoff. - continue - } - - c.entryMx.Lock() - j, err := json.Marshal(entry) - c.entryMx.Unlock() - - if err != nil { - panic(err) - } + ticker := time.NewTicker(updateInterval) + defer ticker.Stop() - c.log.WithField("entry", string(j)).Debug("Entry updated.") + for { + if err := c.Update(ctx); errors.Is(err, ErrVisorUnreachable) { + return + } + c.entryMx.Lock() + j, err := json.Marshal(c.entry) + c.entryMx.Unlock() + logger := c.log.WithField("entry", string(j)) + if err == nil { + logger.Debug("Entry updated.") + } else { + logger.Errorf("Service returned malformed entry, error: %s", err) return } - } - - // Run initial update. - update() - ticker := time.NewTicker(updateInterval) - for { select { case <-ctx.Done(): - ticker.Stop() return case <-ticker.C: - update() } } } +// Update calls 'POST /api/services' to update service discovery entry +// it performs exponential backoff in case of errors during update, unless +// the error is unrecoverable from +func (c *HTTPClient) Update(ctx context.Context) error { + retrier := nu.NewRetrier(updateRetryDelay, 0, 2).WithErrWhitelist(ErrVisorUnreachable) + run := func() error { + err := c.UpdateEntry(ctx) + + if errors.Is(err, ErrVisorUnreachable) { + c.log.Errorf("Unable to register visor as public trusted as it's unreachable from WAN") + return err + } + + if err != nil { + c.log.WithError(err).Warn("Failed to update service entry in discovery. Retrying...") + return err + } + return nil + } + return retrier.Do(run) +} + // UpdateStats updates the stats field of the internal service entry state. func (c *HTTPClient) UpdateStats(stats Stats) { c.entryMx.Lock() diff --git a/pkg/servicedisc/types.go b/pkg/servicedisc/types.go index 66641ff8f..ab838d4c9 100644 --- a/pkg/servicedisc/types.go +++ b/pkg/servicedisc/types.go @@ -104,11 +104,12 @@ type Stats struct { // Service represents a service entry in service-discovery. type Service struct { - Addr SWAddr `json:"address"` - Type string `json:"type"` - Stats *Stats `json:"stats,omitempty"` // TODO: Have this implemented. - Geo *GeoLocation `json:"geo,omitempty"` - Version string `json:"version,omitempty"` + Addr SWAddr `json:"address"` + Type string `json:"type"` + Stats *Stats `json:"stats,omitempty"` + Geo *GeoLocation `json:"geo,omitempty"` + Version string `json:"version,omitempty"` + LocalIPs []string `json:"local_ips,omitempty"` } // MarshalBinary implements encoding.BinaryMarshaller diff --git a/pkg/snet/network.go b/pkg/snet/network.go index 094bc5aa5..33d5dd07b 100644 --- a/pkg/snet/network.go +++ b/pkg/snet/network.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "net" - "strconv" "strings" "sync" "time" @@ -15,7 +14,6 @@ import ( "github.com/skycoin/dmsg/disc" "github.com/skycoin/skycoin/src/util/logging" - "github.com/skycoin/skywire/pkg/app/appdisc" "github.com/skycoin/skywire/pkg/app/appevent" "github.com/skycoin/skywire/pkg/snet/arclient" "github.com/skycoin/skywire/pkg/snet/directtp" @@ -75,8 +73,6 @@ type Config struct { SecKey cipher.SecKey ARClient arclient.APIClient NetworkConfigs NetworkConfigs - ServiceDisc appdisc.Factory - PublicTrusted bool } // NetworkConfigs represents all network configs. @@ -93,11 +89,10 @@ type NetworkClients struct { // Network represents a network between nodes in Skywire. type Network struct { - conf Config - netsMu sync.RWMutex - nets map[string]struct{} // networks to be used with transports - clients NetworkClients - visorUpdater appdisc.Updater + conf Config + netsMu sync.RWMutex + nets map[string]struct{} // networks to be used with transports + clients NetworkClients onNewNetworkTypeMu sync.Mutex onNewNetworkType func(netType string) @@ -226,10 +221,6 @@ func (n *Network) Init() error { if err := client.Serve(); err != nil { return fmt.Errorf("failed to initiate 'stcpr': %w", err) } - - if n.conf.PublicTrusted { - go n.registerPublicTrusted(client) - } } else { log.Infof("No config found for stcpr") } @@ -246,33 +237,6 @@ func (n *Network) Init() error { return nil } -func (n *Network) registerPublicTrusted(client directtp.Client) { - log.Infof("Trying to register visor as public trusted") - - la, err := client.LocalAddr() - if err != nil { - log.WithError(err).Errorf("Failed to get STCPR local addr") - return - } - - _, portStr, err := net.SplitHostPort(la.String()) - if err != nil { - log.WithError(err).Errorf("Failed to extract port from addr %v", la.String()) - return - } - - port, err := strconv.Atoi(portStr) - if err != nil { - log.WithError(err).Errorf("Failed to convert port to int") - return - } - - n.visorUpdater = n.conf.ServiceDisc.VisorUpdater(uint16(port)) - n.visorUpdater.Start() - - log.Infof("Sent request to register visor as public trusted") -} - // OnNewNetworkType sets callback to be called when new network type is ready. func (n *Network) OnNewNetworkType(callback func(netType string)) { n.onNewNetworkTypeMu.Lock() @@ -293,10 +257,6 @@ func (n *Network) Close() error { n.netsMu.Lock() defer n.netsMu.Unlock() - if n.visorUpdater != nil { - n.visorUpdater.Stop() - } - wg := new(sync.WaitGroup) var dmsgErr error @@ -363,18 +323,23 @@ func (n *Network) TransportNetworks() []string { func (n *Network) Dmsg() *dmsg.Client { return n.clients.DmsgC } // STcp returns the underlying stcp.Client. -func (n *Network) STcp() directtp.Client { - return n.clients.Direct[tptypes.STCP] +func (n *Network) STcp() (directtp.Client, bool) { + return n.getClient(tptypes.STCP) } // STcpr returns the underlying stcpr.Client. -func (n *Network) STcpr() directtp.Client { - return n.clients.Direct[tptypes.STCPR] +func (n *Network) STcpr() (directtp.Client, bool) { + return n.getClient(tptypes.STCPR) } // SUdpH returns the underlying sudph.Client. -func (n *Network) SUdpH() directtp.Client { - return n.clients.Direct[tptypes.SUDPH] +func (n *Network) SUdpH() (directtp.Client, bool) { + return n.getClient(tptypes.SUDPH) +} + +func (n *Network) getClient(tpType string) (directtp.Client, bool) { + c, ok := n.clients.Direct[tpType] + return c, ok } // Dial dials a visor by its public key and returns a connection. diff --git a/pkg/transport/manager.go b/pkg/transport/manager.go index 50cd52a55..d664848ae 100644 --- a/pkg/transport/manager.go +++ b/pkg/transport/manager.go @@ -21,11 +21,6 @@ import ( "github.com/skycoin/skywire/pkg/snet/snettest" ) -const ( - // TrustedVisorsDelay defines a delay before adding transports to trusted visors. - TrustedVisorsDelay = 5 * time.Second -) - // TPCloseCallback triggers after a session is closed. type TPCloseCallback func(network, addr string) @@ -33,7 +28,6 @@ type TPCloseCallback func(network, addr string) type ManagerConfig struct { PubKey cipher.PubKey SecKey cipher.SecKey - DefaultVisors []cipher.PubKey // Visors to automatically connect to DiscoveryClient DiscoveryClient LogStore LogStore } @@ -260,6 +254,35 @@ func (tm *Manager) acceptTransport(ctx context.Context, lis *snet.Listener) erro return nil } +// GetTransport gets transport entity to the given remote +func (tm *Manager) GetTransport(remote cipher.PubKey, tpType string) (*ManagedTransport, error) { + tm.mx.RLock() + defer tm.mx.RUnlock() + if !snet.IsKnownNetwork(tpType) { + return nil, snet.ErrUnknownNetwork + } + + tpID := tm.tpIDFromPK(remote, tpType) + t, ok := tm.tps[tpID] + if !ok { + return nil, fmt.Errorf("transport to %s of type %s not found", remote, tpType) + } + return t, nil +} + +// GetTransportsByLabel returns all transports that have given label +func (tm *Manager) GetTransportsByLabel(label Label) []*ManagedTransport { + tm.mx.RLock() + defer tm.mx.RUnlock() + var trs []*ManagedTransport + for _, tr := range tm.tps { + if tr.Entry.Label == label { + trs = append(trs, tr) + } + } + return trs +} + // SaveTransport begins to attempt to establish data transports to the given 'remote' visor. func (tm *Manager) SaveTransport(ctx context.Context, remote cipher.PubKey, tpType string, label Label) (*ManagedTransport, error) { diff --git a/pkg/util/netutil/netutil.go b/pkg/util/netutil/net.go similarity index 74% rename from pkg/util/netutil/netutil.go rename to pkg/util/netutil/net.go index 18f5cd4d6..5d046f5d9 100644 --- a/pkg/util/netutil/netutil.go +++ b/pkg/util/netutil/net.go @@ -69,3 +69,24 @@ func localNetworkInterfaceIPs(ifcName string) ([]net.IP, []net.IP, error) { return ips, ifcIPs, nil } + +// IsPublicIP returns true if the provided IP is public. +// Obtained from: https://stackoverflow.com/questions/41670155/get-public-ip-in-golang +func IsPublicIP(IP net.IP) bool { + if IP.IsLoopback() || IP.IsLinkLocalMulticast() || IP.IsLinkLocalUnicast() { + return false + } + if ip4 := IP.To4(); ip4 != nil { + switch { + case ip4[0] == 10: + return false + case ip4[0] == 172 && ip4[1] >= 16 && ip4[1] <= 31: + return false + case ip4[0] == 192 && ip4[1] == 168: + return false + default: + return true + } + } + return false +} diff --git a/pkg/util/netutil/net_darwin.go b/pkg/util/netutil/net_darwin.go new file mode 100644 index 000000000..4c5a0349f --- /dev/null +++ b/pkg/util/netutil/net_darwin.go @@ -0,0 +1,26 @@ +//+build darwin + +package netutil + +import ( + "bytes" + "fmt" + "os/exec" +) + +const ( + defaultNetworkInterfaceCMD = "netstat -rn | sed -n '/Internet/,/Internet6/p' | grep default | awk '{print $4}'" +) + +// DefaultNetworkInterface fetches default network interface name. +func DefaultNetworkInterface() (string, error) { + outputBytes, err := exec.Command("sh", "-c", defaultNetworkInterfaceCMD).Output() + if err != nil { + return "", fmt.Errorf("error running command %s: %w", defaultNetworkInterfaceCMD, err) + } + + // just in case + outputBytes = bytes.TrimRight(outputBytes, "\n") + + return string(outputBytes), nil +} diff --git a/pkg/util/netutil/net_linux.go b/pkg/util/netutil/net_linux.go new file mode 100644 index 000000000..5b770abdf --- /dev/null +++ b/pkg/util/netutil/net_linux.go @@ -0,0 +1,40 @@ +//+build linux + +package netutil + +import ( + "bytes" + "fmt" + "net" + "os/exec" +) + +const ( + defaultNetworkInterfaceCMD = "ip r | awk '$1 == \"default\" {print $5}'" +) + +// DefaultNetworkInterface fetches default network interface name. +func DefaultNetworkInterface() (string, error) { + outputBytes, err := exec.Command("sh", "-c", defaultNetworkInterfaceCMD).Output() + if err != nil { + return "", fmt.Errorf("error running command %s: %w", defaultNetworkInterfaceCMD, err) + } + + // just in case + outputBytes = bytes.TrimRight(outputBytes, "\n") + + return string(outputBytes), nil +} + +// DefaultNetworkInterfaceIPs returns IP addresses for the default network interface +func DefaultNetworkInterfaceIPs() ([]net.IP, error) { + networkIfc, err := DefaultNetworkInterface() + if err != nil { + return nil, fmt.Errorf("failed to get default network interface: %w", err) + } + localIPs, err := NetworkInterfaceIPs(networkIfc) + if err != nil { + return nil, fmt.Errorf("failed to get IPs of %s: %w", networkIfc, err) + } + return localIPs, nil +} diff --git a/pkg/util/netutil/netutil_windows.go b/pkg/util/netutil/net_windows.go similarity index 78% rename from pkg/util/netutil/netutil_windows.go rename to pkg/util/netutil/net_windows.go index 22dc23eac..c5f6e9410 100644 --- a/pkg/util/netutil/netutil_windows.go +++ b/pkg/util/netutil/net_windows.go @@ -4,5 +4,5 @@ package netutil // DefaultNetworkInterface fetches default network interface name. func DefaultNetworkInterface() (string, error) { - return "", nil + return "", errServerMethodsNotSupported } diff --git a/pkg/util/netutil/netutil_darwin.go b/pkg/util/netutil/netutil_darwin.go deleted file mode 100644 index d4fc8c5b6..000000000 --- a/pkg/util/netutil/netutil_darwin.go +++ /dev/null @@ -1,25 +0,0 @@ -//+build darwin - -package netutil - -import ( - "bytes" - - "github.com/skycoin/skywire/pkg/util/osutil" -) - -const ( - activeNetworkInterfaceCMD = "netstat -rn | sed -n '/Internet/,/Internet6/p' | grep default | awk '{print $4}'" -) - -// DefaultNetworkInterface fetches default network interface name. -func DefaultNetworkInterface() (string, error) { - stdout, err := osutil.RunWithResult("sh", "-c", activeNetworkInterfaceCMD) - if err != nil { - return "", err - } - - stdout = bytes.TrimSpace(stdout) - - return string(stdout), nil -} diff --git a/pkg/util/netutil/netutil_linux.go b/pkg/util/netutil/netutil_linux.go deleted file mode 100644 index bcaebf347..000000000 --- a/pkg/util/netutil/netutil_linux.go +++ /dev/null @@ -1,26 +0,0 @@ -//+build linux - -package netutil - -import ( - "bytes" - - "github.com/skycoin/skywire/pkg/util/osutil" -) - -const ( - defaultNetworkInterfaceCMD = "ip r | awk '$1 == \"default\" {print $5}'" -) - -// DefaultNetworkInterface fetches default network interface name. -func DefaultNetworkInterface() (string, error) { - outputBytes, err := osutil.RunWithResult("sh", "-c", defaultNetworkInterfaceCMD) - if err != nil { - return "", err - } - - // just in case - outputBytes = bytes.TrimRight(outputBytes, "\n") - - return string(outputBytes), nil -} diff --git a/pkg/visor/api.go b/pkg/visor/api.go index 6fa4aead6..0b05465ea 100644 --- a/pkg/visor/api.go +++ b/pkg/visor/api.go @@ -105,16 +105,6 @@ func (v *Visor) Overview() (*Overview, error) { return true }) - defaultNetworkIfc, err := netutil.DefaultNetworkInterface() - if err != nil { - return nil, fmt.Errorf("failed to get default network interface: %w", err) - } - - localIPs, err := netutil.NetworkInterfaceIPs(defaultNetworkIfc) - if err != nil { - return nil, fmt.Errorf("failed to get IPs of interface %s: %w", defaultNetworkIfc, err) - } - overview := &Overview{ PubKey: v.conf.PK, BuildInfo: buildinfo.Get(), @@ -124,6 +114,11 @@ func (v *Visor) Overview() (*Overview, error) { RoutesCount: v.router.RoutesCount(), } + localIPs, err := netutil.DefaultNetworkInterfaceIPs() + if err != nil { + return nil, err + } + if len(localIPs) > 0 { // should be okay to have the first one, in the case of // active network interface, there's usually just a single IP diff --git a/pkg/visor/init.go b/pkg/visor/init.go index ef629103e..e1e325c5e 100644 --- a/pkg/visor/init.go +++ b/pkg/visor/init.go @@ -6,6 +6,7 @@ import ( "fmt" "net" "net/http" + "strconv" "sync" "time" @@ -25,6 +26,7 @@ import ( "github.com/skycoin/skywire/pkg/app/launcher" "github.com/skycoin/skywire/pkg/routefinder/rfclient" "github.com/skycoin/skywire/pkg/router" + "github.com/skycoin/skywire/pkg/servicedisc" "github.com/skycoin/skywire/pkg/setup/setupclient" "github.com/skycoin/skywire/pkg/skyenv" "github.com/skycoin/skywire/pkg/snet" @@ -32,6 +34,7 @@ import ( "github.com/skycoin/skywire/pkg/snet/directtp/tptypes" "github.com/skycoin/skywire/pkg/transport" "github.com/skycoin/skywire/pkg/transport/tpdclient" + "github.com/skycoin/skywire/pkg/util/netutil" "github.com/skycoin/skywire/pkg/util/updater" "github.com/skycoin/skywire/pkg/visor/visorconfig" vinit "github.com/skycoin/skywire/pkg/visor/visorinit" @@ -75,8 +78,10 @@ var ( hvs vinit.Module // Uptime tracker ut vinit.Module - // Trusted visors - trv vinit.Module + // Public visors: automatically establish connections to public visors + pvs vinit.Module + // Public visor: advertise current visor as public + pv vinit.Module // hypervisor module hv vinit.Module // dmsg ctrl @@ -106,10 +111,10 @@ func registerModules(logger *logging.MasterLogger) { cli = maker("cli", initCLI) hvs = maker("hypervisors", initHypervisors, &sn) ut = maker("uptime_tracker", initUptimeTracker) - trv = maker("trusted_visors", initTrustedVisors, &tr) - + pv = maker("public_visors", initPublicVisors, &tr) + pvs = maker("public_visor", initPublicVisor, &sn, &ar, &disc) vis = vinit.MakeModule("visor", vinit.DoNothing, logger, &up, &ebc, &ar, &disc, &sn, &pty, - &tr, &rt, &launch, &cli, &hvs, &ut, &trv, &dmsgCtrl) + &tr, &rt, &launch, &cli, &hvs, &ut, &pv, &pvs, &dmsgCtrl) hv = maker("hypervisor", initHypervisor, &vis) } @@ -183,8 +188,6 @@ func initSNet(ctx context.Context, v *Visor, log *logging.Logger) error { SecKey: v.conf.SK, ARClient: v.arClient, NetworkConfigs: nc, - ServiceDisc: v.serviceDisc, - PublicTrusted: v.conf.PublicTrustedVisor, } n, err := snet.New(conf, v.ebc) @@ -255,7 +258,6 @@ func initTransport(ctx context.Context, v *Visor, log *logging.Logger) error { tpMConf := transport.ManagerConfig{ PubKey: v.conf.PK, SecKey: v.conf.SK, - DefaultVisors: conf.TrustedVisors, DiscoveryClient: tpdC, LogStore: logS, } @@ -565,28 +567,83 @@ func initUptimeTracker(ctx context.Context, v *Visor, log *logging.Logger) error return nil } -func initTrustedVisors(ctx context.Context, v *Visor, log *logging.Logger) error { - const trustedVisorsTransportType = tptypes.STCPR +// this service is not considered critical and always returns true +// advertise this visor as public in service discovery +func initPublicVisor(_ context.Context, v *Visor, log *logging.Logger) error { + if !v.conf.IsPublic { + return nil + } - go func() { - time.Sleep(transport.TrustedVisorsDelay) - for _, pk := range v.tpM.Conf.DefaultVisors { - v.log.WithField("pk", pk).Infof("Adding trusted visor") - - if _, err := v.tpM.SaveTransport(context.Background(), pk, trustedVisorsTransportType, transport.LabelAutomatic); err != nil { - v.log. - WithError(err). - WithField("pk", pk). - WithField("type", trustedVisorsTransportType). - Warnf("Failed to add transport to trusted visor via") - } else { - v.log. - WithField("pk", pk). - WithField("type", trustedVisorsTransportType). - Infof("Added transport to trusted visor") - } + // retrieve interface IPs and check if at least one is public + defaultIPs, err := netutil.DefaultNetworkInterfaceIPs() + if err != nil { + return nil + } + var found bool + for _, IP := range defaultIPs { + if netutil.IsPublicIP(IP) { + found = true + break } - }() + } + if !found { + return nil + } + + // todo: consider moving this to transport into some helper function + stcpr, ok := v.net.STcpr() + if !ok { + return nil + } + la, err := stcpr.LocalAddr() + if err != nil { + log.WithError(err).Errorln("Failed to get STCPR local addr") + return nil + } + _, portStr, err := net.SplitHostPort(la.String()) + if err != nil { + log.WithError(err).Errorf("Failed to extract port from addr %v", la.String()) + return nil + } + port, err := strconv.Atoi(portStr) + if err != nil { + log.WithError(err).Errorf("Failed to convert port to int") + return nil + } + + visorUpdater := v.serviceDisc.VisorUpdater(uint16(port)) + visorUpdater.Start() + + v.log.Infof("Sent request to register visor as public") + v.pushCloseStack("visor updater", func() error { + visorUpdater.Stop() + return nil + }) + return nil +} + +func initPublicVisors(ctx context.Context, v *Visor, log *logging.Logger) error { + if !v.conf.Transport.AutoconnectPublic { + return nil + } + proxyDisc := v.conf.Launcher.Discovery.ServiceDisc + if proxyDisc == "" { + proxyDisc = skyenv.DefaultServiceDiscAddr + } + + // todo: refactor appdisc: split connecting to services in appdisc and + // advertising oneself as a service. Currently, config is tailored to + // advertising oneself and requires things like port that are not used + // in connecting to services + conf := servicedisc.Config{ + Type: servicedisc.ServiceTypeVisor, + PK: v.conf.PK, + SK: v.conf.SK, + Port: uint16(0), + DiscAddr: proxyDisc, + } + connector := servicedisc.MakeConnector(conf, 5, v.tpM, log) + go connector.Run(ctx) //nolint:errcheck return nil } diff --git a/pkg/visor/visor_test.go b/pkg/visor/visor_test.go index 7182f3b74..72efe2110 100644 --- a/pkg/visor/visor_test.go +++ b/pkg/visor/visor_test.go @@ -55,13 +55,17 @@ func TestNewVisor(t *testing.T) { Discovery: skyenv.DefaultDmsgDiscAddr, SessionsCount: 10, }, + STCP: &snet.STCPConfig{ + PKTable: nil, + LocalAddr: "localhost:7777", + }, Transport: &visorconfig.V1Transport{ Discovery: srv.URL, AddressResolver: skyenv.DefaultAddressResolverAddr, LogStore: &visorconfig.V1LogStore{ Type: visorconfig.MemoryLogStore, }, - TrustedVisors: nil, + AutoconnectPublic: false, }, Routing: &visorconfig.V1Routing{ SetupNodes: nil, @@ -75,6 +79,10 @@ func TestNewVisor(t *testing.T) { {Name: "foo", Port: 1}, {Name: "bar", AutoStart: true, Port: 2}, }, + Discovery: &visorconfig.V1AppDisc{ + UpdateInterval: 0, + ServiceDisc: "", + }, }, } diff --git a/pkg/visor/visorconfig/parse.go b/pkg/visor/visorconfig/parse.go index 5306e4782..1c915da06 100644 --- a/pkg/visor/visorconfig/parse.go +++ b/pkg/visor/visorconfig/parse.go @@ -35,6 +35,8 @@ func Parse(log *logging.MasterLogger, path string, raw []byte) (*V1, error) { switch cc.Version { // parse any v1-compatible version with v1 parse procedure + case V111Name: + fallthrough case V110Name: fallthrough case V100Name: @@ -105,7 +107,6 @@ func parseV0(cc *Common, raw []byte) (*V1, error) { conf.Transport.LogStore = old.Transport.LogStore } - conf.Transport.TrustedVisors = old.TrustedVisors if old.Routing != nil { conf.Routing = old.Routing } diff --git a/pkg/visor/visorconfig/v1.go b/pkg/visor/visorconfig/v1.go index e473ecd4d..59ec1fdd9 100644 --- a/pkg/visor/visorconfig/v1.go +++ b/pkg/visor/visorconfig/v1.go @@ -21,8 +21,15 @@ const V100Name = "v1.0.0" // Added MinHops field to V1Routing section of config const V110Name = "v1.1.0" +// V111Name is the semantic version string for v1.1.1. +// Removed public_trusted_visor field from root section +// Removed trusted_visors field from transport section +// Added is_public field to root section +// Added public_autoconnect field to transport section +const V111Name = "v1.1.1" + // V1Name is the semantic version string for the most recent version of V1. -const V1Name = V110Name +const V1Name = V111Name // V1 is visor config v1.0.0 type V1 struct { @@ -43,8 +50,7 @@ type V1 struct { LogLevel string `json:"log_level"` ShutdownTimeout Duration `json:"shutdown_timeout,omitempty"` // time value, examples: 10s, 1m, etc RestartCheckDelay Duration `json:"restart_check_delay,omitempty"` // time value, examples: 10s, 1m, etc - - PublicTrustedVisor bool `json:"public_trusted_visor,omitempty"` + IsPublic bool `json:"is_public"` Hypervisor *hypervisorconfig.Config `json:"hypervisor,omitempty"` } @@ -59,10 +65,10 @@ type V1Dmsgpty struct { // V1Transport defines a transport config. type V1Transport struct { - Discovery string `json:"discovery"` - AddressResolver string `json:"address_resolver"` - LogStore *V1LogStore `json:"log_store"` - TrustedVisors []cipher.PubKey `json:"trusted_visors"` + Discovery string `json:"discovery"` + AddressResolver string `json:"address_resolver"` + LogStore *V1LogStore `json:"log_store"` + AutoconnectPublic bool `json:"public_autoconnect"` } // V1LogStore configures a LogStore.