diff --git a/Godeps b/Godeps index a2784e09d..cfcf8ca0b 100644 --- a/Godeps +++ b/Godeps @@ -8,3 +8,4 @@ github.com/bitly/timer_metrics afad1794bb13e2a094720aeb27c088aa64564895 github.com/blang/semver 9bf7bff48b0388cb75991e58c6df7d13e982f1f2 github.com/julienschmidt/httprouter 6aacfd5ab513e34f7e64ea9627ab9670371b34e7 github.com/judwhite/go-svc/svc 63c12402f579f0bdf022653c821a1aa5d7544f01 +github.com/nsqio/go-diskqueue d7805f889b023afcd5b8933e815ce9ba36327333 diff --git a/nsqd/channel.go b/nsqd/channel.go index 42eaa9368..174a9881e 100644 --- a/nsqd/channel.go +++ b/nsqd/channel.go @@ -10,6 +10,7 @@ import ( "sync/atomic" "time" + "github.com/nsqio/go-diskqueue" "github.com/nsqio/nsq/internal/pqueue" "github.com/nsqio/nsq/internal/quantile" ) @@ -95,7 +96,7 @@ func NewChannel(topicName string, channelName string, ctx *context, } else { // backend names, for uniqueness, automatically include the topic... backendName := getBackendName(topicName, channelName) - c.backend = newDiskQueue(backendName, + c.backend = diskqueue.New(backendName, ctx.nsqd.getOpts().DataPath, ctx.nsqd.getOpts().MaxBytesPerFile, int32(minValidMsgLength), diff --git a/nsqd/channel_test.go b/nsqd/channel_test.go index bccd8b582..b98d35339 100644 --- a/nsqd/channel_test.go +++ b/nsqd/channel_test.go @@ -59,20 +59,6 @@ func TestPutMessage2Chan(t *testing.T) { test.Equal(t, msg.Body, outputMsg2.Body) } -func TestChannelBackendMaxMsgSize(t *testing.T) { - opts := NewOptions() - opts.Logger = test.NewTestLogger(t) - _, _, nsqd := mustStartNSQD(opts) - defer os.RemoveAll(opts.DataPath) - defer nsqd.Exit() - - topicName := "test_channel_backend_maxmsgsize" + strconv.Itoa(int(time.Now().Unix())) - topic := nsqd.GetTopic(topicName) - ch := topic.GetChannel("ch") - - test.Equal(t, int32(opts.MaxMsgSize+minValidMsgLength), ch.backend.(*diskQueue).maxMsgSize) -} - func TestInFlightWorker(t *testing.T) { count := 250 diff --git a/nsqd/diskqueue.go b/nsqd/diskqueue.go deleted file mode 100644 index 01e7e9d3b..000000000 --- a/nsqd/diskqueue.go +++ /dev/null @@ -1,634 +0,0 @@ -package nsqd - -import ( - "bufio" - "bytes" - "encoding/binary" - "errors" - "fmt" - "io" - "math/rand" - "os" - "path" - "sync" - "sync/atomic" - "time" -) - -// diskQueue implements the BackendQueue interface -// providing a filesystem backed FIFO queue -type diskQueue struct { - // 64bit atomic vars need to be first for proper alignment on 32bit platforms - - // run-time state (also persisted to disk) - readPos int64 - writePos int64 - readFileNum int64 - writeFileNum int64 - depth int64 - - sync.RWMutex - - // instantiation time metadata - name string - dataPath string - maxBytesPerFile int64 // currently this cannot change once created - minMsgSize int32 - maxMsgSize int32 - syncEvery int64 // number of writes per fsync - syncTimeout time.Duration // duration of time per fsync - exitFlag int32 - needSync bool - - // keeps track of the position where we have read - // (but not yet sent over readChan) - nextReadPos int64 - nextReadFileNum int64 - - readFile *os.File - writeFile *os.File - reader *bufio.Reader - writeBuf bytes.Buffer - - // exposed via ReadChan() - readChan chan []byte - - // internal channels - writeChan chan []byte - writeResponseChan chan error - emptyChan chan int - emptyResponseChan chan error - exitChan chan int - exitSyncChan chan int - - logger Logger -} - -// newDiskQueue instantiates a new instance of diskQueue, retrieving metadata -// from the filesystem and starting the read ahead goroutine -func newDiskQueue(name string, dataPath string, maxBytesPerFile int64, - minMsgSize int32, maxMsgSize int32, - syncEvery int64, syncTimeout time.Duration, - logger Logger) BackendQueue { - d := diskQueue{ - name: name, - dataPath: dataPath, - maxBytesPerFile: maxBytesPerFile, - minMsgSize: minMsgSize, - maxMsgSize: maxMsgSize, - readChan: make(chan []byte), - writeChan: make(chan []byte), - writeResponseChan: make(chan error), - emptyChan: make(chan int), - emptyResponseChan: make(chan error), - exitChan: make(chan int), - exitSyncChan: make(chan int), - syncEvery: syncEvery, - syncTimeout: syncTimeout, - logger: logger, - } - - // no need to lock here, nothing else could possibly be touching this instance - err := d.retrieveMetaData() - if err != nil && !os.IsNotExist(err) { - d.logf("ERROR: diskqueue(%s) failed to retrieveMetaData - %s", d.name, err) - } - - go d.ioLoop() - - return &d -} - -func (d *diskQueue) logf(f string, args ...interface{}) { - if d.logger == nil { - return - } - d.logger.Output(2, fmt.Sprintf(f, args...)) -} - -// Depth returns the depth of the queue -func (d *diskQueue) Depth() int64 { - return atomic.LoadInt64(&d.depth) -} - -// ReadChan returns the []byte channel for reading data -func (d *diskQueue) ReadChan() chan []byte { - return d.readChan -} - -// Put writes a []byte to the queue -func (d *diskQueue) Put(data []byte) error { - d.RLock() - defer d.RUnlock() - - if d.exitFlag == 1 { - return errors.New("exiting") - } - - d.writeChan <- data - return <-d.writeResponseChan -} - -// Close cleans up the queue and persists metadata -func (d *diskQueue) Close() error { - err := d.exit(false) - if err != nil { - return err - } - return d.sync() -} - -func (d *diskQueue) Delete() error { - return d.exit(true) -} - -func (d *diskQueue) exit(deleted bool) error { - d.Lock() - defer d.Unlock() - - d.exitFlag = 1 - - if deleted { - d.logf("DISKQUEUE(%s): deleting", d.name) - } else { - d.logf("DISKQUEUE(%s): closing", d.name) - } - - close(d.exitChan) - // ensure that ioLoop has exited - <-d.exitSyncChan - - if d.readFile != nil { - d.readFile.Close() - d.readFile = nil - } - - if d.writeFile != nil { - d.writeFile.Close() - d.writeFile = nil - } - - return nil -} - -// Empty destructively clears out any pending data in the queue -// by fast forwarding read positions and removing intermediate files -func (d *diskQueue) Empty() error { - d.RLock() - defer d.RUnlock() - - if d.exitFlag == 1 { - return errors.New("exiting") - } - - d.logf("DISKQUEUE(%s): emptying", d.name) - - d.emptyChan <- 1 - return <-d.emptyResponseChan -} - -func (d *diskQueue) deleteAllFiles() error { - err := d.skipToNextRWFile() - - innerErr := os.Remove(d.metaDataFileName()) - if innerErr != nil && !os.IsNotExist(innerErr) { - d.logf("ERROR: diskqueue(%s) failed to remove metadata file - %s", d.name, innerErr) - return innerErr - } - - return err -} - -func (d *diskQueue) skipToNextRWFile() error { - var err error - - if d.readFile != nil { - d.readFile.Close() - d.readFile = nil - } - - if d.writeFile != nil { - d.writeFile.Close() - d.writeFile = nil - } - - for i := d.readFileNum; i <= d.writeFileNum; i++ { - fn := d.fileName(i) - innerErr := os.Remove(fn) - if innerErr != nil && !os.IsNotExist(innerErr) { - d.logf("ERROR: diskqueue(%s) failed to remove data file - %s", d.name, innerErr) - err = innerErr - } - } - - d.writeFileNum++ - d.writePos = 0 - d.readFileNum = d.writeFileNum - d.readPos = 0 - d.nextReadFileNum = d.writeFileNum - d.nextReadPos = 0 - atomic.StoreInt64(&d.depth, 0) - - return err -} - -// readOne performs a low level filesystem read for a single []byte -// while advancing read positions and rolling files, if necessary -func (d *diskQueue) readOne() ([]byte, error) { - var err error - var msgSize int32 - - if d.readFile == nil { - curFileName := d.fileName(d.readFileNum) - d.readFile, err = os.OpenFile(curFileName, os.O_RDONLY, 0600) - if err != nil { - return nil, err - } - - d.logf("DISKQUEUE(%s): readOne() opened %s", d.name, curFileName) - - if d.readPos > 0 { - _, err = d.readFile.Seek(d.readPos, 0) - if err != nil { - d.readFile.Close() - d.readFile = nil - return nil, err - } - } - - d.reader = bufio.NewReader(d.readFile) - } - - err = binary.Read(d.reader, binary.BigEndian, &msgSize) - if err != nil { - d.readFile.Close() - d.readFile = nil - return nil, err - } - - if msgSize < d.minMsgSize || msgSize > d.maxMsgSize { - // this file is corrupt and we have no reasonable guarantee on - // where a new message should begin - d.readFile.Close() - d.readFile = nil - return nil, fmt.Errorf("invalid message read size (%d)", msgSize) - } - - readBuf := make([]byte, msgSize) - _, err = io.ReadFull(d.reader, readBuf) - if err != nil { - d.readFile.Close() - d.readFile = nil - return nil, err - } - - totalBytes := int64(4 + msgSize) - - // we only advance next* because we have not yet sent this to consumers - // (where readFileNum, readPos will actually be advanced) - d.nextReadPos = d.readPos + totalBytes - d.nextReadFileNum = d.readFileNum - - // TODO: each data file should embed the maxBytesPerFile - // as the first 8 bytes (at creation time) ensuring that - // the value can change without affecting runtime - if d.nextReadPos > d.maxBytesPerFile { - if d.readFile != nil { - d.readFile.Close() - d.readFile = nil - } - - d.nextReadFileNum++ - d.nextReadPos = 0 - } - - return readBuf, nil -} - -// writeOne performs a low level filesystem write for a single []byte -// while advancing write positions and rolling files, if necessary -func (d *diskQueue) writeOne(data []byte) error { - var err error - - if d.writeFile == nil { - curFileName := d.fileName(d.writeFileNum) - d.writeFile, err = os.OpenFile(curFileName, os.O_RDWR|os.O_CREATE, 0600) - if err != nil { - return err - } - - d.logf("DISKQUEUE(%s): writeOne() opened %s", d.name, curFileName) - - if d.writePos > 0 { - _, err = d.writeFile.Seek(d.writePos, 0) - if err != nil { - d.writeFile.Close() - d.writeFile = nil - return err - } - } - } - - dataLen := int32(len(data)) - - if dataLen < d.minMsgSize || dataLen > d.maxMsgSize { - return fmt.Errorf("invalid message write size (%d) maxMsgSize=%d", dataLen, d.maxMsgSize) - } - - d.writeBuf.Reset() - err = binary.Write(&d.writeBuf, binary.BigEndian, dataLen) - if err != nil { - return err - } - - _, err = d.writeBuf.Write(data) - if err != nil { - return err - } - - // only write to the file once - _, err = d.writeFile.Write(d.writeBuf.Bytes()) - if err != nil { - d.writeFile.Close() - d.writeFile = nil - return err - } - - totalBytes := int64(4 + dataLen) - d.writePos += totalBytes - atomic.AddInt64(&d.depth, 1) - - if d.writePos > d.maxBytesPerFile { - d.writeFileNum++ - d.writePos = 0 - - // sync every time we start writing to a new file - err = d.sync() - if err != nil { - d.logf("ERROR: diskqueue(%s) failed to sync - %s", d.name, err) - } - - if d.writeFile != nil { - d.writeFile.Close() - d.writeFile = nil - } - } - - return err -} - -// sync fsyncs the current writeFile and persists metadata -func (d *diskQueue) sync() error { - if d.writeFile != nil { - err := d.writeFile.Sync() - if err != nil { - d.writeFile.Close() - d.writeFile = nil - return err - } - } - - err := d.persistMetaData() - if err != nil { - return err - } - - d.needSync = false - return nil -} - -// retrieveMetaData initializes state from the filesystem -func (d *diskQueue) retrieveMetaData() error { - var f *os.File - var err error - - fileName := d.metaDataFileName() - f, err = os.OpenFile(fileName, os.O_RDONLY, 0600) - if err != nil { - return err - } - defer f.Close() - - var depth int64 - _, err = fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n", - &depth, - &d.readFileNum, &d.readPos, - &d.writeFileNum, &d.writePos) - if err != nil { - return err - } - atomic.StoreInt64(&d.depth, depth) - d.nextReadFileNum = d.readFileNum - d.nextReadPos = d.readPos - - return nil -} - -// persistMetaData atomically writes state to the filesystem -func (d *diskQueue) persistMetaData() error { - var f *os.File - var err error - - fileName := d.metaDataFileName() - tmpFileName := fmt.Sprintf("%s.%d.tmp", fileName, rand.Int()) - - // write to tmp file - f, err = os.OpenFile(tmpFileName, os.O_RDWR|os.O_CREATE, 0600) - if err != nil { - return err - } - - _, err = fmt.Fprintf(f, "%d\n%d,%d\n%d,%d\n", - atomic.LoadInt64(&d.depth), - d.readFileNum, d.readPos, - d.writeFileNum, d.writePos) - if err != nil { - f.Close() - return err - } - f.Sync() - f.Close() - - // atomically rename - return atomicRename(tmpFileName, fileName) -} - -func (d *diskQueue) metaDataFileName() string { - return fmt.Sprintf(path.Join(d.dataPath, "%s.diskqueue.meta.dat"), d.name) -} - -func (d *diskQueue) fileName(fileNum int64) string { - return fmt.Sprintf(path.Join(d.dataPath, "%s.diskqueue.%06d.dat"), d.name, fileNum) -} - -func (d *diskQueue) checkTailCorruption(depth int64) { - if d.readFileNum < d.writeFileNum || d.readPos < d.writePos { - return - } - - // we've reached the end of the diskqueue - // if depth isn't 0 something went wrong - if depth != 0 { - if depth < 0 { - d.logf( - "ERROR: diskqueue(%s) negative depth at tail (%d), metadata corruption, resetting 0...", - d.name, depth) - } else if depth > 0 { - d.logf( - "ERROR: diskqueue(%s) positive depth at tail (%d), data loss, resetting 0...", - d.name, depth) - } - // force set depth 0 - atomic.StoreInt64(&d.depth, 0) - d.needSync = true - } - - if d.readFileNum != d.writeFileNum || d.readPos != d.writePos { - if d.readFileNum > d.writeFileNum { - d.logf( - "ERROR: diskqueue(%s) readFileNum > writeFileNum (%d > %d), corruption, skipping to next writeFileNum and resetting 0...", - d.name, d.readFileNum, d.writeFileNum) - } - - if d.readPos > d.writePos { - d.logf( - "ERROR: diskqueue(%s) readPos > writePos (%d > %d), corruption, skipping to next writeFileNum and resetting 0...", - d.name, d.readPos, d.writePos) - } - - d.skipToNextRWFile() - d.needSync = true - } -} - -func (d *diskQueue) moveForward() { - oldReadFileNum := d.readFileNum - d.readFileNum = d.nextReadFileNum - d.readPos = d.nextReadPos - depth := atomic.AddInt64(&d.depth, -1) - - // see if we need to clean up the old file - if oldReadFileNum != d.nextReadFileNum { - // sync every time we start reading from a new file - d.needSync = true - - fn := d.fileName(oldReadFileNum) - err := os.Remove(fn) - if err != nil { - d.logf("ERROR: failed to Remove(%s) - %s", fn, err) - } - } - - d.checkTailCorruption(depth) -} - -func (d *diskQueue) handleReadError() { - // jump to the next read file and rename the current (bad) file - if d.readFileNum == d.writeFileNum { - // if you can't properly read from the current write file it's safe to - // assume that something is fucked and we should skip the current file too - if d.writeFile != nil { - d.writeFile.Close() - d.writeFile = nil - } - d.writeFileNum++ - d.writePos = 0 - } - - badFn := d.fileName(d.readFileNum) - badRenameFn := badFn + ".bad" - - d.logf( - "NOTICE: diskqueue(%s) jump to next file and saving bad file as %s", - d.name, badRenameFn) - - err := atomicRename(badFn, badRenameFn) - if err != nil { - d.logf( - "ERROR: diskqueue(%s) failed to rename bad diskqueue file %s to %s", - d.name, badFn, badRenameFn) - } - - d.readFileNum++ - d.readPos = 0 - d.nextReadFileNum = d.readFileNum - d.nextReadPos = 0 - - // significant state change, schedule a sync on the next iteration - d.needSync = true -} - -// ioLoop provides the backend for exposing a go channel (via ReadChan()) -// in support of multiple concurrent queue consumers -// -// it works by looping and branching based on whether or not the queue has data -// to read and blocking until data is either read or written over the appropriate -// go channels -// -// conveniently this also means that we're asynchronously reading from the filesystem -func (d *diskQueue) ioLoop() { - var dataRead []byte - var err error - var count int64 - var r chan []byte - - syncTicker := time.NewTicker(d.syncTimeout) - - for { - // dont sync all the time :) - if count == d.syncEvery { - d.needSync = true - } - - if d.needSync { - err = d.sync() - if err != nil { - d.logf("ERROR: diskqueue(%s) failed to sync - %s", d.name, err) - } - count = 0 - } - - if (d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos) { - if d.nextReadPos == d.readPos { - dataRead, err = d.readOne() - if err != nil { - d.logf("ERROR: reading from diskqueue(%s) at %d of %s - %s", - d.name, d.readPos, d.fileName(d.readFileNum), err) - d.handleReadError() - continue - } - } - r = d.readChan - } else { - r = nil - } - - select { - // the Go channel spec dictates that nil channel operations (read or write) - // in a select are skipped, we set r to d.readChan only when there is data to read - case r <- dataRead: - count++ - // moveForward sets needSync flag if a file is removed - d.moveForward() - case <-d.emptyChan: - d.emptyResponseChan <- d.deleteAllFiles() - count = 0 - case dataWrite := <-d.writeChan: - count++ - d.writeResponseChan <- d.writeOne(dataWrite) - case <-syncTicker.C: - if count == 0 { - // avoid sync when there's no activity - continue - } - d.needSync = true - case <-d.exitChan: - goto exit - } - } - -exit: - d.logf("DISKQUEUE(%s): closing ... ioLoop", d.name) - syncTicker.Stop() - d.exitSyncChan <- 1 -} diff --git a/nsqd/diskqueue_test.go b/nsqd/diskqueue_test.go deleted file mode 100644 index f0aead160..000000000 --- a/nsqd/diskqueue_test.go +++ /dev/null @@ -1,556 +0,0 @@ -package nsqd - -import ( - "bufio" - "bytes" - "fmt" - "io/ioutil" - "os" - "path" - "strconv" - "sync" - "sync/atomic" - "testing" - "time" - - "github.com/nsqio/nsq/internal/test" -) - -func TestDiskQueue(t *testing.T) { - l := test.NewTestLogger(t) - - dqName := "test_disk_queue" + strconv.Itoa(int(time.Now().Unix())) - tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) - if err != nil { - panic(err) - } - defer os.RemoveAll(tmpDir) - dq := newDiskQueue(dqName, tmpDir, 1024, 4, 1<<10, 2500, 2*time.Second, l) - defer dq.Close() - test.NotNil(t, dq) - test.Equal(t, int64(0), dq.Depth()) - - msg := []byte("test") - err = dq.Put(msg) - test.Nil(t, err) - test.Equal(t, int64(1), dq.Depth()) - - msgOut := <-dq.ReadChan() - test.Equal(t, msg, msgOut) -} - -func TestDiskQueueRoll(t *testing.T) { - l := test.NewTestLogger(t) - dqName := "test_disk_queue_roll" + strconv.Itoa(int(time.Now().Unix())) - tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) - if err != nil { - panic(err) - } - defer os.RemoveAll(tmpDir) - msg := bytes.Repeat([]byte{0}, 10) - ml := int64(len(msg)) - dq := newDiskQueue(dqName, tmpDir, 9*(ml+4), int32(ml), 1<<10, 2500, 2*time.Second, l) - defer dq.Close() - test.NotNil(t, dq) - test.Equal(t, int64(0), dq.Depth()) - - for i := 0; i < 10; i++ { - err := dq.Put(msg) - test.Nil(t, err) - test.Equal(t, int64(i+1), dq.Depth()) - } - - test.Equal(t, int64(1), dq.(*diskQueue).writeFileNum) - test.Equal(t, int64(0), dq.(*diskQueue).writePos) -} - -func assertFileNotExist(t *testing.T, fn string) { - f, err := os.OpenFile(fn, os.O_RDONLY, 0600) - test.Equal(t, (*os.File)(nil), f) - test.Equal(t, true, os.IsNotExist(err)) -} - -func TestDiskQueueEmpty(t *testing.T) { - l := test.NewTestLogger(t) - dqName := "test_disk_queue_empty" + strconv.Itoa(int(time.Now().Unix())) - tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) - if err != nil { - panic(err) - } - defer os.RemoveAll(tmpDir) - msg := bytes.Repeat([]byte{0}, 10) - dq := newDiskQueue(dqName, tmpDir, 100, 0, 1<<10, 2500, 2*time.Second, l) - defer dq.Close() - test.NotNil(t, dq) - test.Equal(t, int64(0), dq.Depth()) - - for i := 0; i < 100; i++ { - err := dq.Put(msg) - test.Nil(t, err) - test.Equal(t, int64(i+1), dq.Depth()) - } - - for i := 0; i < 3; i++ { - <-dq.ReadChan() - } - - for { - if dq.Depth() == 97 { - break - } - time.Sleep(50 * time.Millisecond) - } - test.Equal(t, int64(97), dq.Depth()) - - numFiles := dq.(*diskQueue).writeFileNum - dq.Empty() - - assertFileNotExist(t, dq.(*diskQueue).metaDataFileName()) - for i := int64(0); i <= numFiles; i++ { - assertFileNotExist(t, dq.(*diskQueue).fileName(i)) - } - test.Equal(t, int64(0), dq.Depth()) - test.Equal(t, dq.(*diskQueue).writeFileNum, dq.(*diskQueue).readFileNum) - test.Equal(t, dq.(*diskQueue).writePos, dq.(*diskQueue).readPos) - test.Equal(t, dq.(*diskQueue).readPos, dq.(*diskQueue).nextReadPos) - test.Equal(t, dq.(*diskQueue).readFileNum, dq.(*diskQueue).nextReadFileNum) - - for i := 0; i < 100; i++ { - err := dq.Put(msg) - test.Nil(t, err) - test.Equal(t, int64(i+1), dq.Depth()) - } - - for i := 0; i < 100; i++ { - <-dq.ReadChan() - } - - for { - if dq.Depth() == 0 { - break - } - time.Sleep(50 * time.Millisecond) - } - - test.Equal(t, int64(0), dq.Depth()) - test.Equal(t, dq.(*diskQueue).writeFileNum, dq.(*diskQueue).readFileNum) - test.Equal(t, dq.(*diskQueue).writePos, dq.(*diskQueue).readPos) - test.Equal(t, dq.(*diskQueue).readPos, dq.(*diskQueue).nextReadPos) -} - -func TestDiskQueueCorruption(t *testing.T) { - l := test.NewTestLogger(t) - dqName := "test_disk_queue_corruption" + strconv.Itoa(int(time.Now().Unix())) - tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) - if err != nil { - panic(err) - } - defer os.RemoveAll(tmpDir) - // require a non-zero message length for the corrupt (len 0) test below - dq := newDiskQueue(dqName, tmpDir, 1000, 10, 1<<10, 5, 2*time.Second, l) - defer dq.Close() - - msg := make([]byte, 123) // 127 bytes per message, 8 (1016 bytes) messages per file - for i := 0; i < 25; i++ { - dq.Put(msg) - } - - test.Equal(t, int64(25), dq.Depth()) - - // corrupt the 2nd file - dqFn := dq.(*diskQueue).fileName(1) - os.Truncate(dqFn, 500) // 3 valid messages, 5 corrupted - - for i := 0; i < 19; i++ { // 1 message leftover in 4th file - test.Equal(t, msg, <-dq.ReadChan()) - } - - // corrupt the 4th (current) file - dqFn = dq.(*diskQueue).fileName(3) - os.Truncate(dqFn, 100) - - dq.Put(msg) // in 5th file - - test.Equal(t, msg, <-dq.ReadChan()) - - // write a corrupt (len 0) message at the 5th (current) file - dq.(*diskQueue).writeFile.Write([]byte{0, 0, 0, 0}) - - // force a new 6th file - put into 5th, then readOne errors, then put into 6th - dq.Put(msg) - dq.Put(msg) - - test.Equal(t, msg, <-dq.ReadChan()) -} - -type md struct { - depth int64 - readFileNum int64 - writeFileNum int64 - readPos int64 - writePos int64 -} - -func readMetaDataFile(fileName string, retried int) md { - f, err := os.OpenFile(fileName, os.O_RDONLY, 0600) - if err != nil { - // provide a simple retry that results in up to - // another 500ms for the file to be written. - if retried < 9 { - retried++ - time.Sleep(50 * time.Millisecond) - return readMetaDataFile(fileName, retried) - } - panic(err) - } - defer f.Close() - - var ret md - _, err = fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n", - &ret.depth, - &ret.readFileNum, &ret.readPos, - &ret.writeFileNum, &ret.writePos) - if err != nil { - panic(err) - } - return ret -} - -func TestDiskQueueSyncAfterRead(t *testing.T) { - l := test.NewTestLogger(t) - dqName := "test_disk_queue_read_after_sync" + strconv.Itoa(int(time.Now().Unix())) - tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) - if err != nil { - panic(err) - } - defer os.RemoveAll(tmpDir) - dq := newDiskQueue(dqName, tmpDir, 1<<11, 0, 1<<10, 2500, 50*time.Millisecond, l) - defer dq.Close() - - msg := make([]byte, 1000) - dq.Put(msg) - - for i := 0; i < 10; i++ { - d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0) - if d.depth == 1 && - d.readFileNum == 0 && - d.writeFileNum == 0 && - d.readPos == 0 && - d.writePos == 1004 { - // success - goto next - } - time.Sleep(100 * time.Millisecond) - } - panic("fail") - -next: - dq.Put(msg) - <-dq.ReadChan() - - for i := 0; i < 10; i++ { - d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0) - if d.depth == 1 && - d.readFileNum == 0 && - d.writeFileNum == 0 && - d.readPos == 1004 && - d.writePos == 2008 { - // success - goto done - } - time.Sleep(100 * time.Millisecond) - } - panic("fail") - -done: -} - -func TestDiskQueueTorture(t *testing.T) { - var wg sync.WaitGroup - - l := test.NewTestLogger(t) - dqName := "test_disk_queue_torture" + strconv.Itoa(int(time.Now().Unix())) - tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) - if err != nil { - panic(err) - } - defer os.RemoveAll(tmpDir) - dq := newDiskQueue(dqName, tmpDir, 262144, 0, 1<<10, 2500, 2*time.Second, l) - test.NotNil(t, dq) - test.Equal(t, int64(0), dq.Depth()) - - msg := []byte("aaaaaaaaaabbbbbbbbbbccccccccccddddddddddeeeeeeeeeeffffffffff") - - numWriters := 4 - numReaders := 4 - readExitChan := make(chan int) - writeExitChan := make(chan int) - - var depth int64 - for i := 0; i < numWriters; i++ { - wg.Add(1) - go func() { - defer wg.Done() - for { - time.Sleep(100000 * time.Nanosecond) - select { - case <-writeExitChan: - return - default: - err := dq.Put(msg) - if err == nil { - atomic.AddInt64(&depth, 1) - } - } - } - }() - } - - time.Sleep(1 * time.Second) - - dq.Close() - - t.Logf("closing writeExitChan") - close(writeExitChan) - wg.Wait() - - t.Logf("restarting diskqueue") - - dq = newDiskQueue(dqName, tmpDir, 262144, 0, 1<<10, 2500, 2*time.Second, l) - defer dq.Close() - test.NotNil(t, dq) - test.Equal(t, depth, dq.Depth()) - - var read int64 - for i := 0; i < numReaders; i++ { - wg.Add(1) - go func() { - defer wg.Done() - for { - time.Sleep(100000 * time.Nanosecond) - select { - case m := <-dq.ReadChan(): - test.Equal(t, m, msg) - atomic.AddInt64(&read, 1) - case <-readExitChan: - return - } - } - }() - } - - t.Logf("waiting for depth 0") - for { - if dq.Depth() == 0 { - break - } - time.Sleep(50 * time.Millisecond) - } - - t.Logf("closing readExitChan") - close(readExitChan) - wg.Wait() - - test.Equal(t, depth, read) -} - -func BenchmarkDiskQueuePut16(b *testing.B) { - benchmarkDiskQueuePut(16, b) -} -func BenchmarkDiskQueuePut64(b *testing.B) { - benchmarkDiskQueuePut(64, b) -} -func BenchmarkDiskQueuePut256(b *testing.B) { - benchmarkDiskQueuePut(256, b) -} -func BenchmarkDiskQueuePut1024(b *testing.B) { - benchmarkDiskQueuePut(1024, b) -} -func BenchmarkDiskQueuePut4096(b *testing.B) { - benchmarkDiskQueuePut(4096, b) -} -func BenchmarkDiskQueuePut16384(b *testing.B) { - benchmarkDiskQueuePut(16384, b) -} -func BenchmarkDiskQueuePut65536(b *testing.B) { - benchmarkDiskQueuePut(65536, b) -} -func BenchmarkDiskQueuePut262144(b *testing.B) { - benchmarkDiskQueuePut(262144, b) -} -func BenchmarkDiskQueuePut1048576(b *testing.B) { - benchmarkDiskQueuePut(1048576, b) -} -func benchmarkDiskQueuePut(size int64, b *testing.B) { - b.StopTimer() - l := test.NewTestLogger(b) - dqName := "bench_disk_queue_put" + strconv.Itoa(b.N) + strconv.Itoa(int(time.Now().Unix())) - tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) - if err != nil { - panic(err) - } - defer os.RemoveAll(tmpDir) - dq := newDiskQueue(dqName, tmpDir, 1024768*100, 0, 1<<20, 2500, 2*time.Second, l) - defer dq.Close() - b.SetBytes(size) - data := make([]byte, size) - b.StartTimer() - - for i := 0; i < b.N; i++ { - err := dq.Put(data) - if err != nil { - panic(err) - } - } -} - -func BenchmarkDiskWrite16(b *testing.B) { - benchmarkDiskWrite(16, b) -} -func BenchmarkDiskWrite64(b *testing.B) { - benchmarkDiskWrite(64, b) -} -func BenchmarkDiskWrite256(b *testing.B) { - benchmarkDiskWrite(256, b) -} -func BenchmarkDiskWrite1024(b *testing.B) { - benchmarkDiskWrite(1024, b) -} -func BenchmarkDiskWrite4096(b *testing.B) { - benchmarkDiskWrite(4096, b) -} -func BenchmarkDiskWrite16384(b *testing.B) { - benchmarkDiskWrite(16384, b) -} -func BenchmarkDiskWrite65536(b *testing.B) { - benchmarkDiskWrite(65536, b) -} -func BenchmarkDiskWrite262144(b *testing.B) { - benchmarkDiskWrite(262144, b) -} -func BenchmarkDiskWrite1048576(b *testing.B) { - benchmarkDiskWrite(1048576, b) -} -func benchmarkDiskWrite(size int64, b *testing.B) { - b.StopTimer() - fileName := "bench_disk_queue_put" + strconv.Itoa(b.N) + strconv.Itoa(int(time.Now().Unix())) - tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) - if err != nil { - panic(err) - } - defer os.RemoveAll(tmpDir) - f, _ := os.OpenFile(path.Join(tmpDir, fileName), os.O_RDWR|os.O_CREATE, 0600) - b.SetBytes(size) - data := make([]byte, size) - b.StartTimer() - - for i := 0; i < b.N; i++ { - f.Write(data) - } - f.Sync() -} - -func BenchmarkDiskWriteBuffered16(b *testing.B) { - benchmarkDiskWriteBuffered(16, b) -} -func BenchmarkDiskWriteBuffered64(b *testing.B) { - benchmarkDiskWriteBuffered(64, b) -} -func BenchmarkDiskWriteBuffered256(b *testing.B) { - benchmarkDiskWriteBuffered(256, b) -} -func BenchmarkDiskWriteBuffered1024(b *testing.B) { - benchmarkDiskWriteBuffered(1024, b) -} -func BenchmarkDiskWriteBuffered4096(b *testing.B) { - benchmarkDiskWriteBuffered(4096, b) -} -func BenchmarkDiskWriteBuffered16384(b *testing.B) { - benchmarkDiskWriteBuffered(16384, b) -} -func BenchmarkDiskWriteBuffered65536(b *testing.B) { - benchmarkDiskWriteBuffered(65536, b) -} -func BenchmarkDiskWriteBuffered262144(b *testing.B) { - benchmarkDiskWriteBuffered(262144, b) -} -func BenchmarkDiskWriteBuffered1048576(b *testing.B) { - benchmarkDiskWriteBuffered(1048576, b) -} -func benchmarkDiskWriteBuffered(size int64, b *testing.B) { - b.StopTimer() - fileName := "bench_disk_queue_put" + strconv.Itoa(b.N) + strconv.Itoa(int(time.Now().Unix())) - tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) - if err != nil { - panic(err) - } - defer os.RemoveAll(tmpDir) - f, _ := os.OpenFile(path.Join(tmpDir, fileName), os.O_RDWR|os.O_CREATE, 0600) - b.SetBytes(size) - data := make([]byte, size) - w := bufio.NewWriterSize(f, 1024*4) - b.StartTimer() - - for i := 0; i < b.N; i++ { - w.Write(data) - if i%1024 == 0 { - w.Flush() - } - } - w.Flush() - f.Sync() -} - -// you might want to run this like -// $ go test -bench=DiskQueueGet -benchtime 0.1s -// too avoid doing too many iterations. -func BenchmarkDiskQueueGet16(b *testing.B) { - benchmarkDiskQueueGet(16, b) -} -func BenchmarkDiskQueueGet64(b *testing.B) { - benchmarkDiskQueueGet(64, b) -} -func BenchmarkDiskQueueGet256(b *testing.B) { - benchmarkDiskQueueGet(256, b) -} -func BenchmarkDiskQueueGet1024(b *testing.B) { - benchmarkDiskQueueGet(1024, b) -} -func BenchmarkDiskQueueGet4096(b *testing.B) { - benchmarkDiskQueueGet(4096, b) -} -func BenchmarkDiskQueueGet16384(b *testing.B) { - benchmarkDiskQueueGet(16384, b) -} -func BenchmarkDiskQueueGet65536(b *testing.B) { - benchmarkDiskQueueGet(65536, b) -} -func BenchmarkDiskQueueGet262144(b *testing.B) { - benchmarkDiskQueueGet(262144, b) -} -func BenchmarkDiskQueueGet1048576(b *testing.B) { - benchmarkDiskQueueGet(1048576, b) -} - -func benchmarkDiskQueueGet(size int64, b *testing.B) { - b.StopTimer() - l := test.NewTestLogger(b) - dqName := "bench_disk_queue_get" + strconv.Itoa(b.N) + strconv.Itoa(int(time.Now().Unix())) - tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) - if err != nil { - panic(err) - } - defer os.RemoveAll(tmpDir) - dq := newDiskQueue(dqName, tmpDir, 1024768, 0, 1<<30, 2500, 2*time.Second, l) - defer dq.Close() - b.SetBytes(size) - data := make([]byte, size) - for i := 0; i < b.N; i++ { - dq.Put(data) - } - b.StartTimer() - - for i := 0; i < b.N; i++ { - <-dq.ReadChan() - } -} diff --git a/nsqd/topic.go b/nsqd/topic.go index b375746eb..4c3ee2560 100644 --- a/nsqd/topic.go +++ b/nsqd/topic.go @@ -8,6 +8,7 @@ import ( "sync/atomic" "time" + "github.com/nsqio/go-diskqueue" "github.com/nsqio/nsq/internal/quantile" "github.com/nsqio/nsq/internal/util" ) @@ -56,7 +57,7 @@ func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topi t.ephemeral = true t.backend = newDummyBackendQueue() } else { - t.backend = newDiskQueue(topicName, + t.backend = diskqueue.New(topicName, ctx.nsqd.getOpts().DataPath, ctx.nsqd.getOpts().MaxBytesPerFile, int32(minValidMsgLength), diff --git a/nsqd/topic_test.go b/nsqd/topic_test.go index 9a7fa53b9..013b7ed08 100644 --- a/nsqd/topic_test.go +++ b/nsqd/topic_test.go @@ -195,19 +195,6 @@ func TestPause(t *testing.T) { test.Equal(t, int64(1), channel.Depth()) } -func TestTopicBackendMaxMsgSize(t *testing.T) { - opts := NewOptions() - opts.Logger = test.NewTestLogger(t) - _, _, nsqd := mustStartNSQD(opts) - defer os.RemoveAll(opts.DataPath) - defer nsqd.Exit() - - topicName := "test_topic_backend_maxmsgsize" + strconv.Itoa(int(time.Now().Unix())) - topic := nsqd.GetTopic(topicName) - - test.Equal(t, int32(opts.MaxMsgSize+minValidMsgLength), topic.backend.(*diskQueue).maxMsgSize) -} - func BenchmarkTopicPut(b *testing.B) { b.StopTimer() topicName := "bench_topic_put" + strconv.Itoa(b.N)