Skip to content

Commit

Permalink
feat(nwaku)_: message publisher and sent verifier (#5966)
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-ramos authored Oct 23, 2024
1 parent 8198ac1 commit d6079c7
Show file tree
Hide file tree
Showing 6 changed files with 185 additions and 96 deletions.
34 changes: 20 additions & 14 deletions timesource/timesource.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package timesource

import (
"bytes"
"context"
"errors"
"sort"
"sync"
Expand Down Expand Up @@ -144,8 +145,8 @@ type NTPTimeSource struct {
timeQuery ntpQuery // for ease of testing
now func() time.Time

quit chan struct{}
started bool
cancel context.CancelFunc

mu sync.RWMutex
latestOffset time.Duration
Expand Down Expand Up @@ -175,7 +176,7 @@ func (s *NTPTimeSource) updateOffset() error {

// runPeriodically runs periodically the given function based on NTPTimeSource
// synchronization limits (fastNTPSyncPeriod / slowNTPSyncPeriod)
func (s *NTPTimeSource) runPeriodically(fn func() error, starWithSlowSyncPeriod bool) {
func (s *NTPTimeSource) runPeriodically(ctx context.Context, fn func() error, starWithSlowSyncPeriod bool) {
if s.started {
return
}
Expand All @@ -184,7 +185,7 @@ func (s *NTPTimeSource) runPeriodically(fn func() error, starWithSlowSyncPeriod
if starWithSlowSyncPeriod {
period = s.slowNTPSyncPeriod
}
s.quit = make(chan struct{})

go func() {
defer common.LogOnPanic()
for {
Expand All @@ -196,19 +197,21 @@ func (s *NTPTimeSource) runPeriodically(fn func() error, starWithSlowSyncPeriod
period = s.fastNTPSyncPeriod
}

case <-s.quit:
case <-ctx.Done():
return
}
}
}()
}

// Start initializes the local offset and starts a goroutine that periodically updates the local offset.
func (s *NTPTimeSource) Start() {
func (s *NTPTimeSource) Start(ctx context.Context) error {
if s.started {
return
return nil
}

ctx, cancel := context.WithCancel(ctx)

// Attempt to update the offset synchronously so that user can have reliable messages right away
err := s.updateOffset()
if err != nil {
Expand All @@ -217,23 +220,26 @@ func (s *NTPTimeSource) Start() {
log.Error("failed to update offset", err)
}

s.runPeriodically(s.updateOffset, err == nil)
s.runPeriodically(ctx, s.updateOffset, err == nil)

s.started = true
s.cancel = cancel

return nil
}

// Stop goroutine that updates time source.
func (s *NTPTimeSource) Stop() error {
if s.quit == nil {
return nil
func (s *NTPTimeSource) Stop() {
if s.cancel == nil {
return
}
close(s.quit)

s.cancel()
s.started = false
return nil
}

func (s *NTPTimeSource) GetCurrentTime() time.Time {
s.Start()
s.Start(context.Background())
return s.Now()
}

Expand All @@ -243,7 +249,7 @@ func (s *NTPTimeSource) GetCurrentTimeInMillis() uint64 {

func GetCurrentTime() time.Time {
ts := Default()
ts.Start()
ts.Start(context.Background())
return ts.Now()
}

Expand Down
6 changes: 2 additions & 4 deletions timesource/timesource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,14 +277,12 @@ func TestGetCurrentTimeInMillis(t *testing.T) {
// test repeat invoke GetCurrentTimeInMillis
n = ts.GetCurrentTimeInMillis()
require.Equal(t, expectedTime, n)
e := ts.Stop()
require.NoError(t, e)
ts.Stop()

// test invoke after stop
n = ts.GetCurrentTimeInMillis()
require.Equal(t, expectedTime, n)
e = ts.Stop()
require.NoError(t, e)
ts.Stop()
}

func TestGetCurrentTimeOffline(t *testing.T) {
Expand Down
16 changes: 0 additions & 16 deletions wakuv2/gowaku.go
Original file line number Diff line number Diff line change
Expand Up @@ -1951,22 +1951,6 @@ func (w *Waku) LegacyStoreNode() legacy_store.Store {
return w.node.LegacyStore()
}

func (w *Waku) WakuLightpushPublish(message *pb.WakuMessage, pubsubTopic string) (string, error) {
msgHash, err := w.node.Lightpush().Publish(w.ctx, message, lightpush.WithPubSubTopic(pubsubTopic))
if err != nil {
return "", err
}
return msgHash.String(), nil
}

func (w *Waku) WakuRelayPublish(message *pb.WakuMessage, pubsubTopic string) (string, error) {
msgHash, err := w.node.Relay().Publish(w.ctx, message, relay.WithPubSubTopic(pubsubTopic))
if err != nil {
return "", err
}
return msgHash.String(), nil
}

func (w *Waku) ListPeersInMesh(pubsubTopic string) (int, error) {
listPeers := w.node.Relay().PubSub().ListPeers(pubsubTopic)
return len(listPeers), nil
Expand Down
4 changes: 2 additions & 2 deletions wakuv2/message_publishing.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package wakuv2

/* TODO-nwaku
import (
"errors"

Expand Down Expand Up @@ -93,13 +92,15 @@ func (w *Waku) publishEnvelope(envelope *protocol.Envelope) {
err = w.messageSender.Send(publish.NewRequest(w.ctx, envelope))
}

/* TODO-nwaku
if w.statusTelemetryClient != nil {
if err == nil {
w.statusTelemetryClient.PushSentEnvelope(w.ctx, SentEnvelope{Envelope: envelope, PublishMethod: w.messageSender.PublishMethod()})
} else {
w.statusTelemetryClient.PushErrorSendingEnvelope(w.ctx, ErrorSendingEnvelope{Error: err, SentEnvelope: SentEnvelope{Envelope: envelope, PublishMethod: w.messageSender.PublishMethod()}})
}
}
*/

if err != nil {
logger.Error("could not send message", zap.Error(err))
Expand All @@ -117,4 +118,3 @@ func (w *Waku) publishEnvelope(envelope *protocol.Envelope) {
})
}
}
*/
Loading

0 comments on commit d6079c7

Please sign in to comment.