From 0835cce43e74d885811158d1827ee666cfe8a98f Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Tue, 24 Oct 2023 17:43:21 -0400 Subject: [PATCH] refactor: validate protobuffer for filter --- waku/v2/protocol/filter/client.go | 41 +++++++------ waku/v2/protocol/filter/pb/validation.go | 60 +++++++++++++++++++ waku/v2/protocol/filter/pb/validation_test.go | 41 +++++++++++++ waku/v2/protocol/filter/server.go | 51 ++++------------ 4 files changed, 137 insertions(+), 56 deletions(-) create mode 100644 waku/v2/protocol/filter/pb/validation.go create mode 100644 waku/v2/protocol/filter/pb/validation_test.go diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index 6c43a7e48..bca7ec36f 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -134,6 +134,11 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(network.Strea stream.Close() + if err = messagePush.Validate(); err != nil { + logger.Warn("received invalid messagepush") + return + } + pubSubTopic := "" //For now returning failure, this will get addressed with autosharding changes for filter. if messagePush.PubsubTopic == nil { @@ -178,6 +183,18 @@ func (wf *WakuFilterLightNode) notify(remotePeerID peer.ID, pubsubTopic string, func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscribeParameters, reqType pb.FilterSubscribeRequest_FilterSubscribeType, contentFilter protocol.ContentFilter) error { + request := &pb.FilterSubscribeRequest{ + RequestId: hex.EncodeToString(params.requestID), + FilterSubscribeType: reqType, + PubsubTopic: &contentFilter.PubsubTopic, + ContentTopics: contentFilter.ContentTopicsList(), + } + + err := request.Validate() + if err != nil { + return err + } + stream, err := wf.h.NewStream(ctx, params.selectedPeer, FilterSubscribeID_v20beta1) if err != nil { wf.metrics.RecordError(dialFailure) @@ -187,13 +204,6 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscr writer := pbio.NewDelimitedWriter(stream) reader := pbio.NewDelimitedReader(stream, math.MaxInt32) - request := &pb.FilterSubscribeRequest{ - RequestId: hex.EncodeToString(params.requestID), - FilterSubscribeType: reqType, - PubsubTopic: &contentFilter.PubsubTopic, - ContentTopics: contentFilter.ContentTopicsList(), - } - wf.log.Debug("sending FilterSubscribeRequest", zap.Stringer("request", request)) err = writer.WriteMsg(request) if err != nil { @@ -218,6 +228,12 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscr stream.Close() + if err = filterSubscribeResponse.Validate(); err != nil { + wf.metrics.RecordError(decodeRPCFailure) + return err + + } + if filterSubscribeResponse.RequestId != request.RequestId { wf.log.Error("requestID mismatch", zap.String("expected", request.RequestId), zap.String("received", filterSubscribeResponse.RequestId)) wf.metrics.RecordError(requestIDMismatch) @@ -245,17 +261,6 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot return nil, err } - if len(contentFilter.ContentTopics) == 0 { - return nil, errors.New("at least one content topic is required") - } - if slices.Contains[string](contentFilter.ContentTopicsList(), "") { - return nil, errors.New("one or more content topics specified is empty") - } - - if len(contentFilter.ContentTopics) > MaxContentTopicsPerRequest { - return nil, fmt.Errorf("exceeds maximum content topics: %d", MaxContentTopicsPerRequest) - } - params := new(FilterSubscribeParameters) params.log = wf.log params.host = wf.h diff --git a/waku/v2/protocol/filter/pb/validation.go b/waku/v2/protocol/filter/pb/validation.go new file mode 100644 index 000000000..725143cc1 --- /dev/null +++ b/waku/v2/protocol/filter/pb/validation.go @@ -0,0 +1,60 @@ +package pb + +import ( + "errors" + "fmt" + + "golang.org/x/exp/slices" +) + +const MaxContentTopicsPerRequest = 30 + +var ( + errMissingRequestID = errors.New("missing RequestId field") + errMissingPubsubTopic = errors.New("missing PubsubTopic field") + errNoContentTopics = errors.New("at least one contenttopic should be specified") + errMaxContentTopics = fmt.Errorf("exceeds maximum content topics: %d", MaxContentTopicsPerRequest) + errEmptyContentTopics = errors.New("one or more content topics specified is empty") + errMissingMessage = errors.New("missing WakuMessage field") +) + +func (x *FilterSubscribeRequest) Validate() error { + if x.RequestId == "" { + return errMissingRequestID + } + + if x.FilterSubscribeType == FilterSubscribeRequest_SUBSCRIBE || x.FilterSubscribeType == FilterSubscribeRequest_UNSUBSCRIBE { + if x.PubsubTopic == nil || *x.PubsubTopic == "" { + return errMissingPubsubTopic + } + + if len(x.ContentTopics) == 0 { + return errNoContentTopics + } + + if slices.Contains[string](x.ContentTopics, "") { + return errEmptyContentTopics + } + + if len(x.ContentTopics) > MaxContentTopicsPerRequest { + return errMaxContentTopics + } + } + + return nil +} + +func (x *FilterSubscribeResponse) Validate() error { + if x.RequestId == "" { + return errMissingRequestID + } + + return nil +} + +func (x *MessagePushV2) Validate() error { + if x.WakuMessage == nil { + return errMissingMessage + } + return x.WakuMessage.Validate() +} diff --git a/waku/v2/protocol/filter/pb/validation_test.go b/waku/v2/protocol/filter/pb/validation_test.go new file mode 100644 index 000000000..df1301331 --- /dev/null +++ b/waku/v2/protocol/filter/pb/validation_test.go @@ -0,0 +1,41 @@ +package pb + +import ( + "testing" + + "github.com/stretchr/testify/require" + pb "github.com/waku-org/go-waku/waku/v2/protocol/pb" +) + +func TestValidateRequest(t *testing.T) { + request := &FilterSubscribeRequest{} + require.ErrorIs(t, request.Validate(), errMissingRequestID) + request.RequestId = "test" + request.FilterSubscribeType = FilterSubscribeRequest_SUBSCRIBE + require.ErrorIs(t, request.Validate(), errMissingPubsubTopic) + pubsubTopic := "test" + request.PubsubTopic = &pubsubTopic + require.ErrorIs(t, request.Validate(), errNoContentTopics) + request.ContentTopics = []string{""} + require.ErrorIs(t, request.Validate(), errEmptyContentTopics) + request.ContentTopics[0] = "test" + require.NoError(t, request.Validate()) +} + +func TestValidateResponse(t *testing.T) { + response := FilterSubscribeResponse{} + require.ErrorIs(t, response.Validate(), errMissingRequestID) + response.RequestId = "test" + require.NoError(t, response.Validate()) + +} + +func TestValidateMessagePush(t *testing.T) { + msgPush := &MessagePushV2{} + require.ErrorIs(t, msgPush.Validate(), errMissingMessage) + msgPush.WakuMessage = &pb.WakuMessage{ + Payload: []byte{1, 2, 3}, + ContentTopic: "test", + } + require.NoError(t, msgPush.Validate()) +} diff --git a/waku/v2/protocol/filter/server.go b/waku/v2/protocol/filter/server.go index 324bb3d11..bed0503d0 100644 --- a/waku/v2/protocol/filter/server.go +++ b/waku/v2/protocol/filter/server.go @@ -3,7 +3,6 @@ package filter import ( "context" "errors" - "fmt" "math" "net/http" "time" @@ -104,15 +103,19 @@ func (wf *WakuFilterFullNode) onRequest(ctx context.Context) func(network.Stream start := time.Now() - switch subscribeRequest.FilterSubscribeType { - case pb.FilterSubscribeRequest_SUBSCRIBE: - wf.subscribe(ctx, stream, subscribeRequest) - case pb.FilterSubscribeRequest_SUBSCRIBER_PING: - wf.ping(ctx, stream, subscribeRequest) - case pb.FilterSubscribeRequest_UNSUBSCRIBE: - wf.unsubscribe(ctx, stream, subscribeRequest) - case pb.FilterSubscribeRequest_UNSUBSCRIBE_ALL: - wf.unsubscribeAll(ctx, stream, subscribeRequest) + if err := subscribeRequest.Validate(); err != nil { + wf.reply(ctx, stream, subscribeRequest, http.StatusBadRequest, err.Error()) + } else { + switch subscribeRequest.FilterSubscribeType { + case pb.FilterSubscribeRequest_SUBSCRIBE: + wf.subscribe(ctx, stream, subscribeRequest) + case pb.FilterSubscribeRequest_SUBSCRIBER_PING: + wf.ping(ctx, stream, subscribeRequest) + case pb.FilterSubscribeRequest_UNSUBSCRIBE: + wf.unsubscribe(ctx, stream, subscribeRequest) + case pb.FilterSubscribeRequest_UNSUBSCRIBE_ALL: + wf.unsubscribeAll(ctx, stream, subscribeRequest) + } } stream.Close() @@ -157,20 +160,6 @@ func (wf *WakuFilterFullNode) ping(ctx context.Context, stream network.Stream, r } func (wf *WakuFilterFullNode) subscribe(ctx context.Context, stream network.Stream, request *pb.FilterSubscribeRequest) { - if request.PubsubTopic == nil { - wf.reply(ctx, stream, request, http.StatusBadRequest, "pubsubtopic can't be empty") - return - } - - if len(request.ContentTopics) == 0 { - wf.reply(ctx, stream, request, http.StatusBadRequest, "at least one contenttopic should be specified") - return - } - - if len(request.ContentTopics) > MaxContentTopicsPerRequest { - wf.reply(ctx, stream, request, http.StatusBadRequest, fmt.Sprintf("exceeds maximum content topics: %d", MaxContentTopicsPerRequest)) - } - if wf.subscriptions.Count() >= wf.maxSubscriptions { wf.reply(ctx, stream, request, http.StatusServiceUnavailable, "node has reached maximum number of subscriptions") return @@ -197,20 +186,6 @@ func (wf *WakuFilterFullNode) subscribe(ctx context.Context, stream network.Stre } func (wf *WakuFilterFullNode) unsubscribe(ctx context.Context, stream network.Stream, request *pb.FilterSubscribeRequest) { - if request.PubsubTopic == nil { - wf.reply(ctx, stream, request, http.StatusBadRequest, "pubsubtopic can't be empty") - return - } - - if len(request.ContentTopics) == 0 { - wf.reply(ctx, stream, request, http.StatusBadRequest, "at least one contenttopic should be specified") - return - } - - if len(request.ContentTopics) > MaxContentTopicsPerRequest { - wf.reply(ctx, stream, request, http.StatusBadRequest, fmt.Sprintf("exceeds maximum content topics: %d", MaxContentTopicsPerRequest)) - } - err := wf.subscriptions.Delete(stream.Conn().RemotePeer(), *request.PubsubTopic, request.ContentTopics) if err != nil { wf.reply(ctx, stream, request, http.StatusNotFound, peerHasNoSubscription)