diff --git a/eth-node/bridge/geth/waku.go b/eth-node/bridge/geth/waku.go index 6a8cefb34d3..5cbca7a0040 100644 --- a/eth-node/bridge/geth/waku.go +++ b/eth-node/bridge/geth/waku.go @@ -291,7 +291,7 @@ func (w *GethWakuWrapper) MarkP2PMessageAsProcessed(hash common.Hash) { w.waku.MarkP2PMessageAsProcessed(hash) } -func (w *GethWakuWrapper) RequestStoreMessages(ctx context.Context, peerID []byte, r types.MessagesRequest, processEnvelopes bool) (*types.StoreRequestCursor, int, error) { +func (w *GethWakuWrapper) RequestStoreMessages(ctx context.Context, peerID []byte, r types.MessagesRequest, processEnvelopes bool) (types.StoreRequestCursor, int, error) { return nil, 0, errors.New("not implemented") } diff --git a/eth-node/bridge/geth/wakuv2.go b/eth-node/bridge/geth/wakuv2.go index 4bdc8390a3d..06239ae1d18 100644 --- a/eth-node/bridge/geth/wakuv2.go +++ b/eth-node/bridge/geth/wakuv2.go @@ -9,8 +9,8 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "google.golang.org/protobuf/proto" - "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store" - storepb "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store/pb" + "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/protocol/store" "github.com/ethereum/go-ethereum/common" "github.com/status-im/status-go/connection" @@ -177,35 +177,32 @@ func (w *gethWakuV2Wrapper) SendMessagesRequest(peerID []byte, r types.MessagesR return errors.New("DEPRECATED") } -func (w *gethWakuV2Wrapper) RequestStoreMessages(ctx context.Context, peerID []byte, r types.MessagesRequest, processEnvelopes bool) (*types.StoreRequestCursor, int, error) { - var options []legacy_store.HistoryRequestOption +func (w *gethWakuV2Wrapper) RequestStoreMessages(ctx context.Context, peerID []byte, r types.MessagesRequest, processEnvelopes bool) (types.StoreRequestCursor, int, error) { + var options []store.RequestOption peer, err := peer.Decode(string(peerID)) if err != nil { return nil, 0, err } - options = []legacy_store.HistoryRequestOption{ - legacy_store.WithPaging(false, uint64(r.Limit)), + options = []store.RequestOption{ + store.WithPaging(false, uint64(r.Limit)), } - var cursor *storepb.Index + var cursor []byte if r.StoreCursor != nil { - cursor = &storepb.Index{ - Digest: r.StoreCursor.Digest, - ReceiverTime: r.StoreCursor.ReceiverTime, - SenderTime: r.StoreCursor.SenderTime, - PubsubTopic: r.StoreCursor.PubsubTopic, - } + cursor = r.StoreCursor } - query := legacy_store.Query{ - StartTime: proto.Int64(int64(r.From) * int64(time.Second)), - EndTime: proto.Int64(int64(r.To) * int64(time.Second)), - PubsubTopic: w.waku.GetPubsubTopic(r.PubsubTopic), - } + contentTopics := []string{} for _, topic := range r.ContentTopics { - query.ContentTopics = append(query.ContentTopics, wakucommon.BytesToTopic(topic).ContentTopic()) + contentTopics = append(contentTopics, wakucommon.BytesToTopic(topic).ContentTopic()) + } + + query := store.FilterCriteria{ + TimeStart: proto.Int64(int64(r.From) * int64(time.Second)), + TimeEnd: proto.Int64(int64(r.To) * int64(time.Second)), + ContentFilter: protocol.NewContentFilter(w.waku.GetPubsubTopic(r.PubsubTopic), contentTopics...), } pbCursor, envelopesCount, err := w.waku.Query(ctx, peer, query, cursor, options, processEnvelopes) @@ -214,12 +211,7 @@ func (w *gethWakuV2Wrapper) RequestStoreMessages(ctx context.Context, peerID []b } if pbCursor != nil { - return &types.StoreRequestCursor{ - Digest: pbCursor.Digest, - ReceiverTime: pbCursor.ReceiverTime, - SenderTime: pbCursor.SenderTime, - PubsubTopic: pbCursor.PubsubTopic, - }, envelopesCount, nil + return pbCursor, envelopesCount, nil } return nil, envelopesCount, nil diff --git a/eth-node/types/mailserver.go b/eth-node/types/mailserver.go index ee9ee1dca80..2ae7877ceac 100644 --- a/eth-node/types/mailserver.go +++ b/eth-node/types/mailserver.go @@ -25,7 +25,7 @@ type MessagesRequest struct { // Cursor is used as starting point for paginated requests. Cursor []byte `json:"cursor"` // StoreCursor is used as starting point for WAKUV2 paginatedRequests - StoreCursor *StoreRequestCursor `json:"storeCursor"` + StoreCursor StoreRequestCursor `json:"storeCursor"` // Bloom is a filter to match requested messages. Bloom []byte `json:"bloom"` // PubsubTopic is the gossipsub topic on which the message was broadcasted @@ -35,12 +35,7 @@ type MessagesRequest struct { ContentTopics [][]byte `json:"contentTopics"` } -type StoreRequestCursor struct { - Digest []byte `json:"digest"` - ReceiverTime int64 `json:"receiverTime"` - SenderTime int64 `json:"senderTime"` - PubsubTopic string `json:"pubsubTopic"` -} +type StoreRequestCursor []byte // SetDefaults sets the From and To defaults func (r *MessagesRequest) SetDefaults(now time.Time) { diff --git a/eth-node/types/waku.go b/eth-node/types/waku.go index e3615c6455d..991a4343ed0 100644 --- a/eth-node/types/waku.go +++ b/eth-node/types/waku.go @@ -15,9 +15,8 @@ import ( ) type ConnStatus struct { - IsOnline bool `json:"isOnline"` - HasHistory bool `json:"hasHistory"` - Peers map[string]WakuV2Peer `json:"peers"` + IsOnline bool `json:"isOnline"` + Peers map[string]WakuV2Peer `json:"peers"` } type WakuV2Peer struct { @@ -176,7 +175,7 @@ type Waku interface { SendMessagesRequest(peerID []byte, request MessagesRequest) error // RequestStoreMessages uses the WAKU2-STORE protocol to request historic messages - RequestStoreMessages(ctx context.Context, peerID []byte, request MessagesRequest, processEnvelopes bool) (*StoreRequestCursor, int, error) + RequestStoreMessages(ctx context.Context, peerID []byte, request MessagesRequest, processEnvelopes bool) (StoreRequestCursor, int, error) // ProcessingP2PMessages indicates whether there are in-flight p2p messages ProcessingP2PMessages() bool diff --git a/protocol/messenger_mailserver.go b/protocol/messenger_mailserver.go index 4290790b1e5..256cc082a69 100644 --- a/protocol/messenger_mailserver.go +++ b/protocol/messenger_mailserver.go @@ -688,7 +688,7 @@ type work struct { pubsubTopic string contentTopics []types.TopicType cursor []byte - storeCursor *types.StoreRequestCursor + storeCursor types.StoreRequestCursor limit uint32 } @@ -698,13 +698,13 @@ type messageRequester interface { peerID []byte, from, to uint32, previousCursor []byte, - previousStoreCursor *types.StoreRequestCursor, + previousStoreCursor types.StoreRequestCursor, pubsubTopic string, contentTopics []types.TopicType, limit uint32, waitForResponse bool, processEnvelopes bool, - ) (cursor []byte, storeCursor *types.StoreRequestCursor, envelopesCount int, err error) + ) (cursor []byte, storeCursor types.StoreRequestCursor, envelopesCount int, err error) } func processMailserverBatch( diff --git a/protocol/messenger_mailserver_processMailserverBatch_test.go b/protocol/messenger_mailserver_processMailserverBatch_test.go index c78ac6757ab..c881d81b5c8 100644 --- a/protocol/messenger_mailserver_processMailserverBatch_test.go +++ b/protocol/messenger_mailserver_processMailserverBatch_test.go @@ -40,13 +40,13 @@ func (t *mockTransport) SendMessagesRequestForTopics( peerID []byte, from, to uint32, previousCursor []byte, - previousStoreCursor *types.StoreRequestCursor, + previousStoreCursor types.StoreRequestCursor, pubsubTopic string, contentTopics []types.TopicType, limit uint32, waitForResponse bool, processEnvelopes bool, -) (cursor []byte, storeCursor *types.StoreRequestCursor, envelopesCount int, err error) { +) (cursor []byte, storeCursor types.StoreRequestCursor, envelopesCount int, err error) { var response queryResponse if previousCursor == nil { initialResponse := getInitialResponseKey(contentTopics) diff --git a/protocol/messenger_storenode_comunity_test.go b/protocol/messenger_storenode_comunity_test.go index ad0e2f92132..81e7ac1e8a1 100644 --- a/protocol/messenger_storenode_comunity_test.go +++ b/protocol/messenger_storenode_comunity_test.go @@ -30,6 +30,7 @@ import ( ) func TestMessengerStoreNodeCommunitySuite(t *testing.T) { + t.Skip("requires storev3 node") suite.Run(t, new(MessengerStoreNodeCommunitySuite)) } @@ -283,7 +284,8 @@ func (s *MessengerStoreNodeCommunitySuite) TestSetCommunityStorenodesAndFetch() } func (s *MessengerStoreNodeCommunitySuite) TestSetStorenodeForCommunity_fetchMessagesFromNewStorenode() { - s.T().Skip("flaky test") + s.T().Skip("flaky") + err := s.owner.DialPeer(s.storeNodeAddress) s.Require().NoError(err) err = s.bob.DialPeer(s.storeNodeAddress) diff --git a/protocol/messenger_storenode_request_test.go b/protocol/messenger_storenode_request_test.go index d28ef9e48e7..1e706c66940 100644 --- a/protocol/messenger_storenode_request_test.go +++ b/protocol/messenger_storenode_request_test.go @@ -44,6 +44,7 @@ const ( ) func TestMessengerStoreNodeRequestSuite(t *testing.T) { + t.Skip("requires storev3 node") suite.Run(t, new(MessengerStoreNodeRequestSuite)) } @@ -382,7 +383,7 @@ func (s *MessengerStoreNodeRequestSuite) ensureStoreNodeEnvelopes(contentTopic * PubsubTopic: "", ContentTopics: []string{contentTopic.ContentTopic()}, } - result, err := s.wakuStoreNode.StoreNode().Query(context.Background(), query, queryOptions...) + result, err := s.wakuStoreNode.LegacyStoreNode().Query(context.Background(), query, queryOptions...) s.Require().NoError(err) s.Require().GreaterOrEqual(len(result.Messages), minimumCount) s.logger.Debug("store node query result", zap.Int("messagesCount", len(result.Messages))) diff --git a/protocol/transport/transport.go b/protocol/transport/transport.go index 37c860f7695..c099a94d3ad 100644 --- a/protocol/transport/transport.go +++ b/protocol/transport/transport.go @@ -500,18 +500,18 @@ func (t *Transport) createMessagesRequestV2( ctx context.Context, peerID []byte, from, to uint32, - previousStoreCursor *types.StoreRequestCursor, + previousStoreCursor types.StoreRequestCursor, pubsubTopic string, contentTopics []types.TopicType, limit uint32, waitForResponse bool, processEnvelopes bool, -) (storeCursor *types.StoreRequestCursor, envelopesCount int, err error) { +) (storeCursor types.StoreRequestCursor, envelopesCount int, err error) { r := createMessagesRequest(from, to, nil, previousStoreCursor, pubsubTopic, contentTopics, limit) if waitForResponse { resultCh := make(chan struct { - storeCursor *types.StoreRequestCursor + storeCursor types.StoreRequestCursor envelopesCount int err error }) @@ -519,7 +519,7 @@ func (t *Transport) createMessagesRequestV2( go func() { storeCursor, envelopesCount, err = t.waku.RequestStoreMessages(ctx, peerID, r, processEnvelopes) resultCh <- struct { - storeCursor *types.StoreRequestCursor + storeCursor types.StoreRequestCursor envelopesCount int err error }{storeCursor, envelopesCount, err} @@ -548,13 +548,13 @@ func (t *Transport) SendMessagesRequestForTopics( peerID []byte, from, to uint32, previousCursor []byte, - previousStoreCursor *types.StoreRequestCursor, + previousStoreCursor types.StoreRequestCursor, pubsubTopic string, contentTopics []types.TopicType, limit uint32, waitForResponse bool, processEnvelopes bool, -) (cursor []byte, storeCursor *types.StoreRequestCursor, envelopesCount int, err error) { +) (cursor []byte, storeCursor types.StoreRequestCursor, envelopesCount int, err error) { switch t.waku.Version() { case 2: storeCursor, envelopesCount, err = t.createMessagesRequestV2(ctx, peerID, from, to, previousStoreCursor, pubsubTopic, contentTopics, limit, waitForResponse, processEnvelopes) @@ -566,7 +566,7 @@ func (t *Transport) SendMessagesRequestForTopics( return } -func createMessagesRequest(from, to uint32, cursor []byte, storeCursor *types.StoreRequestCursor, pubsubTopic string, topics []types.TopicType, limit uint32) types.MessagesRequest { +func createMessagesRequest(from, to uint32, cursor []byte, storeCursor types.StoreRequestCursor, pubsubTopic string, topics []types.TopicType, limit uint32) types.MessagesRequest { aUUID := uuid.New() // uuid is 16 bytes, converted to hex it's 32 bytes as expected by types.MessagesRequest id := []byte(hex.EncodeToString(aUUID[:])) diff --git a/wakuv2/waku.go b/wakuv2/waku.go index efc0cc9ae2e..27742fe416a 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -62,7 +62,6 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/filter" "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store" - storepb "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store/pb" "github.com/waku-org/go-waku/waku/v2/protocol/lightpush" "github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange" "github.com/waku-org/go-waku/waku/v2/protocol/relay" @@ -529,7 +528,7 @@ func (w *Waku) telemetryBandwidthStats(telemetryServerURL string) { w.bandwidthCounter.Reset() } - storeStats := w.bandwidthCounter.GetBandwidthForProtocol(legacy_store.StoreID_v20beta4) + storeStats := w.bandwidthCounter.GetBandwidthForProtocol(store.StoreQueryID_v300) relayStats := w.bandwidthCounter.GetBandwidthForProtocol(relay.WakuRelayID_v200) go telemetry.PushProtocolStats(relayStats, storeStats) } @@ -1263,26 +1262,26 @@ func (w *Waku) messageHashBasedQuery(ctx context.Context, hashes []gethcommon.Ha return append(ackHashes, missedHashes...) } -func (w *Waku) Query(ctx context.Context, peerID peer.ID, query legacy_store.Query, cursor *storepb.Index, opts []legacy_store.HistoryRequestOption, processEnvelopes bool) (*storepb.Index, int, error) { +func (w *Waku) Query(ctx context.Context, peerID peer.ID, query store.FilterCriteria, cursor []byte, opts []store.RequestOption, processEnvelopes bool) ([]byte, int, error) { requestID := protocol.GenerateRequestID() opts = append(opts, - legacy_store.WithRequestID(requestID), - legacy_store.WithPeer(peerID), - legacy_store.WithCursor(cursor)) + store.WithRequestID(requestID), + store.WithPeer(peerID), + store.WithCursor(cursor)) logger := w.logger.With(zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", peerID)) logger.Debug("store.query", - logutils.WakuMessageTimestamp("startTime", query.StartTime), - logutils.WakuMessageTimestamp("endTime", query.EndTime), - zap.Strings("contentTopics", query.ContentTopics), + logutils.WakuMessageTimestamp("startTime", query.TimeStart), + logutils.WakuMessageTimestamp("endTime", query.TimeEnd), + zap.Strings("contentTopics", query.ContentTopics.ToList()), zap.String("pubsubTopic", query.PubsubTopic), - zap.Stringer("cursor", cursor), + zap.String("cursor", hexutil.Encode(cursor)), ) queryStart := time.Now() - result, err := w.node.LegacyStore().Query(ctx, query, opts...) + result, err := w.node.Store().Query(ctx, query, opts...) queryDuration := time.Since(queryStart) if err != nil { logger.Error("error querying storenode", zap.Error(err)) @@ -1293,15 +1292,15 @@ func (w *Waku) Query(ctx context.Context, peerID peer.ID, query legacy_store.Que return nil, 0, err } - logger.Debug("store.query response", - zap.Duration("queryDuration", queryDuration), - zap.Int("numMessages", len(result.Messages)), - zap.Stringer("cursor", result.Cursor())) + messages := result.Messages() + envelopesCount := len(messages) + w.logger.Debug("store.query response", zap.Duration("queryDuration", queryDuration), zap.Int("numMessages", envelopesCount), zap.Bool("hasCursor", result.IsComplete() && result.Cursor() != nil)) + for _, mkv := range messages { + msg := mkv.Message - for _, msg := range result.Messages { // Temporarily setting RateLimitProof to nil so it matches the WakuMessage protobuffer we are sending // See https://github.com/vacp2p/rfc/issues/563 - msg.RateLimitProof = nil + mkv.Message.RateLimitProof = nil envelope := protocol.NewEnvelope(msg, msg.GetTimestamp(), query.PubsubTopic) logger.Info("received waku2 store message", @@ -1316,7 +1315,7 @@ func (w *Waku) Query(ctx context.Context, peerID peer.ID, query legacy_store.Que } } - return result.Cursor(), len(result.Messages), nil + return result.Cursor(), envelopesCount, nil } // Start implements node.Service, starting the background data propagation thread @@ -1929,7 +1928,7 @@ func (w *Waku) AddStorePeer(address string) (peer.ID, error) { return "", err } - peerID, err := w.node.AddPeer(addr, wps.Static, w.cfg.DefaultShardedPubsubTopics, legacy_store.StoreID_v20beta4) + peerID, err := w.node.AddPeer(addr, wps.Static, w.cfg.DefaultShardedPubsubTopics, store.StoreQueryID_v300) if err != nil { return "", err } @@ -2057,6 +2056,10 @@ func FormatPeerStats(wakuNode *node.WakuNode) map[string]types.WakuV2Peer { return p } -func (w *Waku) StoreNode() legacy_store.Store { +func (w *Waku) StoreNode() *store.WakuStore { + return w.node.Store() +} + +func (w *Waku) LegacyStoreNode() legacy_store.Store { return w.node.LegacyStore() } diff --git a/wakuv2/waku_test.go b/wakuv2/waku_test.go index 4e6651458d2..84bdb5d2fb3 100644 --- a/wakuv2/waku_test.go +++ b/wakuv2/waku_test.go @@ -26,9 +26,10 @@ import ( "github.com/waku-org/go-waku/waku/v2/dnsdisc" wps "github.com/waku-org/go-waku/waku/v2/peerstore" + "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/filter" - "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store" "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/go-waku/waku/v2/protocol/store" "github.com/status-im/status-go/appdatabase" "github.com/status-im/status-go/connection" @@ -232,14 +233,13 @@ func TestBasicWakuV2(t *testing.T) { _, envelopeCount, err := w.Query( context.Background(), storeNode.PeerID, - legacy_store.Query{ - PubsubTopic: config.DefaultShardPubsubTopic, - ContentTopics: []string{contentTopic.ContentTopic()}, - StartTime: proto.Int64((timestampInSeconds - int64(marginInSeconds)) * int64(time.Second)), - EndTime: proto.Int64((timestampInSeconds + int64(marginInSeconds)) * int64(time.Second)), + store.FilterCriteria{ + ContentFilter: protocol.NewContentFilter(config.DefaultShardPubsubTopic, contentTopic.ContentTopic()), + TimeStart: proto.Int64((timestampInSeconds - int64(marginInSeconds)) * int64(time.Second)), + TimeEnd: proto.Int64((timestampInSeconds + int64(marginInSeconds)) * int64(time.Second)), }, nil, - []legacy_store.HistoryRequestOption{}, + nil, false, ) if err != nil || envelopeCount == 0 { @@ -538,14 +538,13 @@ func TestWakuV2Store(t *testing.T) { _, envelopeCount, err := w1.Query( context.Background(), w2.node.Host().ID(), - legacy_store.Query{ - PubsubTopic: config1.DefaultShardPubsubTopic, - ContentTopics: []string{contentTopic.ContentTopic()}, - StartTime: proto.Int64((timestampInSeconds - int64(marginInSeconds)) * int64(time.Second)), - EndTime: proto.Int64((timestampInSeconds + int64(marginInSeconds)) * int64(time.Second)), + store.FilterCriteria{ + TimeStart: proto.Int64((timestampInSeconds - int64(marginInSeconds)) * int64(time.Second)), + TimeEnd: proto.Int64((timestampInSeconds + int64(marginInSeconds)) * int64(time.Second)), + ContentFilter: protocol.NewContentFilter(config1.DefaultShardPubsubTopic, contentTopic.ContentTopic()), }, nil, - []legacy_store.HistoryRequestOption{}, + nil, false, ) require.NoError(t, err)