Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: use protobuffer for API storenode queries #1248

Merged
merged 2 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions waku/v2/api/common/storenode_requestor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package common

import (
"context"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
)

type StorenodeRequestor interface {
Query(ctx context.Context, peerID peer.ID, query *pb.StoreQueryRequest) (StoreRequestResult, error)
}
24 changes: 18 additions & 6 deletions waku/v2/api/history/cycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,14 +393,10 @@ func (m *StorenodeCycle) SetStorenodeConfigProvider(provider StorenodeConfigProv
m.storenodeConfigProvider = provider
}

func (m *StorenodeCycle) WaitForAvailableStoreNode(ctx context.Context, timeout time.Duration) bool {
// Add 1 second to timeout, because the storenode cycle has 1 second ticker, which doesn't tick on start.
func (m *StorenodeCycle) WaitForAvailableStoreNode(ctx context.Context) bool {
// Note: Add 1 second to timeout, because the storenode cycle has 1 second ticker, which doesn't tick on start.
// This can be improved after merging https://github.com/status-im/status-go/pull/4380.
// NOTE: https://stackoverflow.com/questions/32705582/how-to-get-time-tick-to-tick-immediately
timeout += time.Second

ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

wg := sync.WaitGroup{}
wg.Add(1)
Expand All @@ -410,14 +406,30 @@ func (m *StorenodeCycle) WaitForAvailableStoreNode(ctx context.Context, timeout
select {
case <-m.StorenodeAvailableOneshotEmitter.Subscribe():
case <-ctx.Done():
if errors.Is(ctx.Err(), context.Canceled) {
return
}

// Wait for an additional second, but handle cancellation
select {
case <-time.After(1 * time.Second):
case <-ctx.Done(): // context was cancelled
}

return

}
}
}()

select {
case <-waitForWaitGroup(&wg):
case <-ctx.Done():
// Wait for an additional second, but handle cancellation
select {
case <-time.After(1 * time.Second):
case <-ctx.Done(): // context was cancelled o
}
}

return m.IsStorenodeAvailable(m.activeStorenode)
Expand Down
32 changes: 19 additions & 13 deletions waku/v2/api/history/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package history

import (
"context"
"encoding/hex"
"errors"
"math"
"sync"
Expand All @@ -10,8 +11,12 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/v2/api/common"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/store"
"github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
"google.golang.org/protobuf/proto"

"go.uber.org/zap"
)

Expand All @@ -25,7 +30,7 @@ type work struct {
}

type HistoryRetriever struct {
store Store
store common.StorenodeRequestor
logger *zap.Logger
historyProcessor HistoryProcessor
}
Expand All @@ -35,11 +40,7 @@ type HistoryProcessor interface {
OnRequestFailed(requestID []byte, peerID peer.ID, err error)
}

type Store interface {
Query(ctx context.Context, criteria store.FilterCriteria, opts ...store.RequestOption) (store.Result, error)
}

func NewHistoryRetriever(store Store, historyProcessor HistoryProcessor, logger *zap.Logger) *HistoryRetriever {
func NewHistoryRetriever(store common.StorenodeRequestor, historyProcessor HistoryProcessor, logger *zap.Logger) *HistoryRetriever {
return &HistoryRetriever{
store: store,
logger: logger.Named("history-retriever"),
Expand Down Expand Up @@ -257,12 +258,6 @@ func (hr *HistoryRetriever) requestStoreMessages(ctx context.Context, peerID pee
requestID := protocol.GenerateRequestID()
logger := hr.logger.With(zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", peerID))

opts := []store.RequestOption{
store.WithPaging(false, limit),
store.WithRequestID(requestID),
store.WithPeer(peerID),
store.WithCursor(cursor)}

logger.Debug("store.query",
logging.Timep("startTime", criteria.TimeStart),
logging.Timep("endTime", criteria.TimeEnd),
Expand All @@ -271,8 +266,19 @@ func (hr *HistoryRetriever) requestStoreMessages(ctx context.Context, peerID pee
zap.String("cursor", hexutil.Encode(cursor)),
)

storeQueryRequest := &pb.StoreQueryRequest{
RequestId: hex.EncodeToString(requestID),
IncludeData: true,
PubsubTopic: &criteria.PubsubTopic,
ContentTopics: criteria.ContentTopicsList(),
TimeStart: criteria.TimeStart,
TimeEnd: criteria.TimeEnd,
PaginationCursor: cursor,
PaginationLimit: proto.Uint64(limit),
}

queryStart := time.Now()
result, err := hr.store.Query(ctx, criteria, opts...)
result, err := hr.store.Query(ctx, peerID, storeQueryRequest)
queryDuration := time.Since(queryStart)
if err != nil {
logger.Error("error querying storenode", zap.Error(err))
Expand Down
13 changes: 5 additions & 8 deletions waku/v2/api/history/history_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/google/uuid"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/waku/v2/api/common"
"github.com/waku-org/go-waku/waku/v2/protocol"
proto_pb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/store"
Expand Down Expand Up @@ -91,22 +92,18 @@ func getInitialResponseKey(contentTopics []string) string {
return hex.EncodeToString(append([]byte("start"), []byte(contentTopics[0])...))
}

func (t *mockStore) Query(ctx context.Context, criteria store.FilterCriteria, opts ...store.RequestOption) (store.Result, error) {
params := store.Parameters{}
for _, opt := range opts {
_ = opt(&params)
}
func (t *mockStore) Query(ctx context.Context, peerID peer.ID, storeQueryRequest *pb.StoreQueryRequest) (common.StoreRequestResult, error) {
result := &mockResult{}
if params.Cursor() == nil {
initialResponse := getInitialResponseKey(criteria.ContentTopicsList())
if len(storeQueryRequest.GetPaginationCursor()) == 0 {
initialResponse := getInitialResponseKey(storeQueryRequest.GetContentTopics())
response := t.queryResponses[initialResponse]
if response.err != nil {
return nil, response.err
}
result.cursor = response.cursor
result.messages = response.messages
} else {
response := t.queryResponses[hex.EncodeToString(params.Cursor())]
response := t.queryResponses[hex.EncodeToString(storeQueryRequest.GetPaginationCursor())]
if response.err != nil {
return nil, response.err
}
Expand Down
12 changes: 4 additions & 8 deletions waku/v2/api/missing/default_requestor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ import (

"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/waku/v2/api/common"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/store"
storepb "github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
)

func NewDefaultStorenodeRequestor(store *store.WakuStore) StorenodeRequestor {
func NewDefaultStorenodeRequestor(store *store.WakuStore) common.StorenodeRequestor {
return &defaultStorenodeRequestor{
store: store,
}
Expand All @@ -24,10 +24,6 @@ func (d *defaultStorenodeRequestor) GetMessagesByHash(ctx context.Context, peerI
return d.store.QueryByHash(ctx, messageHashes, store.WithPeer(peerID), store.WithPaging(false, pageSize))
}

func (d *defaultStorenodeRequestor) QueryWithCriteria(ctx context.Context, peerID peer.ID, pageSize uint64, pubsubTopic string, contentTopics []string, from *int64, to *int64) (common.StoreRequestResult, error) {
return d.store.Query(ctx, store.FilterCriteria{
ContentFilter: protocol.NewContentFilter(pubsubTopic, contentTopics...),
TimeStart: from,
TimeEnd: to,
}, store.WithPeer(peerID), store.WithPaging(false, pageSize), store.IncludeData(false))
func (d *defaultStorenodeRequestor) Query(ctx context.Context, peerID peer.ID, storeQueryRequest *storepb.StoreQueryRequest) (common.StoreRequestResult, error) {
return d.store.RequestRaw(ctx, peerID, storeQueryRequest)
}
42 changes: 28 additions & 14 deletions waku/v2/api/missing/missing_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/api/common"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
storepb "github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
Expand All @@ -31,17 +32,12 @@ type MessageTracker interface {
MessageExists(pb.MessageHash) (bool, error)
}

type StorenodeRequestor interface {
GetMessagesByHash(ctx context.Context, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) (common.StoreRequestResult, error)
QueryWithCriteria(ctx context.Context, peerID peer.ID, pageSize uint64, pubsubTopic string, contentTopics []string, from *int64, to *int64) (common.StoreRequestResult, error)
}

// MissingMessageVerifier is used to periodically retrieve missing messages from store nodes that have some specific criteria
type MissingMessageVerifier struct {
ctx context.Context
params missingMessageVerifierParams

storenodeRequestor StorenodeRequestor
storenodeRequestor common.StorenodeRequestor
messageTracker MessageTracker

criteriaInterest map[string]criteriaInterest // Track message verification requests and when was the last time a pubsub topic was verified for missing messages
Expand All @@ -54,7 +50,7 @@ type MissingMessageVerifier struct {
}

// NewMissingMessageVerifier creates an instance of a MissingMessageVerifier
func NewMissingMessageVerifier(storenodeRequester StorenodeRequestor, messageTracker MessageTracker, timesource timesource.Timesource, logger *zap.Logger, options ...MissingMessageVerifierOption) *MissingMessageVerifier {
func NewMissingMessageVerifier(storenodeRequester common.StorenodeRequestor, messageTracker MessageTracker, timesource timesource.Timesource, logger *zap.Logger, options ...MissingMessageVerifierOption) *MissingMessageVerifier {
options = append(defaultMissingMessagesVerifierOptions, options...)
params := missingMessageVerifierParams{}
for _, opt := range options {
Expand Down Expand Up @@ -219,14 +215,19 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope,
)

result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (common.StoreRequestResult, error) {
return m.storenodeRequestor.QueryWithCriteria(
storeQueryRequest := &storepb.StoreQueryRequest{
RequestId: hex.EncodeToString(protocol.GenerateRequestID()),
PubsubTopic: &interest.contentFilter.PubsubTopic,
ContentTopics: contentTopics[batchFrom:batchTo],
TimeStart: proto.Int64(interest.lastChecked.Add(-m.params.delay).UnixNano()),
TimeEnd: proto.Int64(now.Add(-m.params.delay).UnixNano()),
PaginationLimit: proto.Uint64(messageFetchPageSize),
}

return m.storenodeRequestor.Query(
ctx,
interest.peerID,
messageFetchPageSize,
interest.contentFilter.PubsubTopic,
contentTopics[batchFrom:batchTo],
proto.Int64(interest.lastChecked.Add(-m.params.delay).UnixNano()),
proto.Int64(now.Add(-m.params.delay).UnixNano()),
storeQueryRequest,
)
}, logger, "retrieving history to check for missing messages")
if err != nil {
Expand Down Expand Up @@ -295,7 +296,20 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope,
result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (common.StoreRequestResult, error) {
queryCtx, cancel := context.WithTimeout(ctx, m.params.storeQueryTimeout)
defer cancel()
return m.storenodeRequestor.GetMessagesByHash(queryCtx, interest.peerID, maxMsgHashesPerRequest, messageHashes)

var messageHashesBytes [][]byte
for _, m := range messageHashes {
messageHashesBytes = append(messageHashesBytes, m.Bytes())
}

storeQueryRequest := &storepb.StoreQueryRequest{
RequestId: hex.EncodeToString(protocol.GenerateRequestID()),
IncludeData: true,
MessageHashes: messageHashesBytes,
PaginationLimit: proto.Uint64(maxMsgHashesPerRequest),
}

return m.storenodeRequestor.Query(queryCtx, interest.peerID, storeQueryRequest)
}, logger, "retrieving missing messages")
if err != nil {
if !errors.Is(err, context.Canceled) {
Expand Down
29 changes: 29 additions & 0 deletions waku/v2/protocol/store/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,35 @@ func (s *WakuStore) Request(ctx context.Context, criteria Criteria, opts ...Requ
return result, nil
}

func (s *WakuStore) RequestRaw(ctx context.Context, peerID peer.ID, storeRequest *pb.StoreQueryRequest) (Result, error) {
err := storeRequest.Validate()
if err != nil {
return nil, err
}

var params Parameters
params.selectedPeer = peerID
if params.selectedPeer == "" {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am guessing peer-selection would be added later.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I want to get the store queries to work first as they do now, with nwaku

return nil, ErrMustSelectPeer
}

response, err := s.queryFrom(ctx, storeRequest, &params)
if err != nil {
return nil, err
}

result := &resultImpl{
store: s,
messages: response.Messages,
storeRequest: storeRequest,
storeResponse: response,
peerID: params.selectedPeer,
cursor: response.PaginationCursor,
}

return result, nil
}

// Query retrieves all the messages that match a criteria. Use the options to indicate whether to return the message themselves or not.
func (s *WakuStore) Query(ctx context.Context, criteria FilterCriteria, opts ...RequestOption) (Result, error) {
return s.Request(ctx, criteria, opts...)
Expand Down
Loading