diff --git a/src/metrics/encoding/protobuf/unaggregated_iterator.go b/src/metrics/encoding/protobuf/unaggregated_iterator.go index 81d9038f76..d5f8a567f5 100644 --- a/src/metrics/encoding/protobuf/unaggregated_iterator.go +++ b/src/metrics/encoding/protobuf/unaggregated_iterator.go @@ -102,6 +102,10 @@ func (it *unaggregatedIterator) Next() bool { it.err = fmt.Errorf("decoded message size %d is larger than supported max message size %d", size, it.maxMessageSize) return false } + if size <= 0 { + it.err = fmt.Errorf("decoded message size %d is zero or negative", size) + return false + } it.ensureBufferSize(size) if err := it.decodeMessage(size); err != nil { return false diff --git a/src/metrics/encoding/protobuf/unaggregated_iterator_test.go b/src/metrics/encoding/protobuf/unaggregated_iterator_test.go index 523e923b19..4aa96d7785 100644 --- a/src/metrics/encoding/protobuf/unaggregated_iterator_test.go +++ b/src/metrics/encoding/protobuf/unaggregated_iterator_test.go @@ -22,6 +22,7 @@ package protobuf import ( "bytes" + "encoding/binary" "io" "strings" "testing" @@ -103,7 +104,7 @@ func TestUnaggregatedIteratorDecodeBatchTimerWithMetadatas(t *testing.T) { enc := NewUnaggregatedEncoder(NewUnaggregatedOptions()) for _, input := range inputs { require.NoError(t, enc.EncodeMessage(encoding.UnaggregatedMessageUnion{ - Type: encoding.BatchTimerWithMetadatasType, + Type: encoding.BatchTimerWithMetadatasType, BatchTimerWithMetadatas: input, })) } @@ -195,7 +196,7 @@ func TestUnaggregatedIteratorDecodeForwardedMetricWithMetadata(t *testing.T) { enc := NewUnaggregatedEncoder(NewUnaggregatedOptions()) for _, input := range inputs { require.NoError(t, enc.EncodeMessage(encoding.UnaggregatedMessageUnion{ - Type: encoding.ForwardedMetricWithMetadataType, + Type: encoding.ForwardedMetricWithMetadataType, ForwardedMetricWithMetadata: input, })) } @@ -286,7 +287,7 @@ func TestUnaggregatedIteratorDecodeTimedMetricWithMetadata(t *testing.T) { enc := NewUnaggregatedEncoder(NewUnaggregatedOptions()) for _, input := range inputs { require.NoError(t, enc.EncodeMessage(encoding.UnaggregatedMessageUnion{ - Type: encoding.TimedMetricWithMetadataType, + Type: encoding.TimedMetricWithMetadataType, TimedMetricWithMetadata: input, })) } @@ -406,7 +407,7 @@ func TestUnaggregatedIteratorDecodeStress(t *testing.T) { } case unaggregated.BatchTimerWithMetadatas: msg = encoding.UnaggregatedMessageUnion{ - Type: encoding.BatchTimerWithMetadatasType, + Type: encoding.BatchTimerWithMetadatasType, BatchTimerWithMetadatas: input, } case unaggregated.GaugeWithMetadatas: @@ -416,17 +417,17 @@ func TestUnaggregatedIteratorDecodeStress(t *testing.T) { } case aggregated.ForwardedMetricWithMetadata: msg = encoding.UnaggregatedMessageUnion{ - Type: encoding.ForwardedMetricWithMetadataType, + Type: encoding.ForwardedMetricWithMetadataType, ForwardedMetricWithMetadata: input, } case aggregated.TimedMetricWithMetadata: msg = encoding.UnaggregatedMessageUnion{ - Type: encoding.TimedMetricWithMetadataType, + Type: encoding.TimedMetricWithMetadataType, TimedMetricWithMetadata: input, } case aggregated.PassthroughMetricWithMetadata: msg = encoding.UnaggregatedMessageUnion{ - Type: encoding.PassthroughMetricWithMetadataType, + Type: encoding.PassthroughMetricWithMetadataType, PassthroughMetricWithMetadata: input, } default: @@ -554,3 +555,20 @@ func TestUnaggregatedIteratorNextOnClose(t *testing.T) { // Verify that closing a second time is a no op. it.Close() } + +func TestUnaggregatedIteratorNextOnInvalid(t *testing.T) { + buf := make([]byte, 32) + binary.PutVarint(buf, 0) + stream := bytes.NewReader(buf) + + it := NewUnaggregatedIterator(stream, NewUnaggregatedOptions()) + require.False(t, it.Next()) + require.False(t, it.Next()) + + buf = make([]byte, 32) + binary.PutVarint(buf, -1234) + stream = bytes.NewReader(buf) + it = NewUnaggregatedIterator(stream, NewUnaggregatedOptions()) + require.False(t, it.Next()) + require.False(t, it.Next()) +}