From 6c1a7731592fab03a32b734276b1e2ec91b7c282 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Fri, 17 Nov 2023 09:11:01 -0400 Subject: [PATCH 1/4] fix(waku2): use a cancelable context for initial bootnode discovery --- wakuv2/waku.go | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/wakuv2/waku.go b/wakuv2/waku.go index 7510a286977..d5688bf6513 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -205,6 +205,8 @@ func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *s logger.Info("starting wakuv2 with config", zap.Any("config", cfg)) + ctx, cancel := context.WithCancel(context.Background()) + waku := &Waku{ appDB: appDB, cfg: cfg, @@ -216,6 +218,8 @@ func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *s sendQueue: make(chan *protocol.Envelope, 1000), connStatusChan: make(chan node.ConnStatus, 100), connStatusSubscriptions: make(map[string]*types.ConnStatusSubscription), + ctx: ctx, + cancel: cancel, wg: sync.WaitGroup{}, dnsAddressCache: make(map[string][]dnsdisc.DiscoveredNode), dnsAddressCacheLock: &sync.RWMutex{}, @@ -284,7 +288,7 @@ func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *s } if cfg.EnableDiscV5 { - bootnodes, err := waku.getDiscV5BootstrapNodes(context.Background(), cfg.DiscV5BootstrapNodes) + bootnodes, err := waku.getDiscV5BootstrapNodes(waku.ctx, cfg.DiscV5BootstrapNodes) if err != nil { logger.Error("failed to get bootstrap nodes", zap.Error(err)) return nil, err @@ -1018,10 +1022,10 @@ func (w *Waku) broadcast() { logger := w.logger.With(zap.String("envelopeHash", hexutil.Encode(envelope.Hash())), zap.String("pubsubTopic", pubsubTopic), zap.String("contentTopic", envelope.Message().ContentTopic), zap.Int64("timestamp", envelope.Message().Timestamp)) if w.settings.LightClient { logger.Info("publishing message via lightpush") - _, err = w.node.Lightpush().Publish(context.Background(), envelope.Message(), lightpush.WithPubSubTopic(pubsubTopic)) + _, err = w.node.Lightpush().Publish(w.ctx, envelope.Message(), lightpush.WithPubSubTopic(pubsubTopic)) } else { logger.Info("publishing message via relay") - _, err = w.node.Relay().Publish(context.Background(), envelope.Message(), relay.WithPubSubTopic(pubsubTopic)) + _, err = w.node.Relay().Publish(w.ctx, envelope.Message(), relay.WithPubSubTopic(pubsubTopic)) } if err != nil { @@ -1145,11 +1149,7 @@ func (w *Waku) Start() error { w.connectionChanged = make(chan struct{}) - ctx, cancel := context.WithCancel(context.Background()) - w.ctx = ctx - w.cancel = cancel - - if err = w.node.Start(ctx); err != nil { + if err = w.node.Start(w.ctx); err != nil { return fmt.Errorf("failed to start go-waku node: %v", err) } @@ -1162,12 +1162,12 @@ func (w *Waku) Start() error { w.identifyService = idService - if err = w.addWakuV2Peers(ctx, w.cfg); err != nil { + if err = w.addWakuV2Peers(w.ctx, w.cfg); err != nil { return fmt.Errorf("failed to add wakuv2 peers: %v", err) } if w.cfg.EnableDiscV5 { - err := w.node.DiscV5().Start(ctx) + err := w.node.DiscV5().Start(w.ctx) if err != nil { return err } @@ -1209,7 +1209,7 @@ func (w *Waku) Start() error { w.logger.Info("Restarting DiscV5: online and is not connected") isConnected = true if w.node.DiscV5().ErrOnNotRunning() != nil { - err := w.node.DiscV5().Start(ctx) + err := w.node.DiscV5().Start(w.ctx) if err != nil { w.logger.Error("Could not start DiscV5", zap.Error(err)) } From 7a792c2092becd29e836e537645219db119f0244 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Fri, 17 Nov 2023 09:23:57 -0400 Subject: [PATCH 2/4] fix: start idService and add wg.Done to fnApply --- wakuv2/waku.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/wakuv2/waku.go b/wakuv2/waku.go index d5688bf6513..6b538a79515 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -426,6 +426,7 @@ func (w *Waku) dnsDiscover(ctx context.Context, enrtreeAddress string, apply fnA func (w *Waku) addWakuV2Peers(ctx context.Context, cfg *Config) error { fnApply := func(d dnsdisc.DiscoveredNode, wg *sync.WaitGroup) { + defer wg.Done() if len(d.PeerInfo.Addrs) != 0 { go w.identifyAndConnect(ctx, w.settings.LightClient, d.PeerInfo) } @@ -1159,6 +1160,7 @@ func (w *Waku) Start() error { if err != nil { return err } + idService.Start() w.identifyService = idService From 76f55bceb2431b90467c11b41c76bf7fa327b456 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Fri, 17 Nov 2023 09:35:42 -0400 Subject: [PATCH 3/4] fix: memory aliasing in loop --- protocol/transport/filters_manager.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/protocol/transport/filters_manager.go b/protocol/transport/filters_manager.go index d371a4a95ae..3255b62748a 100644 --- a/protocol/transport/filters_manager.go +++ b/protocol/transport/filters_manager.go @@ -168,7 +168,8 @@ func (f *FiltersManager) InitCommunityFilters(communityFiltersToInitialize []Com // TODO: requests to join / cancels are currently being sent into the default waku topic. // They must be sent into an specific non protected shard for _, pubsubTopic := range topics { - identityStr := PublicKeyToStr(&cf.PrivKey.PublicKey) + pk := &cf.PrivKey.PublicKey + identityStr := PublicKeyToStr(pk) rawFilter, err := f.addAsymmetric(identityStr, pubsubTopic, cf.PrivKey, true) if err != nil { f.logger.Debug("could not register community filter", zap.Error(err)) From 4f1c4e48bd95abb269dd1e1070e397417b05196f Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Fri, 17 Nov 2023 10:31:43 -0400 Subject: [PATCH 4/4] fix: reset context on stop --- wakuv2/waku.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/wakuv2/waku.go b/wakuv2/waku.go index 6b538a79515..0f754183989 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -1142,6 +1142,9 @@ func (w *Waku) Query(ctx context.Context, peerID peer.ID, pubsubTopic string, to // Start implements node.Service, starting the background data propagation thread // of the Waku protocol. func (w *Waku) Start() error { + if w.ctx == nil { + w.ctx, w.cancel = context.WithCancel(context.Background()) + } var err error if w.node, err = node.New(w.settings.Options...); err != nil { @@ -1303,6 +1306,10 @@ func (w *Waku) Stop() error { close(w.connectionChanged) w.wg.Wait() + + w.ctx = nil + w.cancel = nil + return nil }