Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove heartbeat | Add Deregister #884

Merged
merged 17 commits into from
Sep 22, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/skywire-visor/commands/systray.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package commands

import (
"context"

"github.com/getlantern/systray"
"github.com/skycoin/skycoin/src/util/logging"

Expand Down
11 changes: 2 additions & 9 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V
github.com/go-ole/go-ole v1.2.4 h1:nNBDSCOigTSiarFpYE9J/KtEA1IOW4CNeqT9TQDqCxI=
github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM=
github.com/go-redis/redis v6.15.8+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee h1:s+21KNqlpePfkah2I+gwHF8xmJWRjooY+5248k6m4A0=
github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo=
Expand Down Expand Up @@ -184,12 +185,7 @@ github.com/klauspost/reedsolomon v1.9.9/go.mod h1:O7yFFHiQwDR6b2t63KPUpccPtNdp5A
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
<<<<<<< HEAD
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
=======
github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
>>>>>>> upstream/develop
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/pty v1.1.3/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
Expand Down Expand Up @@ -474,11 +470,8 @@ golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201231184435-2d18734c6014/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
<<<<<<< HEAD
golang.org/x/sys v0.0.0-20210217105451-b926d437f341/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
=======
golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
>>>>>>> upstream/develop
golang.org/x/sys v0.0.0-20210217105451-b926d437f341/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210608053332-aa57babbf139 h1:C+AwYEtBp/VQwoLntUmQ/yx3MS9vmZaKNdw5eOpoQe8=
Expand Down
2 changes: 1 addition & 1 deletion internal/gui/gui_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const (
pngIconPath = "/Applications/Skywire.app/Contents/Resources/icon.png"
iconPath = "/Applications/Skywire.app/Contents/Resources/tray_icon.tiff"
deinstallerPath = "/Applications/Skywire.app/Contents/deinstaller"
appPath = "/Applications/Skywire.app"
appPath = "/Applications/Skywire.app"
)

func preReadIcon() error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/app/updatedisc/const.go → pkg/app/appdisc/const.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package updatedisc
package appdisc

// ChangeValue keys. Each key changes a different value for serviceUpdater.ChangeValue
const (
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
package updatedisc
package appdisc
jdknives marked this conversation as resolved.
Show resolved Hide resolved

import (
"context"
"strconv"
"sync"
"time"

"github.com/skycoin/skywire/pkg/servicedisc"
)

// Updater updates the associated app discovery
// Updater update the associated app discovery
ersonp marked this conversation as resolved.
Show resolved Hide resolved
type Updater interface {

// Start starts the updater.
Expand All @@ -31,43 +30,28 @@ func (emptyUpdater) ChangeValue(name string, v []byte) error { return nil }

// serviceUpdater updates service-discovery entry of locally running App.
type serviceUpdater struct {
client *servicedisc.HTTPClient
interval time.Duration

cancel context.CancelFunc
wg sync.WaitGroup
client *servicedisc.HTTPClient
mu sync.Mutex
jdknives marked this conversation as resolved.
Show resolved Hide resolved
}

func (u *serviceUpdater) Start() {
u.mu.Lock()
defer u.mu.Unlock()

if u.cancel != nil {
ctx := context.Background()
if err := u.client.RegisterEntry(ctx); err != nil {
return
}

ctx, cancel := context.WithCancel(context.Background())
u.cancel = cancel

u.wg.Add(1)
go func() {
u.client.UpdateLoop(ctx, u.interval)
u.wg.Done()
}()
}

func (u *serviceUpdater) Stop() {
u.mu.Lock()
defer u.mu.Unlock()

if u.cancel == nil {
ctx := context.Background()
if err := u.client.DeleteEntry(ctx); err != nil {
return
}

u.cancel()
u.cancel = nil
u.wg.Wait()
}

func (u *serviceUpdater) ChangeValue(name string, v []byte) error {
Expand Down
22 changes: 7 additions & 15 deletions pkg/app/updatedisc/factory.go → pkg/app/appdisc/factory.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package updatedisc
package appdisc

import (
"time"

"github.com/sirupsen/logrus"
"github.com/skycoin/dmsg/cipher"
"github.com/skycoin/skycoin/src/util/logging"
Expand All @@ -14,20 +12,16 @@ import (

// Factory creates appdisc.Updater instances based on the app name.
type Factory struct {
Log logrus.FieldLogger
PK cipher.PubKey
SK cipher.SecKey
UpdateInterval time.Duration
ServiceDisc string // Address of service-discovery
Log logrus.FieldLogger
PK cipher.PubKey
SK cipher.SecKey
ServiceDisc string // Address of service-discovery
}

func (f *Factory) setDefaults() {
if f.Log == nil {
f.Log = logging.MustGetLogger("appdisc")
}
if f.UpdateInterval == 0 {
f.UpdateInterval = skyenv.ServiceDiscUpdateInterval
}
if f.ServiceDisc == "" {
f.ServiceDisc = skyenv.DefaultServiceDiscAddr
}
Expand All @@ -49,8 +43,7 @@ func (f *Factory) VisorUpdater(port uint16) Updater {
}

return &serviceUpdater{
client: servicedisc.NewClient(f.Log, conf),
interval: f.UpdateInterval,
client: servicedisc.NewClient(f.Log, conf),
}
}

Expand Down Expand Up @@ -81,8 +74,7 @@ func (f *Factory) AppUpdater(conf appcommon.ProcConfig) (Updater, bool) {
switch conf.AppName {
case skyenv.VPNServerName:
jdknives marked this conversation as resolved.
Show resolved Hide resolved
return &serviceUpdater{
client: servicedisc.NewClient(log, getServiceDiscConf(conf, servicedisc.ServiceTypeVPN)),
interval: f.UpdateInterval,
client: servicedisc.NewClient(log, getServiceDiscConf(conf, servicedisc.ServiceTypeVPN)),
}, true
default:
return &emptyUpdater{}, false
Expand Down
17 changes: 10 additions & 7 deletions pkg/app/appserver/proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import (
"github.com/skycoin/skycoin/src/util/logging"

"github.com/skycoin/skywire/pkg/app/appcommon"
"github.com/skycoin/skywire/pkg/app/appdisc"
"github.com/skycoin/skywire/pkg/app/appnet"
"github.com/skycoin/skywire/pkg/app/updatedisc"
)

var (
Expand All @@ -29,7 +29,7 @@ var (
// communication.
// TODO(evanlinjin): In the future, we will implement the ability to run multiple instances (procs) of a single app.
type Proc struct {
disc updatedisc.Updater // app discovery client
disc appdisc.Updater // app discovery client
conf appcommon.ProcConfig
log *logging.Logger

Expand Down Expand Up @@ -57,7 +57,7 @@ type Proc struct {
}

// NewProc constructs `Proc`.
func NewProc(mLog *logging.MasterLogger, conf appcommon.ProcConfig, disc updatedisc.Updater, m ProcManager,
func NewProc(mLog *logging.MasterLogger, conf appcommon.ProcConfig, disc appdisc.Updater, m ProcManager,
appName string) *Proc {
if mLog == nil {
mLog = logging.NewMasterLogger()
Expand Down Expand Up @@ -139,8 +139,8 @@ func (p *Proc) awaitConn() bool {
connDelta := p.rpcGW.cm.AddDeltaInformer()
go func() {
for n := range connDelta.Chan() {
if err := p.disc.ChangeValue(updatedisc.ConnCountValue, []byte(strconv.Itoa(n))); err != nil {
p.log.WithError(err).WithField("value", updatedisc.ConnCountValue).
if err := p.disc.ChangeValue(appdisc.ConnCountValue, []byte(strconv.Itoa(n))); err != nil {
p.log.WithError(err).WithField("value", appdisc.ConnCountValue).
Error("Failed to change app discovery value.")
}
}
Expand All @@ -149,8 +149,8 @@ func (p *Proc) awaitConn() bool {
lisDelta := p.rpcGW.lm.AddDeltaInformer()
go func() {
for n := range lisDelta.Chan() {
if err := p.disc.ChangeValue(updatedisc.ListenerCountValue, []byte(strconv.Itoa(n))); err != nil {
p.log.WithError(err).WithField("value", updatedisc.ListenerCountValue).
if err := p.disc.ChangeValue(appdisc.ListenerCountValue, []byte(strconv.Itoa(n))); err != nil {
p.log.WithError(err).WithField("value", appdisc.ListenerCountValue).
Error("Failed to change app discovery value.")
}
}
Expand Down Expand Up @@ -259,6 +259,9 @@ func (p *Proc) Stop() error {
}
}

// deregister discovery service
p.disc.Stop()
jdknives marked this conversation as resolved.
Show resolved Hide resolved

// the lock will be acquired as soon as the cmd finishes its work
p.waitMx.Lock()
defer func() {
Expand Down
8 changes: 4 additions & 4 deletions pkg/app/appserver/proc_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (
"github.com/skycoin/skycoin/src/util/logging"

"github.com/skycoin/skywire/pkg/app/appcommon"
"github.com/skycoin/skywire/pkg/app/appdisc"
"github.com/skycoin/skywire/pkg/app/appevent"
"github.com/skycoin/skywire/pkg/app/updatedisc"
)

//go:generate mockery -name ProcManager -case underscore -inpkg
Expand Down Expand Up @@ -57,7 +57,7 @@ type procManager struct {
conns map[string]net.Conn
connsWG sync.WaitGroup

discF *updatedisc.Factory
discF *appdisc.Factory
procs map[string]*Proc
procsByKey map[appcommon.ProcKey]*Proc

Expand All @@ -69,12 +69,12 @@ type procManager struct {
}

// NewProcManager constructs `ProcManager`.
func NewProcManager(mLog *logging.MasterLogger, discF *updatedisc.Factory, eb *appevent.Broadcaster, addr string) (ProcManager, error) {
func NewProcManager(mLog *logging.MasterLogger, discF *appdisc.Factory, eb *appevent.Broadcaster, addr string) (ProcManager, error) {
if mLog == nil {
mLog = logging.NewMasterLogger()
}
if discF == nil {
discF = new(updatedisc.Factory)
discF = new(appdisc.Factory)
}
if eb == nil {
eb = appevent.NewBroadcaster(mLog.PackageLogger("event_broadcaster"), time.Second)
Expand Down
38 changes: 4 additions & 34 deletions pkg/servicedisc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,40 +269,10 @@ func (c *HTTPClient) DeleteEntry(ctx context.Context) (err error) {
return nil
}

// UpdateLoop repetitively calls 'POST /api/services' to update entry.
func (c *HTTPClient) UpdateLoop(ctx context.Context, updateInterval time.Duration) {
defer func() { _ = c.DeleteEntry(context.Background()) }() //nolint:errcheck

ticker := time.NewTicker(updateInterval)
defer ticker.Stop()

for {
if err := c.Update(ctx); errors.Is(err, ErrVisorUnreachable) {
return
}
c.entryMx.Lock()
j, err := json.Marshal(c.entry)
c.entryMx.Unlock()
logger := c.log.WithField("entry", string(j))
if err == nil {
logger.Debug("Entry updated.")
} else {
logger.Errorf("Service returned malformed entry, error: %s", err)
return
}

select {
case <-ctx.Done():
return
case <-ticker.C:
}
}
}

// Update calls 'POST /api/services' to update service discovery entry
// it performs exponential backoff in case of errors during update, unless
// RegisterEntry calls 'POST /api/services' to register service discovery entry
// it performs exponential backoff in case of errors during register, unless
// the error is unrecoverable from
func (c *HTTPClient) Update(ctx context.Context) error {
func (c *HTTPClient) RegisterEntry(ctx context.Context) error {
jdknives marked this conversation as resolved.
Show resolved Hide resolved
retrier := nu.NewRetrier(updateRetryDelay, 0, 2).WithErrWhitelist(ErrVisorUnreachable)
run := func() error {
err := c.UpdateEntry(ctx)
Expand All @@ -313,7 +283,7 @@ func (c *HTTPClient) Update(ctx context.Context) error {
}

if err != nil {
c.log.WithError(err).Warn("Failed to update service entry in discovery. Retrying...")
c.log.WithError(err).Warn("Failed to register service entry in discovery. Retrying...")
return err
}
return nil
Expand Down
7 changes: 3 additions & 4 deletions pkg/visor/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ import (

"github.com/skycoin/skywire/internal/utclient"
"github.com/skycoin/skywire/internal/vpn"
"github.com/skycoin/skywire/pkg/app/appdisc"
"github.com/skycoin/skywire/pkg/app/appevent"
"github.com/skycoin/skywire/pkg/app/appserver"
"github.com/skycoin/skywire/pkg/app/launcher"
"github.com/skycoin/skywire/pkg/app/updatedisc"
"github.com/skycoin/skywire/pkg/dmsgc"
"github.com/skycoin/skywire/pkg/routefinder/rfclient"
"github.com/skycoin/skywire/pkg/router"
Expand Down Expand Up @@ -180,7 +180,7 @@ func initAddressResolver(ctx context.Context, v *Visor, log *logging.Logger) err

func initDiscovery(ctx context.Context, v *Visor, log *logging.Logger) error {
// Prepare app discovery factory.
factory := updatedisc.Factory{
factory := appdisc.Factory{
Log: v.MasterLogger().PackageLogger("app_discovery"),
}

Expand All @@ -189,7 +189,6 @@ func initDiscovery(ctx context.Context, v *Visor, log *logging.Logger) error {
if conf.Discovery != nil {
factory.PK = v.conf.PK
factory.SK = v.conf.SK
factory.UpdateInterval = time.Duration(conf.Discovery.UpdateInterval)
factory.ServiceDisc = conf.Discovery.ServiceDisc
}
v.initLock.Lock()
Expand Down Expand Up @@ -647,7 +646,7 @@ func initPublicVisor(_ context.Context, v *Visor, log *logging.Logger) error {
visorUpdater.Start()

v.log.Infof("Sent request to register visor as public")
v.pushCloseStack("visor updater", func() error {
v.pushCloseStack("visor manager", func() error {
jdknives marked this conversation as resolved.
Show resolved Hide resolved
visorUpdater.Stop()
return nil
})
Expand Down
4 changes: 2 additions & 2 deletions pkg/visor/visor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ import (
"github.com/skycoin/skycoin/src/util/logging"

"github.com/skycoin/skywire/internal/utclient"
"github.com/skycoin/skywire/pkg/app/appdisc"
"github.com/skycoin/skywire/pkg/app/appevent"
"github.com/skycoin/skywire/pkg/app/appserver"
"github.com/skycoin/skywire/pkg/app/launcher"
"github.com/skycoin/skywire/pkg/app/updatedisc"
"github.com/skycoin/skywire/pkg/restart"
"github.com/skycoin/skywire/pkg/routefinder/rfclient"
"github.com/skycoin/skywire/pkg/router"
Expand Down Expand Up @@ -66,7 +66,7 @@ type Visor struct {

procM appserver.ProcManager // proc manager
appL *launcher.Launcher // app launcher
serviceDisc updatedisc.Factory
serviceDisc appdisc.Factory
initLock *sync.Mutex
// when module is failed it pushes its error to this channel
// used by init and shutdown to show/check for any residual errors
Expand Down
Loading