Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: validate protobuffer for filter #833

Merged
merged 2 commits into from
Oct 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 23 additions & 18 deletions waku/v2/protocol/filter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,11 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(network.Strea

stream.Close()

if err = messagePush.Validate(); err != nil {
logger.Warn("received invalid messagepush")
return
}

pubSubTopic := ""
//For now returning failure, this will get addressed with autosharding changes for filter.
if messagePush.PubsubTopic == nil {
Expand Down Expand Up @@ -178,6 +183,18 @@ func (wf *WakuFilterLightNode) notify(remotePeerID peer.ID, pubsubTopic string,

func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscribeParameters,
reqType pb.FilterSubscribeRequest_FilterSubscribeType, contentFilter protocol.ContentFilter) error {
request := &pb.FilterSubscribeRequest{
RequestId: hex.EncodeToString(params.requestID),
FilterSubscribeType: reqType,
PubsubTopic: &contentFilter.PubsubTopic,
ContentTopics: contentFilter.ContentTopicsList(),
}

err := request.Validate()
if err != nil {
return err
}

stream, err := wf.h.NewStream(ctx, params.selectedPeer, FilterSubscribeID_v20beta1)
if err != nil {
wf.metrics.RecordError(dialFailure)
Expand All @@ -187,13 +204,6 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscr
writer := pbio.NewDelimitedWriter(stream)
reader := pbio.NewDelimitedReader(stream, math.MaxInt32)

request := &pb.FilterSubscribeRequest{
RequestId: hex.EncodeToString(params.requestID),
FilterSubscribeType: reqType,
PubsubTopic: &contentFilter.PubsubTopic,
ContentTopics: contentFilter.ContentTopicsList(),
}

wf.log.Debug("sending FilterSubscribeRequest", zap.Stringer("request", request))
err = writer.WriteMsg(request)
if err != nil {
Expand All @@ -218,6 +228,12 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscr

stream.Close()

if err = filterSubscribeResponse.Validate(); err != nil {
wf.metrics.RecordError(decodeRPCFailure)
return err

}

if filterSubscribeResponse.RequestId != request.RequestId {
wf.log.Error("requestID mismatch", zap.String("expected", request.RequestId), zap.String("received", filterSubscribeResponse.RequestId))
wf.metrics.RecordError(requestIDMismatch)
Expand Down Expand Up @@ -245,17 +261,6 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot
return nil, err
}

if len(contentFilter.ContentTopics) == 0 {
return nil, errors.New("at least one content topic is required")
}
if slices.Contains[string](contentFilter.ContentTopicsList(), "") {
return nil, errors.New("one or more content topics specified is empty")
}

if len(contentFilter.ContentTopics) > MaxContentTopicsPerRequest {
return nil, fmt.Errorf("exceeds maximum content topics: %d", MaxContentTopicsPerRequest)
}

params := new(FilterSubscribeParameters)
params.log = wf.log
params.host = wf.h
Expand Down
60 changes: 60 additions & 0 deletions waku/v2/protocol/filter/pb/validation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package pb

import (
"errors"
"fmt"

"golang.org/x/exp/slices"
)

const MaxContentTopicsPerRequest = 30

var (
errMissingRequestID = errors.New("missing RequestId field")
errMissingPubsubTopic = errors.New("missing PubsubTopic field")
errNoContentTopics = errors.New("at least one contenttopic should be specified")
errMaxContentTopics = fmt.Errorf("exceeds maximum content topics: %d", MaxContentTopicsPerRequest)
errEmptyContentTopics = errors.New("one or more content topics specified is empty")
errMissingMessage = errors.New("missing WakuMessage field")
)

func (x *FilterSubscribeRequest) Validate() error {
if x.RequestId == "" {
return errMissingRequestID
}

if x.FilterSubscribeType == FilterSubscribeRequest_SUBSCRIBE || x.FilterSubscribeType == FilterSubscribeRequest_UNSUBSCRIBE {
if x.PubsubTopic == nil || *x.PubsubTopic == "" {
return errMissingPubsubTopic
}

if len(x.ContentTopics) == 0 {
return errNoContentTopics
}

if slices.Contains[string](x.ContentTopics, "") {
return errEmptyContentTopics
}

if len(x.ContentTopics) > MaxContentTopicsPerRequest {
return errMaxContentTopics
}
}

return nil
}

func (x *FilterSubscribeResponse) Validate() error {
if x.RequestId == "" {
return errMissingRequestID
}

return nil
}

func (x *MessagePushV2) Validate() error {
if x.WakuMessage == nil {
return errMissingMessage
}
return x.WakuMessage.Validate()
}
41 changes: 41 additions & 0 deletions waku/v2/protocol/filter/pb/validation_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package pb

import (
"testing"

"github.com/stretchr/testify/require"
pb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
)

func TestValidateRequest(t *testing.T) {
request := &FilterSubscribeRequest{}
require.ErrorIs(t, request.Validate(), errMissingRequestID)
request.RequestId = "test"
request.FilterSubscribeType = FilterSubscribeRequest_SUBSCRIBE
require.ErrorIs(t, request.Validate(), errMissingPubsubTopic)
pubsubTopic := "test"
request.PubsubTopic = &pubsubTopic
require.ErrorIs(t, request.Validate(), errNoContentTopics)
request.ContentTopics = []string{""}
require.ErrorIs(t, request.Validate(), errEmptyContentTopics)
request.ContentTopics[0] = "test"
require.NoError(t, request.Validate())
}

func TestValidateResponse(t *testing.T) {
response := FilterSubscribeResponse{}
require.ErrorIs(t, response.Validate(), errMissingRequestID)
response.RequestId = "test"
require.NoError(t, response.Validate())

}

func TestValidateMessagePush(t *testing.T) {
msgPush := &MessagePushV2{}
require.ErrorIs(t, msgPush.Validate(), errMissingMessage)
msgPush.WakuMessage = &pb.WakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: "test",
}
require.NoError(t, msgPush.Validate())
}
51 changes: 13 additions & 38 deletions waku/v2/protocol/filter/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package filter
import (
"context"
"errors"
"fmt"
"math"
"net/http"
"time"
Expand Down Expand Up @@ -104,15 +103,19 @@ func (wf *WakuFilterFullNode) onRequest(ctx context.Context) func(network.Stream

start := time.Now()

switch subscribeRequest.FilterSubscribeType {
case pb.FilterSubscribeRequest_SUBSCRIBE:
wf.subscribe(ctx, stream, subscribeRequest)
case pb.FilterSubscribeRequest_SUBSCRIBER_PING:
wf.ping(ctx, stream, subscribeRequest)
case pb.FilterSubscribeRequest_UNSUBSCRIBE:
wf.unsubscribe(ctx, stream, subscribeRequest)
case pb.FilterSubscribeRequest_UNSUBSCRIBE_ALL:
wf.unsubscribeAll(ctx, stream, subscribeRequest)
if err := subscribeRequest.Validate(); err != nil {
wf.reply(ctx, stream, subscribeRequest, http.StatusBadRequest, err.Error())
} else {
switch subscribeRequest.FilterSubscribeType {
case pb.FilterSubscribeRequest_SUBSCRIBE:
wf.subscribe(ctx, stream, subscribeRequest)
case pb.FilterSubscribeRequest_SUBSCRIBER_PING:
wf.ping(ctx, stream, subscribeRequest)
case pb.FilterSubscribeRequest_UNSUBSCRIBE:
wf.unsubscribe(ctx, stream, subscribeRequest)
case pb.FilterSubscribeRequest_UNSUBSCRIBE_ALL:
wf.unsubscribeAll(ctx, stream, subscribeRequest)
}
}

stream.Close()
Expand Down Expand Up @@ -157,20 +160,6 @@ func (wf *WakuFilterFullNode) ping(ctx context.Context, stream network.Stream, r
}

func (wf *WakuFilterFullNode) subscribe(ctx context.Context, stream network.Stream, request *pb.FilterSubscribeRequest) {
if request.PubsubTopic == nil {
wf.reply(ctx, stream, request, http.StatusBadRequest, "pubsubtopic can't be empty")
return
}

if len(request.ContentTopics) == 0 {
wf.reply(ctx, stream, request, http.StatusBadRequest, "at least one contenttopic should be specified")
return
}

if len(request.ContentTopics) > MaxContentTopicsPerRequest {
wf.reply(ctx, stream, request, http.StatusBadRequest, fmt.Sprintf("exceeds maximum content topics: %d", MaxContentTopicsPerRequest))
}

if wf.subscriptions.Count() >= wf.maxSubscriptions {
wf.reply(ctx, stream, request, http.StatusServiceUnavailable, "node has reached maximum number of subscriptions")
return
Expand All @@ -197,20 +186,6 @@ func (wf *WakuFilterFullNode) subscribe(ctx context.Context, stream network.Stre
}

func (wf *WakuFilterFullNode) unsubscribe(ctx context.Context, stream network.Stream, request *pb.FilterSubscribeRequest) {
if request.PubsubTopic == nil {
wf.reply(ctx, stream, request, http.StatusBadRequest, "pubsubtopic can't be empty")
return
}

if len(request.ContentTopics) == 0 {
wf.reply(ctx, stream, request, http.StatusBadRequest, "at least one contenttopic should be specified")
return
}

if len(request.ContentTopics) > MaxContentTopicsPerRequest {
wf.reply(ctx, stream, request, http.StatusBadRequest, fmt.Sprintf("exceeds maximum content topics: %d", MaxContentTopicsPerRequest))
}

err := wf.subscriptions.Delete(stream.Conn().RemotePeer(), *request.PubsubTopic, request.ContentTopics)
if err != nil {
wf.reply(ctx, stream, request, http.StatusNotFound, peerHasNoSubscription)
Expand Down