diff --git a/pkg/servicedisc/autoconnect.go b/pkg/servicedisc/autoconnect.go index 847f4303f5..708067c352 100644 --- a/pkg/servicedisc/autoconnect.go +++ b/pkg/servicedisc/autoconnect.go @@ -16,7 +16,8 @@ const ( // PublicServiceDelay defines a delay before adding transports to public services. PublicServiceDelay = 10 * time.Second - fetchServicesDelay = 2 * time.Second + fetchServicesDelay = 5 * time.Second + maxFailedAddressRetryAttempt = 2 ) // ConnectFn provides a way to connect to remote service @@ -46,25 +47,43 @@ func MakeConnector(conf Config, maxConns int, tm *transport.Manager, log *loggin } // Run implements Autoconnector interface -func (a *autoconnector) Run(ctx context.Context) error { +func (a *autoconnector) Run(ctx context.Context) (err error) { + // failed addresses will be populated everytime any failed attempt at establishing transport occurs. + failedAddresses := map[cipher.PubKey]int{} + for { time.Sleep(PublicServiceDelay) - a.log.Infof("Fetching public visors") - addresses, err := a.fetchPubAddresses(ctx) + + // successfully established transports + tps := a.tm.GetTransportsByLabel(transport.LabelAutomatic) + + // don't fetch public addresses if there are more or equal to the number of maximum transport defined. + if len(tps) >= a.maxConns { + a.log.Debugln("autoconnect: maximum number of established transports reached: ", a.maxConns) + return err + } + + a.log.Infoln("Fetching public visors") + addrs, err := a.fetchPubAddresses(ctx) if err != nil { a.log.Errorf("Cannot fetch public services: %s", err) } - tps := a.tm.GetTransportsByLabel(transport.LabelAutomatic) - absent := a.filterDuplicates(addresses, tps) + // filter out any established transports + absent := a.filterDuplicates(addrs, tps) + for _, pk := range absent { - a.log.WithField("pk", pk).Infoln("Adding transport to public visor") - logger := a.log.WithField("pk", pk).WithField("type", string(network.STCPR)) - if _, err := a.tm.SaveTransport(ctx, pk, network.STCPR, transport.LabelAutomatic); err != nil { - logger.WithError(err).Warnln("Failed to add transport to public visor") - continue + val, ok := failedAddresses[pk] + if !ok || val < maxFailedAddressRetryAttempt { + a.log.WithField("pk", pk).WithField("attempt", val).Debugln("Trying to add transport to public visor") + logger := a.log.WithField("pk", pk).WithField("type", string(network.STCPR)) + if _, err := a.tm.SaveTransport(ctx, pk, network.STCPR, transport.LabelAutomatic); err != nil { + logger.WithError(err).Warnln("Failed to add transport to public visor") + failedAddresses[pk]++ + continue + } + logger.Infoln("Added transport to public visor") } - logger.Infoln("Added transport to public visor") } } } @@ -83,9 +102,9 @@ func (a *autoconnector) fetchPubAddresses(ctx context.Context) ([]cipher.PubKey, if err := retrier.Do(fetch); err != nil { return nil, err } - var pks []cipher.PubKey - for _, service := range services { - pks = append(pks, service.Addr.PubKey()) + pks := make([]cipher.PubKey, len(services)) + for i, service := range services { + pks[i] = service.Addr.PubKey() } return pks, nil } diff --git a/pkg/visor/init.go b/pkg/visor/init.go index 80b41f4e92..1a9ab622a3 100644 --- a/pkg/visor/init.go +++ b/pkg/visor/init.go @@ -686,7 +686,7 @@ func initPublicAutoconnect(ctx context.Context, v *Visor, log *logging.Logger) e Port: uint16(0), DiscAddr: serviceDisc, } - connector := servicedisc.MakeConnector(conf, 5, v.tpM, log) + connector := servicedisc.MakeConnector(conf, 3, v.tpM, log) go connector.Run(ctx) //nolint:errcheck return nil