diff --git a/nsqd/delay_queue.go b/nsqd/delay_queue.go index 40a310a1c..ccdf4cfa1 100644 --- a/nsqd/delay_queue.go +++ b/nsqd/delay_queue.go @@ -815,8 +815,32 @@ func (q *DelayQueue) put(m *Message, rawData []byte, trace bool, checkSize int64 wstart := time.Now() q.compactMutex.Lock() err = q.getStore().Update(func(tx *bolt.Tx) error { + newIndexKey := getDelayedMsgDBIndexKey(int(m.DelayedType), m.DelayedChannel, m.DelayedOrigID) + ib := tx.Bucket(bucketDelayedMsgIndex) + iv := ib.Get(newIndexKey) b := tx.Bucket(bucketDelayedMsg) - oldV := b.Get(msgKey) + var oldV []byte + var oldMsgKey []byte + if iv != nil { + ts, oldID, err := decodeDelayedMsgDBIndexValue(iv) + if err != nil { + return err + } + oldMsgKey = getDelayedMsgDBKey(int(m.DelayedType), m.DelayedChannel, ts, oldID) + oldV = b.Get(oldMsgKey) + // it may have some old data which only have index but no msg value + if len(oldV) > 0 { + oldMsg, err := DecodeDelayedMessage(oldV, q.IsExt()) + if err != nil { + nsqLog.Warningf("found old delayed index key %v (%v, %v) msg value data wrong: %v, %v", newIndexKey, iv, oldMsgKey, oldV, m) + // we can just delete this safely + } else if oldMsg.DelayedOrigID != m.DelayedOrigID || oldMsg.DelayedChannel != m.DelayedChannel || oldMsg.DelayedType != m.DelayedType { + // the value from old index is not the same message we are inserting, this may happend when old code use the wrong id in index + nsqLog.Infof("found old delayed index key %v (%v, %v) msg value not matched : %v, %v", newIndexKey, iv, oldMsgKey, oldMsg, m) + oldV = nil + } + } + } exists := oldV != nil if exists && bytes.Equal(oldV, q.putBuffer.Bytes()) { } else { @@ -824,18 +848,22 @@ func (q *DelayQueue) put(m *Message, rawData []byte, trace bool, checkSize int64 if err != nil { return err } - if oldV != nil { - err = deleteMsgIndex(oldV, tx, q.IsExt()) + if oldV != nil && (!bytes.Equal(oldMsgKey, msgKey)) { + err := b.Delete(oldMsgKey) + if err != nil { + nsqLog.Infof("failed to delete old delayed message: %v", msgKey) + return err + } + err = ib.Delete(newIndexKey) if err != nil { nsqLog.Infof("failed to delete old delayed index : %v, %v", oldV, err) return err } } // note here we only support one index for the same message(if dup inserted, only one has the index) - b = tx.Bucket(bucketDelayedMsgIndex) - newIndexKey := getDelayedMsgDBIndexKey(int(m.DelayedType), m.DelayedChannel, m.DelayedOrigID) - d := getDelayedMsgDBIndexValue(m.DelayedTs, m.DelayedOrigID) - err = b.Put(newIndexKey, d) + ib = tx.Bucket(bucketDelayedMsgIndex) + d := getDelayedMsgDBIndexValue(m.DelayedTs, m.ID) + err = ib.Put(newIndexKey, d) if err != nil { return err } diff --git a/nsqd/delay_queue_test.go b/nsqd/delay_queue_test.go index 6f73fa6b6..99304412c 100644 --- a/nsqd/delay_queue_test.go +++ b/nsqd/delay_queue_test.go @@ -75,6 +75,105 @@ func TestDelayQueuePutChannelDelayed(t *testing.T) { test.NotNil(t, err) } +func TestDelayQueueChannelDelayedChanged(t *testing.T) { + tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-delay-change-%d", time.Now().UnixNano())) + if err != nil { + panic(err) + } + defer os.RemoveAll(tmpDir) + + opts := NewOptions() + opts.Logger = newTestLogger(t) + opts.SyncEvery = 1 + + dq, err := NewDelayQueue("test", 0, tmpDir, opts, nil, false) + test.Nil(t, err) + defer dq.Close() + cnt := 10 + originDelay := time.Second * 2 + for i := 0; i < cnt; i++ { + msg := NewMessage(0, []byte("old body")) + msg.DelayedType = ChannelDelayed + msg.DelayedTs = time.Now().Add(originDelay).UnixNano() + msg.DelayedChannel = "test" + msg.DelayedOrigID = MessageID(i + 1) + _, _, _, _, err := dq.PutDelayMessage(msg) + test.Nil(t, err) + test.Equal(t, true, dq.IsChannelMessageDelayed(msg.DelayedOrigID, "test")) + } + + // change delayed ts to early or more later + newCnt, _ := dq.GetCurrentDelayedCnt(ChannelDelayed, "test") + test.Equal(t, cnt, int(newCnt)) + _, err = os.Stat(dq.dataPath) + test.Nil(t, err) + delayedTsLater := time.Now().Add(originDelay * 10).UnixNano() + for i := 0; i < cnt; i++ { + msg := NewMessage(0, []byte("old body")) + msg.DelayedType = ChannelDelayed + msg.DelayedTs = delayedTsLater + msg.DelayedChannel = "test" + msg.DelayedOrigID = MessageID(i + 1) + _, _, _, _, err := dq.PutDelayMessage(msg) + test.Nil(t, err) + test.Equal(t, true, dq.IsChannelMessageDelayed(msg.DelayedOrigID, "test")) + } + // check delayed ts for new, and make sure the old one is deleted + for i := 0; i < cnt; i++ { + ts, err := dq.getChannelDelayedMsgDelayedTs(MessageID(i+1), "test") + test.Nil(t, err) + test.Equal(t, true, ts > time.Now().Add(originDelay*2).UnixNano()) + test.Equal(t, true, ts <= time.Now().Add(originDelay*10).UnixNano()) + } + + newCnt, _ = dq.GetCurrentDelayedCnt(ChannelDelayed, "test") + test.Equal(t, cnt, int(newCnt)) + + rets := make([]Message, cnt*2) + n, err := dq.PeekAll(rets) + test.Nil(t, err) + test.Equal(t, cnt, n) + for i := 0; i < n; i++ { + test.Equal(t, uint64(rets[i].DelayedOrigID), uint64(i+1)) + test.Equal(t, rets[i].DelayedTs, delayedTsLater) + } + + delayedTsEarly := time.Now().Add(originDelay * 4).UnixNano() + for i := 0; i < cnt; i++ { + msg := NewMessage(0, []byte("old body")) + msg.DelayedType = ChannelDelayed + msg.DelayedTs = delayedTsEarly + msg.DelayedChannel = "test" + msg.DelayedOrigID = MessageID(i + 1) + _, _, _, _, err := dq.PutDelayMessage(msg) + test.Nil(t, err) + test.Equal(t, true, dq.IsChannelMessageDelayed(msg.DelayedOrigID, "test")) + } + // check delayed ts for new, and make sure the old one is deleted + for i := 0; i < cnt; i++ { + ts, err := dq.getChannelDelayedMsgDelayedTs(MessageID(i+1), "test") + test.Nil(t, err) + test.Equal(t, true, ts > time.Now().Add(originDelay*2).UnixNano()) + test.Equal(t, true, ts <= time.Now().Add(originDelay*4).UnixNano()) + } + rets = make([]Message, cnt*2) + n, err = dq.PeekAll(rets) + test.Nil(t, err) + test.Equal(t, cnt, n) + for i := 0; i < n; i++ { + test.Equal(t, uint64(rets[i].DelayedOrigID), uint64(i+1)) + test.Equal(t, rets[i].DelayedTs, delayedTsEarly) + } + + newCnt, _ = dq.GetCurrentDelayedCnt(ChannelDelayed, "test") + test.Equal(t, cnt, int(newCnt)) + dq.Delete() + _, err = os.Stat(dq.dataPath) + test.Nil(t, err) + _, err = os.Stat(path.Join(dq.dataPath, getDelayQueueDBName(dq.tname, dq.partition))) + test.NotNil(t, err) +} + func TestDelayQueuePutChannelDelayedDuplicate(t *testing.T) { // the slave may insert same delayed messages if retry by network tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-delay-%d", time.Now().UnixNano())) @@ -107,7 +206,7 @@ func TestDelayQueuePutChannelDelayedDuplicate(t *testing.T) { } newCnt, _ := dq.GetCurrentDelayedCnt(ChannelDelayed, "test") - test.Equal(t, cnt*2, int(newCnt)) + test.Equal(t, cnt, int(newCnt)) _, err = os.Stat(dq.dataPath) test.Nil(t, err) for i := 0; i < cnt; i++ { @@ -128,23 +227,12 @@ func TestDelayQueuePutChannelDelayedDuplicate(t *testing.T) { test.Nil(t, err) test.Nil(t, m) - m, err = dq.FindChannelMessageDelayed(msgID, "test", true) - test.Nil(t, err) - test.NotNil(t, m) - test.Equal(t, m.DelayedOrigID, msgID) - test.Equal(t, []byte("body"), m.Body) - test.NotEqual(t, delayedID, m.ID) - // delete again, since confirm need the origin id in queue, we need exchange the id in message - delayedID = m.ID - m.ID = m.DelayedOrigID - m.DelayedOrigID = delayedID - err = dq.ConfirmedMessage(m) - test.Nil(t, err) - m, err = dq.FindChannelMessageDelayed(msgID, "test", true) test.Nil(t, err) test.Nil(t, m) } + newCnt, _ = dq.GetCurrentDelayedCnt(ChannelDelayed, "test") + test.Equal(t, 0, int(newCnt)) dq.Delete() _, err = os.Stat(dq.dataPath) test.Nil(t, err) @@ -1139,7 +1227,7 @@ func TestDelayedQueueDeleteIndexCheckValue(t *testing.T) { test.Equal(t, true, dq.IsChannelMessageDelayed(origID, "test")) m.DelayedOrigID = m.ID dq.ConfirmedMessage(&m) - test.Equal(t, true, dq.IsChannelMessageDelayed(origID, "test")) + test.Equal(t, false, dq.IsChannelMessageDelayed(origID, "test")) } }