Skip to content

Commit

Permalink
Merge pull request #945 from alexadhy/feature/improve-redialing-publi…
Browse files Browse the repository at this point in the history
…c-autoconnect

Feature/improve redialing public autoconnect
  • Loading branch information
jdknives authored Oct 19, 2021
2 parents ec83590 + c5c27f3 commit 85a3667
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 16 deletions.
49 changes: 34 additions & 15 deletions pkg/servicedisc/autoconnect.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ const (
// PublicServiceDelay defines a delay before adding transports to public services.
PublicServiceDelay = 10 * time.Second

fetchServicesDelay = 2 * time.Second
fetchServicesDelay = 5 * time.Second
maxFailedAddressRetryAttempt = 2
)

// ConnectFn provides a way to connect to remote service
Expand Down Expand Up @@ -46,25 +47,43 @@ func MakeConnector(conf Config, maxConns int, tm *transport.Manager, log *loggin
}

// Run implements Autoconnector interface
func (a *autoconnector) Run(ctx context.Context) error {
func (a *autoconnector) Run(ctx context.Context) (err error) {
// failed addresses will be populated everytime any failed attempt at establishing transport occurs.
failedAddresses := map[cipher.PubKey]int{}

for {
time.Sleep(PublicServiceDelay)
a.log.Infof("Fetching public visors")
addresses, err := a.fetchPubAddresses(ctx)

// successfully established transports
tps := a.tm.GetTransportsByLabel(transport.LabelAutomatic)

// don't fetch public addresses if there are more or equal to the number of maximum transport defined.
if len(tps) >= a.maxConns {
a.log.Debugln("autoconnect: maximum number of established transports reached: ", a.maxConns)
return err
}

a.log.Infoln("Fetching public visors")
addrs, err := a.fetchPubAddresses(ctx)
if err != nil {
a.log.Errorf("Cannot fetch public services: %s", err)
}

tps := a.tm.GetTransportsByLabel(transport.LabelAutomatic)
absent := a.filterDuplicates(addresses, tps)
// filter out any established transports
absent := a.filterDuplicates(addrs, tps)

for _, pk := range absent {
a.log.WithField("pk", pk).Infoln("Adding transport to public visor")
logger := a.log.WithField("pk", pk).WithField("type", string(network.STCPR))
if _, err := a.tm.SaveTransport(ctx, pk, network.STCPR, transport.LabelAutomatic); err != nil {
logger.WithError(err).Warnln("Failed to add transport to public visor")
continue
val, ok := failedAddresses[pk]
if !ok || val < maxFailedAddressRetryAttempt {
a.log.WithField("pk", pk).WithField("attempt", val).Debugln("Trying to add transport to public visor")
logger := a.log.WithField("pk", pk).WithField("type", string(network.STCPR))
if _, err := a.tm.SaveTransport(ctx, pk, network.STCPR, transport.LabelAutomatic); err != nil {
logger.WithError(err).Warnln("Failed to add transport to public visor")
failedAddresses[pk]++
continue
}
logger.Infoln("Added transport to public visor")
}
logger.Infoln("Added transport to public visor")
}
}
}
Expand All @@ -83,9 +102,9 @@ func (a *autoconnector) fetchPubAddresses(ctx context.Context) ([]cipher.PubKey,
if err := retrier.Do(fetch); err != nil {
return nil, err
}
var pks []cipher.PubKey
for _, service := range services {
pks = append(pks, service.Addr.PubKey())
pks := make([]cipher.PubKey, len(services))
for i, service := range services {
pks[i] = service.Addr.PubKey()
}
return pks, nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/visor/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -686,7 +686,7 @@ func initPublicAutoconnect(ctx context.Context, v *Visor, log *logging.Logger) e
Port: uint16(0),
DiscAddr: serviceDisc,
}
connector := servicedisc.MakeConnector(conf, 5, v.tpM, log)
connector := servicedisc.MakeConnector(conf, 3, v.tpM, log)
go connector.Run(ctx) //nolint:errcheck

return nil
Expand Down

0 comments on commit 85a3667

Please sign in to comment.