Skip to content

Commit

Permalink
Merge pull request nsqio#184 from youzan/revert-fix
Browse files Browse the repository at this point in the history
Revert fix
  • Loading branch information
CrazyHZM authored Aug 31, 2022
2 parents 249770e + f3d0e05 commit c9d8c6b
Show file tree
Hide file tree
Showing 2 changed files with 138 additions and 22 deletions.
42 changes: 35 additions & 7 deletions nsqd/delay_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -815,27 +815,55 @@ func (q *DelayQueue) put(m *Message, rawData []byte, trace bool, checkSize int64
wstart := time.Now()
q.compactMutex.Lock()
err = q.getStore().Update(func(tx *bolt.Tx) error {
newIndexKey := getDelayedMsgDBIndexKey(int(m.DelayedType), m.DelayedChannel, m.DelayedOrigID)
ib := tx.Bucket(bucketDelayedMsgIndex)
iv := ib.Get(newIndexKey)
b := tx.Bucket(bucketDelayedMsg)
oldV := b.Get(msgKey)
var oldV []byte
var oldMsgKey []byte
if iv != nil {
ts, oldID, err := decodeDelayedMsgDBIndexValue(iv)
if err != nil {
return err
}
oldMsgKey = getDelayedMsgDBKey(int(m.DelayedType), m.DelayedChannel, ts, oldID)
oldV = b.Get(oldMsgKey)
// it may have some old data which only have index but no msg value
if len(oldV) > 0 {
oldMsg, err := DecodeDelayedMessage(oldV, q.IsExt())
if err != nil {
nsqLog.Warningf("found old delayed index key %v (%v, %v) msg value data wrong: %v, %v", newIndexKey, iv, oldMsgKey, oldV, m)
// we can just delete this safely
} else if oldMsg.DelayedOrigID != m.DelayedOrigID || oldMsg.DelayedChannel != m.DelayedChannel || oldMsg.DelayedType != m.DelayedType {
// the value from old index is not the same message we are inserting, this may happend when old code use the wrong id in index
nsqLog.Infof("found old delayed index key %v (%v, %v) msg value not matched : %v, %v", newIndexKey, iv, oldMsgKey, oldMsg, m)
oldV = nil
}
}
}
exists := oldV != nil
if exists && bytes.Equal(oldV, q.putBuffer.Bytes()) {
} else {
err := b.Put(msgKey, q.putBuffer.Bytes())
if err != nil {
return err
}
if oldV != nil {
err = deleteMsgIndex(oldV, tx, q.IsExt())
if oldV != nil && (!bytes.Equal(oldMsgKey, msgKey)) {
err := b.Delete(oldMsgKey)
if err != nil {
nsqLog.Infof("failed to delete old delayed message: %v", msgKey)
return err
}
err = ib.Delete(newIndexKey)
if err != nil {
nsqLog.Infof("failed to delete old delayed index : %v, %v", oldV, err)
return err
}
}
// note here we only support one index for the same message(if dup inserted, only one has the index)
b = tx.Bucket(bucketDelayedMsgIndex)
newIndexKey := getDelayedMsgDBIndexKey(int(m.DelayedType), m.DelayedChannel, m.DelayedOrigID)
d := getDelayedMsgDBIndexValue(m.DelayedTs, m.DelayedOrigID)
err = b.Put(newIndexKey, d)
ib = tx.Bucket(bucketDelayedMsgIndex)
d := getDelayedMsgDBIndexValue(m.DelayedTs, m.ID)
err = ib.Put(newIndexKey, d)
if err != nil {
return err
}
Expand Down
118 changes: 103 additions & 15 deletions nsqd/delay_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,105 @@ func TestDelayQueuePutChannelDelayed(t *testing.T) {
test.NotNil(t, err)
}

func TestDelayQueueChannelDelayedChanged(t *testing.T) {
tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-delay-change-%d", time.Now().UnixNano()))
if err != nil {
panic(err)
}
defer os.RemoveAll(tmpDir)

opts := NewOptions()
opts.Logger = newTestLogger(t)
opts.SyncEvery = 1

dq, err := NewDelayQueue("test", 0, tmpDir, opts, nil, false)
test.Nil(t, err)
defer dq.Close()
cnt := 10
originDelay := time.Second * 2
for i := 0; i < cnt; i++ {
msg := NewMessage(0, []byte("old body"))
msg.DelayedType = ChannelDelayed
msg.DelayedTs = time.Now().Add(originDelay).UnixNano()
msg.DelayedChannel = "test"
msg.DelayedOrigID = MessageID(i + 1)
_, _, _, _, err := dq.PutDelayMessage(msg)
test.Nil(t, err)
test.Equal(t, true, dq.IsChannelMessageDelayed(msg.DelayedOrigID, "test"))
}

// change delayed ts to early or more later
newCnt, _ := dq.GetCurrentDelayedCnt(ChannelDelayed, "test")
test.Equal(t, cnt, int(newCnt))
_, err = os.Stat(dq.dataPath)
test.Nil(t, err)
delayedTsLater := time.Now().Add(originDelay * 10).UnixNano()
for i := 0; i < cnt; i++ {
msg := NewMessage(0, []byte("old body"))
msg.DelayedType = ChannelDelayed
msg.DelayedTs = delayedTsLater
msg.DelayedChannel = "test"
msg.DelayedOrigID = MessageID(i + 1)
_, _, _, _, err := dq.PutDelayMessage(msg)
test.Nil(t, err)
test.Equal(t, true, dq.IsChannelMessageDelayed(msg.DelayedOrigID, "test"))
}
// check delayed ts for new, and make sure the old one is deleted
for i := 0; i < cnt; i++ {
ts, err := dq.getChannelDelayedMsgDelayedTs(MessageID(i+1), "test")
test.Nil(t, err)
test.Equal(t, true, ts > time.Now().Add(originDelay*2).UnixNano())
test.Equal(t, true, ts <= time.Now().Add(originDelay*10).UnixNano())
}

newCnt, _ = dq.GetCurrentDelayedCnt(ChannelDelayed, "test")
test.Equal(t, cnt, int(newCnt))

rets := make([]Message, cnt*2)
n, err := dq.PeekAll(rets)
test.Nil(t, err)
test.Equal(t, cnt, n)
for i := 0; i < n; i++ {
test.Equal(t, uint64(rets[i].DelayedOrigID), uint64(i+1))
test.Equal(t, rets[i].DelayedTs, delayedTsLater)
}

delayedTsEarly := time.Now().Add(originDelay * 4).UnixNano()
for i := 0; i < cnt; i++ {
msg := NewMessage(0, []byte("old body"))
msg.DelayedType = ChannelDelayed
msg.DelayedTs = delayedTsEarly
msg.DelayedChannel = "test"
msg.DelayedOrigID = MessageID(i + 1)
_, _, _, _, err := dq.PutDelayMessage(msg)
test.Nil(t, err)
test.Equal(t, true, dq.IsChannelMessageDelayed(msg.DelayedOrigID, "test"))
}
// check delayed ts for new, and make sure the old one is deleted
for i := 0; i < cnt; i++ {
ts, err := dq.getChannelDelayedMsgDelayedTs(MessageID(i+1), "test")
test.Nil(t, err)
test.Equal(t, true, ts > time.Now().Add(originDelay*2).UnixNano())
test.Equal(t, true, ts <= time.Now().Add(originDelay*4).UnixNano())
}
rets = make([]Message, cnt*2)
n, err = dq.PeekAll(rets)
test.Nil(t, err)
test.Equal(t, cnt, n)
for i := 0; i < n; i++ {
test.Equal(t, uint64(rets[i].DelayedOrigID), uint64(i+1))
test.Equal(t, rets[i].DelayedTs, delayedTsEarly)
}

newCnt, _ = dq.GetCurrentDelayedCnt(ChannelDelayed, "test")
test.Equal(t, cnt, int(newCnt))
dq.Delete()
_, err = os.Stat(dq.dataPath)
test.Nil(t, err)
_, err = os.Stat(path.Join(dq.dataPath, getDelayQueueDBName(dq.tname, dq.partition)))
test.NotNil(t, err)
}

func TestDelayQueuePutChannelDelayedDuplicate(t *testing.T) {
// the slave may insert same delayed messages if retry by network
tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-delay-%d", time.Now().UnixNano()))
Expand Down Expand Up @@ -107,7 +206,7 @@ func TestDelayQueuePutChannelDelayedDuplicate(t *testing.T) {
}

newCnt, _ := dq.GetCurrentDelayedCnt(ChannelDelayed, "test")
test.Equal(t, cnt*2, int(newCnt))
test.Equal(t, cnt, int(newCnt))
_, err = os.Stat(dq.dataPath)
test.Nil(t, err)
for i := 0; i < cnt; i++ {
Expand All @@ -128,23 +227,12 @@ func TestDelayQueuePutChannelDelayedDuplicate(t *testing.T) {
test.Nil(t, err)
test.Nil(t, m)

m, err = dq.FindChannelMessageDelayed(msgID, "test", true)
test.Nil(t, err)
test.NotNil(t, m)
test.Equal(t, m.DelayedOrigID, msgID)
test.Equal(t, []byte("body"), m.Body)
test.NotEqual(t, delayedID, m.ID)
// delete again, since confirm need the origin id in queue, we need exchange the id in message
delayedID = m.ID
m.ID = m.DelayedOrigID
m.DelayedOrigID = delayedID
err = dq.ConfirmedMessage(m)
test.Nil(t, err)

m, err = dq.FindChannelMessageDelayed(msgID, "test", true)
test.Nil(t, err)
test.Nil(t, m)
}
newCnt, _ = dq.GetCurrentDelayedCnt(ChannelDelayed, "test")
test.Equal(t, 0, int(newCnt))
dq.Delete()
_, err = os.Stat(dq.dataPath)
test.Nil(t, err)
Expand Down Expand Up @@ -1139,7 +1227,7 @@ func TestDelayedQueueDeleteIndexCheckValue(t *testing.T) {
test.Equal(t, true, dq.IsChannelMessageDelayed(origID, "test"))
m.DelayedOrigID = m.ID
dq.ConfirmedMessage(&m)
test.Equal(t, true, dq.IsChannelMessageDelayed(origID, "test"))
test.Equal(t, false, dq.IsChannelMessageDelayed(origID, "test"))
}
}

Expand Down

0 comments on commit c9d8c6b

Please sign in to comment.