Skip to content

Commit

Permalink
Merge pull request #1238 from ersonp/fix/dmsg-tracker
Browse files Browse the repository at this point in the history
Fix dmsg tracker
  • Loading branch information
mrpalide authored Jun 7, 2022
2 parents 33d1ee2 + 5846e6d commit 6418d81
Show file tree
Hide file tree
Showing 18 changed files with 254 additions and 339 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ require (
github.com/pkg/profile v1.5.0
github.com/shirou/gopsutil/v3 v3.21.4
github.com/sirupsen/logrus v1.8.1
github.com/skycoin/dmsg v0.0.0-20220401080257-ba4de44b7666
github.com/skycoin/dmsg v0.0.0-20220607114207-d4a85dc351ce
github.com/skycoin/skycoin v0.27.1
github.com/skycoin/yamux v0.0.0-20200803175205-571ceb89da9f
github.com/songgao/water v0.0.0-20200317203138-2b4b6d7c09d8
Expand Down Expand Up @@ -48,7 +48,7 @@ require (
github.com/go-chi/chi/v5 v5.0.8-0.20220103230436-7dbe9a0bd10f
github.com/ivanpirog/coloredcobra v1.0.0
github.com/james-barrow/golang-ipc v0.0.0-20210227130457-95e7cc81f5e2
github.com/skycoin/skywire-utilities v0.0.0-20220331141811-c29ff9ab891e
github.com/skycoin/skywire-utilities v0.0.0-20220511053113-3d492e0048c4
github.com/skycoin/systray v1.10.1-0.20220509091536-c90eecafd3fd
)

Expand Down
9 changes: 5 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -476,14 +476,15 @@ github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMB
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/skycoin/dmsg v0.0.0-20220401080257-ba4de44b7666 h1:v7xCo4I6WsaztanDG11QdtWB0jx/r7n/yO39nKNKnk4=
github.com/skycoin/dmsg v0.0.0-20220401080257-ba4de44b7666/go.mod h1:VPXJkm5DtSpK3cmbg9M9lXZaUjPeyOpJgxURp8HIMkU=
github.com/skycoin/dmsg v0.0.0-20220607114207-d4a85dc351ce h1:a0VlsBW5Ac7D3LflMhv52TWRcC1nbkn3ZRCNEE3hqUE=
github.com/skycoin/dmsg v0.0.0-20220607114207-d4a85dc351ce/go.mod h1:naR6dKKUhc9iDxSUEEw95KO8SEQ1ppJoaHC3osQ5m9k=
github.com/skycoin/noise v0.0.0-20180327030543-2492fe189ae6 h1:1Nc5EBY6pjfw1kwW0duwyG+7WliWz5u9kgk1h5MnLuA=
github.com/skycoin/noise v0.0.0-20180327030543-2492fe189ae6/go.mod h1:UXghlricA7J3aRD/k7p/zBObQfmBawwCxIVPVjz2Q3o=
github.com/skycoin/skycoin v0.27.1 h1:HatxsRwVSPaV4qxH6290xPBmkH/HgiuAoY2qC+e8C9I=
github.com/skycoin/skycoin v0.27.1/go.mod h1:78nHjQzd8KG0jJJVL/j0xMmrihXi70ti63fh8vXScJw=
github.com/skycoin/skywire-utilities v0.0.0-20220331141811-c29ff9ab891e h1:2LOmMQqYEk2XX4H8A/MXQmYIIl7opIVYmY3k76D1+Oc=
github.com/skycoin/skywire-utilities v0.0.0-20220331141811-c29ff9ab891e/go.mod h1:9fOsut+rqCBd33NPF3cuolXrw8uQESYshSvdYMaZNEo=
github.com/skycoin/skywire-utilities v0.0.0-20220412111240-e8921109e514/go.mod h1:9fOsut+rqCBd33NPF3cuolXrw8uQESYshSvdYMaZNEo=
github.com/skycoin/skywire-utilities v0.0.0-20220511053113-3d492e0048c4 h1:1wnQREqhD+eOek0isH3m1RTKGT+o2aDZtIoiMk4kROw=
github.com/skycoin/skywire-utilities v0.0.0-20220511053113-3d492e0048c4/go.mod h1:9fOsut+rqCBd33NPF3cuolXrw8uQESYshSvdYMaZNEo=
github.com/skycoin/systray v1.10.1-0.20220509091536-c90eecafd3fd h1:z2aytJ9vsBpsBrC6nfHuOTS+AUoEbaKs257+AiigUjk=
github.com/skycoin/systray v1.10.1-0.20220509091536-c90eecafd3fd/go.mod h1:LvRiIPZbFo8Qqp7Q9C9LXK02CqoVmc9EUGihU6kTRWc=
github.com/skycoin/yamux v0.0.0-20200803175205-571ceb89da9f h1:A5dEM1OE9YhN3LciZU9qPjo7fJ46JeHNi3JCroDkK0Y=
Expand Down
7 changes: 2 additions & 5 deletions pkg/visor/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,13 +221,10 @@ func (v *Visor) Summary() (*Summary, error) {
}

dmsgStatValue := &dmsgtracker.DmsgClientSummary{}
v.initLock.Lock()
if v.trackersReady {
ctx := context.TODO()
dmsgTracker, _ := v.trackers.MustGet(ctx, v.conf.PK) //nolint
if v.isDTMReady() {
dmsgTracker, _ := v.dtm.Get(v.conf.PK) //nolint
dmsgStatValue = &dmsgTracker
}
v.initLock.Unlock()

summary := &Summary{
Overview: overview,
Expand Down
132 changes: 77 additions & 55 deletions pkg/visor/dmsgtracker/dmsg_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@ package dmsgtracker
import (
"context"
"io"
"runtime"
"sort"
"sync"
"time"

"github.com/sirupsen/logrus"
"github.com/skycoin/dmsg/pkg/dmsg"
"github.com/skycoin/dmsg/pkg/dmsgctrl"
"github.com/skycoin/skycoin/src/util/logging"
Expand Down Expand Up @@ -83,9 +81,9 @@ type Manager struct {
updateInterval time.Duration
updateTimeout time.Duration

log logrus.FieldLogger
log *logging.Logger
dc *dmsg.Client
dm map[cipher.PubKey]*DmsgTracker
dts map[cipher.PubKey]*DmsgTracker
mx sync.Mutex

done chan struct{}
Expand All @@ -94,7 +92,7 @@ type Manager struct {

// NewDmsgTrackerManager creates a new dmsg tracker manager.
func NewDmsgTrackerManager(mLog *logging.MasterLogger, dc *dmsg.Client, updateInterval, updateTimeout time.Duration) *Manager {
log := mLog.PackageLogger("dmsg_trackers")
log := mLog.PackageLogger("dmsg_tracker_manager")
if updateInterval == 0 {
updateInterval = DefaultDTMUpdateInterval
}
Expand All @@ -107,7 +105,7 @@ func NewDmsgTrackerManager(mLog *logging.MasterLogger, dc *dmsg.Client, updateIn
updateTimeout: updateTimeout,
log: log,
dc: dc,
dm: make(map[cipher.PubKey]*DmsgTracker),
dts: make(map[cipher.PubKey]*DmsgTracker),
done: make(chan struct{}),
}

Expand All @@ -134,33 +132,34 @@ func (dtm *Manager) serve() {
case <-dtm.done:
return
case <-t.C:
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(dtm.updateTimeout))
dtm.mx.Lock()
dtm.updateAllTrackers(ctx, dtm.dm)
dtm.mx.Unlock()
cancel()
dtm.updateAllTrackers(ctx)
}
}
}

func (dtm *Manager) updateAllTrackers(ctx context.Context, dts map[cipher.PubKey]*DmsgTracker) {
log := dtm.log.WithField("func", funcName())
func (dtm *Manager) updateAllTrackers(ctx context.Context) {
dtm.mx.Lock()
defer dtm.mx.Unlock()

cancelCtx, cancel := context.WithDeadline(ctx, time.Now().Add(dtm.updateTimeout))
defer cancel()

log := dtm.log.WithField("func", "dtm.updateAllTrackers")

type errReport struct {
pk cipher.PubKey
err error
}

dtsLen := len(dts)
dtsLen := len(dtm.dts)
errCh := make(chan errReport, dtsLen)
defer close(errCh)

for _, te := range dts {
te := te

for _, dt := range dtm.dts {
dt := dt
go func() {
err := te.Update(ctx)
errCh <- errReport{pk: te.sum.PK, err: err}
err := dt.Update(cancelCtx)
errCh <- errReport{pk: dt.sum.PK, err: err}
}()
}

Expand All @@ -169,32 +168,28 @@ func (dtm *Manager) updateAllTrackers(ctx context.Context, dts map[cipher.PubKey
log.WithError(r.err).
WithField("client_pk", r.pk).
Warn("Removing dmsg client tracker.")
delete(dts, r.pk)
delete(dtm.dts, r.pk)
}
}
}

// MustGet obtains a DmsgClientSummary of the client of given pk.
// If one is not found internally, a new tracker stream is to be established, returning error on failure.
func (dtm *Manager) MustGet(ctx context.Context, pk cipher.PubKey) (DmsgClientSummary, error) {
// ShouldGet obtains a DmsgClientSummary of the client of given pk.
// If one are not found internally, a new goroutine of tracker stream is to be established.
func (dtm *Manager) ShouldGet(ctx context.Context, pk cipher.PubKey) (DmsgClientSummary, error) {
dtm.mx.Lock()
defer dtm.mx.Unlock()

if isDone(dtm.done) {
return DmsgClientSummary{}, io.ErrClosedPipe
}

if e, ok := dtm.dm[pk]; ok && !isDone(e.ctrl.Done()) {
if e, ok := dtm.dts[pk]; ok && !isDone(e.ctrl.Done()) {
return e.sum, nil
}

dt, err := dtm.mustEstablishTracker(pk)
if err != nil {
return DmsgClientSummary{}, err
}
go dtm.establishTracker(ctx, pk)

dtm.dm[pk] = dt
return dt.sum, nil
return DmsgClientSummary{}, nil
}

// Get obtains a DmsgClientSummary of the client with given public key.
Expand All @@ -210,30 +205,63 @@ func (dtm *Manager) Get(pk cipher.PubKey) (DmsgClientSummary, bool) {
}

// mustEstablishTracker creates / re-creates tracker when dmsgTrackerMap entry got deleted, and reconnected.
func (dtm *Manager) mustEstablishTracker(pk cipher.PubKey) (*DmsgTracker, error) {
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(dtm.updateTimeout))
// It is ment to be used as a goroutine and saves the new DmsgTracker to dtm.dts.
func (dtm *Manager) establishTracker(ctx context.Context, pk cipher.PubKey) {
log := dtm.log.WithField("func", "dtm.establishTracker")

type errReport struct {
pk cipher.PubKey
err error
}

errCh := make(chan errReport)
defer close(errCh)
doneCh := make(chan struct{})

dCtx, cancel := context.WithDeadline(ctx, time.Now().Add(dtm.updateTimeout))
defer cancel()
return newDmsgTracker(ctx, dtm.dc, pk)
go func() {
dt, err := newDmsgTracker(dCtx, dtm.dc, pk)
if err != nil {
select {
case <-ctx.Done():
return
default:
errCh <- errReport{pk: pk, err: err}
}
}
dtm.mx.Lock()
if dt != nil {
dtm.dts[pk] = dt
}
dtm.mx.Unlock()
close(doneCh)
}()

select {
case r := <-errCh:
if r.err != nil {
log.WithError(r.err).WithField("client_pk", r.pk).Warn("Failed to re-create dmsgtracker client.")
}
case <-ctx.Done():
log.WithError(ctx.Err()).WithField("client_pk", pk).Warn("Failed to re-create dmsgtracker client.")
case <-doneCh:
log.WithField("client_pk", pk).Debug("Dmsgtracker client Established.")
}
}

// GetBulk obtains bulk dmsg client summaries.
func (dtm *Manager) GetBulk(pks []cipher.PubKey) []DmsgClientSummary {
dtm.mx.Lock()
defer dtm.mx.Unlock()

var err error
out := make([]DmsgClientSummary, 0, len(pks))
// If one are not found internally, a new goroutine of tracker stream is to be established.
func (dtm *Manager) GetBulk(ctx context.Context, pks []cipher.PubKey) []DmsgClientSummary {
out := make([]DmsgClientSummary, 0)

for _, pk := range pks {
dt, ok := dtm.dm[pk]
ds, ok := dtm.Get(pk)
if !ok {
dt, err = dtm.mustEstablishTracker(pk)
if err != nil {
dtm.log.WithError(err).Infoln("failed to re-create dmsgtracker client")
continue
}
// we establish tracker if there is none
go dtm.establishTracker(ctx, pk)
}
out = append(out, dt.sum)
out = append(out, ds)
}

sort.Slice(out, func(i, j int) bool {
Expand All @@ -246,17 +274,16 @@ func (dtm *Manager) GetBulk(pks []cipher.PubKey) []DmsgClientSummary {
}

func (dtm *Manager) get(pk cipher.PubKey) (DmsgClientSummary, bool) {
dt, ok := dtm.dm[pk]
dt, ok := dtm.dts[pk]
if !ok {
return DmsgClientSummary{}, false
}

return dt.sum, true
}

// Close implements io.Closer
func (dtm *Manager) Close() error {
log := dtm.log.WithField("func", funcName())
log := dtm.log.WithField("func", "dtm.Close")

dtm.mx.Lock()
defer dtm.mx.Unlock()
Expand All @@ -267,7 +294,7 @@ func (dtm *Manager) Close() error {
closed = true
close(dtm.done)

for pk, dt := range dtm.dm {
for pk, dt := range dtm.dts {
if err := dt.ctrl.Close(); err != nil {
log.WithError(err).
WithField("client_pk", pk).
Expand All @@ -291,8 +318,3 @@ func isDone(done <-chan struct{}) bool {
return false
}
}

func funcName() string {
pc, _, _, _ := runtime.Caller(1)
return runtime.FuncForPC(pc).Name()
}
Loading

0 comments on commit 6418d81

Please sign in to comment.