nsqd: enforce minimum msg size when reading from the diskqueue #600

Merged · 1 commit · Jul 13, 2015
2 changes: 2 additions & 0 deletions nsqd/channel.go
@@ -105,6 +105,8 @@ func NewChannel(topicName string, channelName string, ctx *context,
c.backend = newDiskQueue(backendName,
ctx.nsqd.opts.DataPath,
ctx.nsqd.opts.MaxBytesPerFile,
int32(minValidMsgLength),
int32(ctx.nsqd.opts.MaxMsgSize),
ctx.nsqd.opts.SyncEvery,
ctx.nsqd.opts.SyncTimeout,
ctx.nsqd.opts.Logger)
23 changes: 20 additions & 3 deletions nsqd/diskqueue.go
@@ -32,7 +32,9 @@ type diskQueue struct {
// instantiation time metadata
name string
dataPath string
maxBytesPerFile int64 // currently this cannot change once created
minMsgSize int32
maxMsgSize int32
syncEvery int64 // number of writes per fsync
syncTimeout time.Duration // duration of time per fsync
exitFlag int32
@@ -65,12 +67,15 @@ type diskQueue struct {
// newDiskQueue instantiates a new instance of diskQueue, retrieving metadata
// from the filesystem and starting the read ahead goroutine
func newDiskQueue(name string, dataPath string, maxBytesPerFile int64,
minMsgSize int32, maxMsgSize int32,
syncEvery int64, syncTimeout time.Duration,
logger logger) BackendQueue {
d := diskQueue{
name: name,
dataPath: dataPath,
maxBytesPerFile: maxBytesPerFile,
minMsgSize: minMsgSize,
maxMsgSize: maxMsgSize,
readChan: make(chan []byte),
writeChan: make(chan []byte),
writeResponseChan: make(chan error),
@@ -261,6 +266,14 @@ func (d *diskQueue) readOne() ([]byte, error) {
return nil, err
}

if msgSize < d.minMsgSize || msgSize > d.maxMsgSize {
Member:
I don't know if msgSize > d.maxMsgSize means it's corrupt? It should skip it, but does it need to discard the whole segment?

Contributor Author:
msgSize could be > d.maxMsgSize because of either

a) a corrupt message, or
b) someone having changed their maxMsgSize flag between nsqd restarts

If it's a corrupt message, then we'd skip forward in the file and then have no guarantee of what we're reading. If a person changed their maxMsgSize, they'd miss this message and be fine reading the next. I don't think it's too common to reduce MaxMsgSize between nsqd restarts other than when a person is trying to figure out parameters, and if they're fine with losing any large messages when tuning, I can't see why they wouldn't be fine losing an entire segment.

I favor discarding the file but I could go either way.

Member:
Alright, you've sold me - all of this falls under "reasonable behavior" without actually knowing if it's corrupt.

// this file is corrupt and we have no reasonable guarantee on
// where a new message should begin
d.readFile.Close()
d.readFile = nil
return nil, fmt.Errorf("invalid message read size (%d)", msgSize)
}

readBuf := make([]byte, msgSize)
_, err = io.ReadFull(d.reader, readBuf)
if err != nil {
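
To make the discussion above concrete, here is a small stand-alone sketch of the pattern readOne now follows (an illustration only, not nsqd's code; the helper name and bound values are made up): read the 4-byte big-endian length, reject anything outside the configured bounds, and give up on the rest of the stream, since a bad length leaves no reliable way to locate the next record.

package main

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// readOneRecord reads a single [4-byte big-endian length][body] record.
// An out-of-range length means the rest of the stream cannot be trusted,
// so the caller should stop reading this file entirely.
func readOneRecord(r *bufio.Reader, minMsgSize, maxMsgSize int32) ([]byte, error) {
	var msgSize int32
	if err := binary.Read(r, binary.BigEndian, &msgSize); err != nil {
		return nil, err
	}
	if msgSize < minMsgSize || msgSize > maxMsgSize {
		return nil, fmt.Errorf("invalid message read size (%d)", msgSize)
	}
	body := make([]byte, msgSize)
	if _, err := io.ReadFull(r, body); err != nil {
		return nil, err
	}
	return body, nil
}

func main() {
	var raw bytes.Buffer
	binary.Write(&raw, binary.BigEndian, int32(5)) // one valid record...
	raw.WriteString("hello")
	raw.Write([]byte{0, 0, 0, 0}) // ...followed by a corrupt zero-length record

	r := bufio.NewReader(&raw)
	fmt.Println(readOneRecord(r, 4, 1<<10)) // [104 101 108 108 111] <nil>
	fmt.Println(readOneRecord(r, 4, 1<<10)) // [] invalid message read size (0)
}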
@@ -316,10 +329,14 @@ func (d *diskQueue) writeOne(data []byte) error {
}
}

dataLen := len(data)
dataLen := int32(len(data))

if dataLen < d.minMsgSize || dataLen > d.maxMsgSize {
return fmt.Errorf("invalid message write size (%d)", dataLen)
}

d.writeBuf.Reset()
err = binary.Write(&d.writeBuf, binary.BigEndian, int32(dataLen))
err = binary.Write(&d.writeBuf, binary.BigEndian, dataLen)
if err != nil {
return err
}
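
The write side can be sketched the same way (again an illustration under assumed names, not the nsqd implementation): the length is checked against the same bounds before the 4-byte big-endian prefix and the body are written, so nothing out of range ever reaches disk in the first place.

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// frame appends a [4-byte big-endian length][body] record to buf, refusing
// bodies outside the [minMsgSize, maxMsgSize] range before anything is written.
func frame(buf *bytes.Buffer, body []byte, minMsgSize, maxMsgSize int32) error {
	dataLen := int32(len(body))
	if dataLen < minMsgSize || dataLen > maxMsgSize {
		return fmt.Errorf("invalid message write size (%d)", dataLen)
	}
	if err := binary.Write(buf, binary.BigEndian, dataLen); err != nil {
		return err
	}
	_, err := buf.Write(body)
	return err
}

func main() {
	var buf bytes.Buffer
	fmt.Println(frame(&buf, []byte("hello"), 4, 1<<10)) // <nil>
	fmt.Println(frame(&buf, []byte("hi"), 4, 1<<10))    // invalid message write size (2)
	fmt.Println(buf.Len())                              // 9: 4-byte prefix + 5-byte body
}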
41 changes: 26 additions & 15 deletions nsqd/diskqueue_test.go
@@ -2,6 +2,7 @@ package nsqd

import (
"bufio"
"bytes"
"fmt"
"io/ioutil"
"os"
@@ -22,7 +23,7 @@ func TestDiskQueue(t *testing.T) {
panic(err)
}
defer os.RemoveAll(tmpDir)
dq := newDiskQueue(dqName, tmpDir, 1024, 2500, 2*time.Second, l)
dq := newDiskQueue(dqName, tmpDir, 1024, 4, 1<<10, 2500, 2*time.Second, l)
nequal(t, dq, nil)
equal(t, dq.Depth(), int64(0))

@@ -43,19 +44,20 @@ func TestDiskQueueRoll(t *testing.T) {
panic(err)
}
defer os.RemoveAll(tmpDir)
dq := newDiskQueue(dqName, tmpDir, 100, 2500, 2*time.Second, l)
msg := bytes.Repeat([]byte{0}, 10)
ml := int64(len(msg))
dq := newDiskQueue(dqName, tmpDir, 9*(ml+4), int32(ml), 1<<10, 2500, 2*time.Second, l)
nequal(t, dq, nil)
equal(t, dq.Depth(), int64(0))

msg := []byte("aaaaaaaaaa")
for i := 0; i < 10; i++ {
err := dq.Put(msg)
equal(t, err, nil)
equal(t, dq.Depth(), int64(i+1))
}

equal(t, dq.(*diskQueue).writeFileNum, int64(1))
equal(t, dq.(*diskQueue).writePos, int64(28))
equal(t, dq.(*diskQueue).writePos, int64(0))
}

func assertFileNotExist(t *testing.T, fn string) {
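
Worked numbers for TestDiskQueueRoll above: each record is the 10-byte message plus its 4-byte length prefix (14 bytes), and maxBytesPerFile is 9*(10+4) = 126. The queue only rolls to a new file once a write pushes writePos past that limit, so the 10th put (writePos 140) triggers the roll, leaving writeFileNum at 1 and writePos reset to 0, which is exactly what the assertions check.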
@@ -72,12 +74,11 @@ func TestDiskQueueEmpty(t *testing.T) {
panic(err)
}
defer os.RemoveAll(tmpDir)
dq := newDiskQueue(dqName, tmpDir, 100, 2500, 2*time.Second, l)
msg := bytes.Repeat([]byte{0}, 10)
dq := newDiskQueue(dqName, tmpDir, 100, 0, 1<<10, 2500, 2*time.Second, l)
nequal(t, dq, nil)
equal(t, dq.Depth(), int64(0))

msg := []byte("aaaaaaaaaa")

for i := 0; i < 100; i++ {
err := dq.Put(msg)
equal(t, err, nil)
@@ -140,9 +141,10 @@ func TestDiskQueueCorruption(t *testing.T) {
panic(err)
}
defer os.RemoveAll(tmpDir)
dq := newDiskQueue(dqName, tmpDir, 1000, 5, 2*time.Second, l)
// require a non-zero message length for the corrupt (len 0) test below
dq := newDiskQueue(dqName, tmpDir, 1000, 10, 1<<10, 5, 2*time.Second, l)

msg := make([]byte, 123)
msg := make([]byte, 123) // 127 bytes per message, 8 (1016 bytes) messages per file
for i := 0; i < 25; i++ {
dq.Put(msg)
}
Expand All @@ -151,16 +153,25 @@ func TestDiskQueueCorruption(t *testing.T) {

// corrupt the 2nd file
dqFn := dq.(*diskQueue).fileName(1)
os.Truncate(dqFn, 500)
os.Truncate(dqFn, 500) // 3 valid messages, 5 corrupted

for i := 0; i < 19; i++ {
for i := 0; i < 19; i++ { // 1 message leftover in 4th file
equal(t, <-dq.ReadChan(), msg)
}

// corrupt the 4th (current) file
dqFn = dq.(*diskQueue).fileName(3)
os.Truncate(dqFn, 100)

dq.Put(msg) // in 5th file

equal(t, <-dq.ReadChan(), msg)

// write a corrupt (len 0) message at the 5th (current) file
dq.(*diskQueue).writeFile.Write([]byte{0, 0, 0, 0})

// force a new 6th file - put into 5th, then readOne errors, then put into 6th
dq.Put(msg)
dq.Put(msg)

equal(t, <-dq.ReadChan(), msg)
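
Worked numbers for TestDiskQueueCorruption above: each record is 4 + 123 = 127 bytes, and with maxBytesPerFile = 1000 a file rolls only after its 8th record pushes writePos to 1016, so the 25 puts land 8/8/8/1 across four files. Truncating the 2nd file to 500 bytes leaves 3 intact records (381 bytes) and destroys the other 5, which is why 19 reads (8 + 3 + 8) drain everything except the single message left in the 4th file.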
@@ -176,7 +187,7 @@ func TestDiskQueueTorture(t *testing.T) {
panic(err)
}
defer os.RemoveAll(tmpDir)
dq := newDiskQueue(dqName, tmpDir, 262144, 2500, 2*time.Second, l)
dq := newDiskQueue(dqName, tmpDir, 262144, 0, 1<<10, 2500, 2*time.Second, l)
nequal(t, dq, nil)
equal(t, dq.Depth(), int64(0))

@@ -217,7 +228,7 @@ func TestDiskQueueTorture(t *testing.T) {

t.Logf("restarting diskqueue")

dq = newDiskQueue(dqName, tmpDir, 262144, 2500, 2*time.Second, l)
dq = newDiskQueue(dqName, tmpDir, 262144, 0, 1<<10, 2500, 2*time.Second, l)
nequal(t, dq, nil)
equal(t, dq.Depth(), depth)

@@ -265,7 +276,7 @@ func BenchmarkDiskQueuePut(b *testing.B) {
panic(err)
}
defer os.RemoveAll(tmpDir)
dq := newDiskQueue(dqName, tmpDir, 1024768*100, 2500, 2*time.Second, l)
dq := newDiskQueue(dqName, tmpDir, 1024768*100, 0, 1<<10, 2500, 2*time.Second, l)
size := 1024
b.SetBytes(int64(size))
data := make([]byte, size)
@@ -333,7 +344,7 @@ func BenchmarkDiskQueueGet(b *testing.B) {
panic(err)
}
defer os.RemoveAll(tmpDir)
dq := newDiskQueue(dqName, tmpDir, 1024768, 2500, 2*time.Second, l)
dq := newDiskQueue(dqName, tmpDir, 1024768, 0, 1<<10, 2500, 2*time.Second, l)
for i := 0; i < b.N; i++ {
dq.Put([]byte("aaaaaaaaaaaaaaaaaaaaaaaaaaa"))
}
7 changes: 5 additions & 2 deletions nsqd/message.go
@@ -9,7 +9,10 @@ import (
"time"
)

const MsgIDLength = 16
const (
MsgIDLength = 16
minValidMsgLength = MsgIDLength + 8 + 2 // Timestamp + Attempts
)

type MessageID [MsgIDLength]byte

@@ -66,7 +69,7 @@ func (m *Message) WriteTo(w io.Writer) (int64, error) {
func decodeMessage(b []byte) (*Message, error) {
var msg Message

if len(b) < 26 {
if len(b) < minValidMsgLength {
return nil, fmt.Errorf("invalid message buffer size (%d)", len(b))
}

2 changes: 2 additions & 0 deletions nsqd/topic.go
@@ -56,6 +56,8 @@ func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topi
t.backend = newDiskQueue(topicName,
ctx.nsqd.opts.DataPath,
ctx.nsqd.opts.MaxBytesPerFile,
int32(minValidMsgLength),
int32(ctx.nsqd.opts.MaxMsgSize),
ctx.nsqd.opts.SyncEvery,
ctx.nsqd.opts.SyncTimeout,
ctx.nsqd.opts.Logger)