Skip to content

Commit

Permalink
Merge branch 'master' into feat/relay-msg-timestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
chaitanyaprem authored Dec 1, 2023
2 parents 2df3282 + 6bd85a1 commit 40c0bb2
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 31 deletions.
2 changes: 1 addition & 1 deletion waku/metrics/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func NewMetricsServer(address string, port int, log *zap.Logger) *Server {

// Start executes the HTTP server in the background.
func (p *Server) Start() {
p.log.Info("server stopped ", zap.Error(p.server.ListenAndServe()))
p.log.Info("server started ", zap.Error(p.server.ListenAndServe()))
}

// Stop shuts down the prometheus server
Expand Down
41 changes: 29 additions & 12 deletions waku/v2/protocol/filter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ import (
const FilterPushID_v20beta1 = libp2pProtocol.ID("/vac/waku/filter-push/2.0.0-beta1")

var (
ErrNoPeersAvailable = errors.New("no suitable remote peers")
ErrNoPeersAvailable = errors.New("no suitable remote peers")
ErrSubscriptionNotFound = errors.New("subscription not found")
)

type WakuFilterLightNode struct {
Expand Down Expand Up @@ -110,19 +111,21 @@ func (wf *WakuFilterLightNode) start() error {
func (wf *WakuFilterLightNode) Stop() {
wf.CommonService.Stop(func() {
wf.h.RemoveStreamHandler(FilterPushID_v20beta1)
res, err := wf.unsubscribeAll(wf.Context())
if err != nil {
wf.log.Warn("unsubscribing from full nodes", zap.Error(err))
}

for _, r := range res.Errors() {
if r.Err != nil {
wf.log.Warn("unsubscribing from full nodes", zap.Error(r.Err), logging.HostID("peerID", r.PeerID))
if wf.subscriptions.Count() > 0 {
res, err := wf.unsubscribeAll(wf.Context())
if err != nil {
wf.log.Warn("unsubscribing from full nodes", zap.Error(err))
}

for _, r := range res.Errors() {
if r.Err != nil {
wf.log.Warn("unsubscribing from full nodes", zap.Error(r.Err), logging.HostID("peerID", r.PeerID))
}

}
//
wf.subscriptions.Clear()
}
//
wf.subscriptions.Clear()
})
}

Expand Down Expand Up @@ -485,6 +488,13 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter pr

peers := make(map[peer.ID]struct{})
subs := wf.subscriptions.GetSubscription(params.selectedPeer, cFilter)
if len(subs) == 0 {
result.Add(WakuFilterPushError{
Err: ErrSubscriptionNotFound,
PeerID: params.selectedPeer,
})
continue
}
for _, sub := range subs {
sub.Remove(cTopics...)
peers[sub.PeerID] = struct{}{}
Expand Down Expand Up @@ -583,14 +593,21 @@ func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...Filte
if err != nil {
return nil, err
}
result := &WakuFilterPushResult{}

peers := make(map[peer.ID]struct{})
subs := wf.subscriptions.GetSubscription(params.selectedPeer, protocol.ContentFilter{})
if len(subs) == 0 && params.selectedPeer != "" {
result.Add(WakuFilterPushError{
Err: err,
PeerID: params.selectedPeer,
})
return result, ErrSubscriptionNotFound
}
for _, sub := range subs {
sub.Close()
peers[sub.PeerID] = struct{}{}
}
result := &WakuFilterPushResult{}
if params.wg != nil {
params.wg.Add(len(peers))
}
Expand Down
2 changes: 1 addition & 1 deletion waku/v2/protocol/filter/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func (s *FilterTestSuite) waitForMessages(fn func(), subs []*subscription.Subscr
contentTopic: env.Message().GetContentTopic(),
payload: string(env.Message().GetPayload()),
}
s.log.Info("received message ", zap.String("pubSubTopic", received.pubSubTopic), zap.String("contentTopic", received.contentTopic), zap.String("payload", received.payload))
s.log.Debug("received message ", zap.String("pubSubTopic", received.pubSubTopic), zap.String("contentTopic", received.contentTopic), zap.String("payload", received.payload))
if matchOneOfManyMsg(received, expected) {
found++
}
Expand Down
4 changes: 4 additions & 0 deletions waku/v2/protocol/relay/waku_relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,10 @@ func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage, opts .
return nil, err
}

if len(out) > pubsub.DefaultMaxMessageSize {
return nil, errors.New("message size exceeds gossipsub max message size")
}

err = pubSubTopic.Publish(ctx, out)
if err != nil {
return nil, err
Expand Down
18 changes: 2 additions & 16 deletions waku/v2/protocol/store/waku_store_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,23 +452,9 @@ func (store *WakuStore) Next(ctx context.Context, r *Result) (*Result, error) {

historyRequest := &pb.HistoryRPC{
RequestId: hex.EncodeToString(protocol.GenerateRequestID()),
Query: &pb.HistoryQuery{
PubsubTopic: r.Query().PubsubTopic,
ContentFilters: r.Query().ContentFilters,
StartTime: r.Query().StartTime,
EndTime: r.Query().EndTime,
PagingInfo: &pb.PagingInfo{
PageSize: r.Query().PagingInfo.PageSize,
Direction: r.Query().PagingInfo.Direction,
Cursor: &pb.Index{
Digest: r.Cursor().Digest,
ReceiverTime: r.Cursor().ReceiverTime,
SenderTime: r.Cursor().SenderTime,
PubsubTopic: r.Cursor().PubsubTopic,
},
},
},
Query: r.Query(),
}
historyRequest.Query.PagingInfo.Cursor = r.Cursor()

response, err := store.queryFrom(ctx, historyRequest, r.PeerID())
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion waku/v2/protocol/store/waku_store_protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestWakuStoreProtocolQuery(t *testing.T) {

require.NoError(t, err)
require.Len(t, response.Messages, 1)
require.Equal(t, msg, response.Messages[0])
require.True(t, proto.Equal(msg, response.Messages[0]))
}

func TestWakuStoreProtocolLocalQuery(t *testing.T) {
Expand Down
6 changes: 6 additions & 0 deletions waku/v2/protocol/subscription/subscriptions_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ func NewSubscriptionMap(logger *zap.Logger) *SubscriptionsMap {
}
}

func (m *SubscriptionsMap) Count() int {
m.RLock()
defer m.RUnlock()
return len(m.items)
}

func (m *SubscriptionsMap) IsListening(pubsubTopic, contentTopic string) bool {
m.RLock()
defer m.RUnlock()
Expand Down

0 comments on commit 40c0bb2

Please sign in to comment.