Merge pull request nsqio#101 from youzan/add-more-grpc
adjust balance
absolute8511 authored Sep 4, 2019
2 parents 2ecb5ca + f7051c6 commit ea4a190
Showing 12 changed files with 116 additions and 44 deletions.
10 changes: 7 additions & 3 deletions apps/nsq_data_tool/tool.go
@@ -131,7 +131,7 @@ func main() {
log.Fatalln(err)
}
} else if *searchMode == "virtual_offset" {
searchLogIndexStart, searchOffset, _, err = tpLogMgr.SearchLogDataByMsgOffset(*viewStartID)
searchLogIndexStart, searchOffset, _, err = tpLogMgr.SearchLogDataByMsgOffset(*viewOffset)
if err != nil {
log.Fatalln(err)
}
@@ -166,7 +166,7 @@ func main() {
}
backendReader := nsqd.NewDiskQueueSnapshot(backendName, topicDataPath, backendWriter.GetQueueReadEnd())
backendReader.SetQueueStart(backendWriter.GetQueueReadStart())
backendReader.SeekTo(nsqd.BackendOffset(queueOffset))
if queueOffset == 0 {
backendReader.SeekTo(nsqd.BackendOffset(queueOffset), 0)
} else {
backendReader.SeekTo(nsqd.BackendOffset(queueOffset), logData.MsgCnt-1)
}
cnt := *viewCnt
for cnt > 0 {
cnt--
@@ -176,7 +180,7 @@
return
}

fmt.Printf("%v:%v:%v, string: %v\n", ret.Offset, ret.MovedSize, ret.Data, string(ret.Data))
fmt.Printf("%v:%v:%v:%v, string: %v\n", ret.Offset, ret.MovedSize, ret.CurCnt, ret.Data, string(ret.Data))
msg, err := nsqd.DecodeMessage(ret.Data, *isExt)
if err != nil {
log.Fatalf("decode data error: %v", err)
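
Reading note: the tool now pairs the byte offset with a message count when seeking the snapshot reader. Below is a minimal sketch of that pairing, assuming the two-argument SeekTo(BackendOffset, int64) introduced in this change; the helper name seekArgs is illustrative, not part of the repository, and would sit in the tool's own main package next to its existing nsqd import.

// seekArgs mirrors the branch above: at queue start nothing precedes the
// read position, otherwise logData.MsgCnt-1 messages come before the target
// offset (assumes MsgCnt >= 1 whenever queueOffset != 0).
func seekArgs(queueOffset int64, logMsgCnt int64) (nsqd.BackendOffset, int64) {
	if queueOffset == 0 {
		return 0, 0
	}
	return nsqd.BackendOffset(queueOffset), logMsgCnt - 1
}

With such a helper the call site could read backendReader.SeekTo(seekArgs(queueOffset, logData.MsgCnt)), which is exactly what the if/else above does inline.
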
16 changes: 8 additions & 8 deletions consistence/data_placement_mgr.go
@@ -71,7 +71,7 @@ const (
HIGHEST_PUB_QPS_LEVEL = 100
HIGHEST_LEFT_CONSUME_MB_SIZE = 50 * 1024
HIGHEST_LEFT_DATA_MB_SIZE = 200 * 1024
busyTopicLevel = 13
busyTopicLevel = 15
)

type balanceOpLevel int
@@ -736,7 +736,7 @@ func (dpm *DataPlacement) DoBalance(monitorChan chan struct{}) {
dpm.balanceTopicLeaderBetweenNodes(monitorChan, moveLeader, moveAny, minLeaderLoad,
maxLeaderLoad, topicStatsMinMax, nodeTopicStats)
} else if avgLeaderLoad >= 20 &&
((minLeaderLoad*2 < maxLeaderLoad) || (maxLeaderLoad > avgLeaderLoad*1.5)) {
((minLeaderLoad*1.5 < maxLeaderLoad) || (maxLeaderLoad > avgLeaderLoad*1.3)) {
dpm.balanceTopicLeaderBetweenNodes(monitorChan, moveLeader,
moveTryIdle, minLeaderLoad, maxLeaderLoad,
topicStatsMinMax, nodeTopicStats)
@@ -753,7 +753,7 @@ func (dpm *DataPlacement) DoBalance(monitorChan chan struct{}) {
dpm.balanceTopicLeaderBetweenNodes(monitorChan, moveLeader, moveAny, minNodeLoad,
maxNodeLoad, topicStatsMinMax, nodeTopicStatsSortedSlave)
} else if avgNodeLoad >= 20 &&
(minNodeLoad*2 < maxNodeLoad || maxNodeLoad > avgNodeLoad*1.5) {
(minNodeLoad*1.5 < maxNodeLoad || maxNodeLoad > avgNodeLoad*1.3) {
topicStatsMinMax[0] = &nodeTopicStatsSortedSlave[0]
topicStatsMinMax[1] = &nodeTopicStatsSortedSlave[len(nodeTopicStatsSortedSlave)-1]
moveLeader = len(topicStatsMinMax[1].TopicLeaderDataSize) > len(topicStatsMinMax[1].TopicTotalDataSize)/2
@@ -785,7 +785,7 @@ func (dpm *DataPlacement) DoBalance(monitorChan chan struct{}) {
// maybe too much topic followers on this node
if leastLeaderStats.NodeID == topicStatsMinMax[1].NodeID && followerNum > avgTopicNum {
moveLeader = false
} else if followerNum > int(float64(avgTopicNum)*1.5) {
} else if followerNum > int(float64(avgTopicNum)*1.3) {
// too much followers
coordLog.Infof("move follower topic since less leader and much follower on node: %v, %v, avg %v",
leastLeaderStats.NodeID, followerNum, avgTopicNum)
@@ -928,9 +928,6 @@ func (dpm *DataPlacement) balanceTopicLeaderBetweenNodes(monitorChan chan struct
sortedNodeTopicStats, moveOp, moveLeader)
}
}
if !checkMoveOK {
coordLog.Infof("check topic for moving can not move any on node: %v, all sorted topic: %v", statsMinMax[1].TopicHourlyPubDataList, sortedTopics)
}
}

func (dpm *DataPlacement) checkAndPrepareMove(monitorChan chan struct{}, fromNode string, topicName string, partitionID int,
@@ -943,7 +940,7 @@ func (dpm *DataPlacement) checkAndPrepareMove(monitorChan chan struct{}, fromNod
}
checkMoveOK := false
if topicInfo.OrderedMulti {
coordLog.Infof("topic %v is configured as multi ordered, no balance", topicName)
coordLog.Debugf("topic %v is configured as multi ordered, no balance", topicName)
return false
}
if moveOp > moveAny {
@@ -977,6 +974,9 @@
if checkMoveOK {
break
}
if nid == fromNode {
continue
}
for index, stat := range sortedNodeTopicStats {
if index >= len(sortedNodeTopicStats)/3 {
break
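
Reading note: the balance thresholds are tightened here, so a node pair counts as imbalanced at 1.5x (min vs max) or 1.3x (max vs average) instead of 2x/1.5x, and busyTopicLevel rises from 13 to 15. A sketch of the tightened trigger follows; the helper name shouldTryIdleMove is illustrative and not part of the repository.

// shouldTryIdleMove reflects the second rebalance branch in DoBalance above.
func shouldTryIdleMove(minLoad, maxLoad, avgLoad float64) bool {
	if avgLoad < 20 {
		return false
	}
	// was: minLoad*2 < maxLoad || maxLoad > avgLoad*1.5
	return minLoad*1.5 < maxLoad || maxLoad > avgLoad*1.3
}

Lowering both factors makes the idle-time leader move kick in on smaller skews, which matches the commit message "adjust balance".
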
42 changes: 35 additions & 7 deletions consistence/nsqd_coordinator.go
@@ -917,7 +917,11 @@ func checkAndFixLocalLogQueueData(tc *coordData,

snap := localLogQ.GetDiskQueueSnapshot()
for {
err = snap.SeekTo(nsqd.BackendOffset(log.MsgOffset))
seekCnt := int64(0)
if log.MsgCnt > 0 {
seekCnt = log.MsgCnt - 1
}
err = snap.SeekTo(nsqd.BackendOffset(log.MsgOffset), seekCnt)
if err == nil {
break
}
@@ -1176,7 +1180,12 @@ func (ncoord *NsqdCoordinator) SearchLogByMsgOffset(topic string, part int, offs
return l, 0, 0, localErr
}
snap := t.GetDiskQueueSnapshot()
localErr = snap.SeekTo(nsqd.BackendOffset(realOffset))

seekCnt := int64(0)
if l.MsgCnt > 0 {
seekCnt = l.MsgCnt - 1
}
localErr = snap.SeekTo(nsqd.BackendOffset(realOffset), seekCnt)
if localErr != nil {
coordLog.Infof("seek to disk queue error: %v, %v", localErr, realOffset)
return l, 0, 0, localErr
@@ -1220,7 +1229,12 @@ func (ncoord *NsqdCoordinator) SearchLogByMsgCnt(topic string, part int, count i
return l, 0, 0, localErr
}
snap := t.GetDiskQueueSnapshot()
localErr = snap.SeekTo(nsqd.BackendOffset(realOffset))

seekCnt := int64(0)
if l.MsgCnt > 0 {
seekCnt = l.MsgCnt - 1
}
localErr = snap.SeekTo(nsqd.BackendOffset(realOffset), seekCnt)
if localErr != nil {
coordLog.Infof("seek to disk queue error: %v, %v", localErr, realOffset)
return l, 0, 0, localErr
@@ -1258,7 +1272,12 @@ func (ncoord *MsgTimestampComparator) SearchEndBoundary() int64 {
}

func (ncoord *MsgTimestampComparator) LessThanLeftBoundary(l *CommitLogData) bool {
err := ncoord.localTopicReader.ResetSeekTo(nsqd.BackendOffset(l.MsgOffset))

seekCnt := int64(0)
if l.MsgCnt > 0 {
seekCnt = l.MsgCnt - 1
}
err := ncoord.localTopicReader.ResetSeekTo(nsqd.BackendOffset(l.MsgOffset), seekCnt)
if err != nil {
coordLog.Errorf("seek disk queue failed: %v, %v", l, err)
return true
@@ -1280,8 +1299,12 @@ func (ncoord *MsgTimestampComparator) LessThanLeftBoundary(l *CommitLogData) boo
}

func (ncoord *MsgTimestampComparator) GreatThanRightBoundary(l *CommitLogData) bool {
seekCnt := int64(0)
if l.MsgCnt > 0 {
seekCnt = l.MsgCnt - 1
}
// we may read the eof , in this situation we reach the end, so the search should not be great than right boundary
err := ncoord.localTopicReader.ResetSeekTo(nsqd.BackendOffset(l.MsgOffset + int64(l.MsgSize)))
err := ncoord.localTopicReader.ResetSeekTo(nsqd.BackendOffset(l.MsgOffset+int64(l.MsgSize)), seekCnt)
if err != nil {
coordLog.Errorf("seek disk queue failed: %v, %v", l, err)
return false
@@ -1332,8 +1355,13 @@ func (ncoord *NsqdCoordinator) SearchLogByMsgTimestamp(topic string, part int, t
return nil, 0, 0, localErr
}
realOffset := l.MsgOffset

seekCnt := int64(0)
if l.MsgCnt > 0 {
seekCnt = l.MsgCnt - 1
}
// check if the message timestamp is fit the require
localErr = snap.ResetSeekTo(nsqd.BackendOffset(realOffset))
localErr = snap.ResetSeekTo(nsqd.BackendOffset(realOffset), seekCnt)
if localErr != nil {
coordLog.Infof("seek to disk queue error: %v, %v", localErr, realOffset)
return l, 0, 0, localErr
@@ -2662,7 +2690,7 @@ func (ncoord *NsqdCoordinator) readTopicRawData(topic string, partition int, off
}
for i, offset := range offsetList {
size := sizeList[i]
err = snap.SeekTo(nsqd.BackendOffset(offset))
err = snap.SeekTo(nsqd.BackendOffset(offset), 0)
if err != nil {
coordLog.Infof("read topic %v data at offset %v, size: %v, error: %v", t.GetFullName(), offset, size, err)
break
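
Reading note: every snapshot seek in this file now derives the same count from the commit-log entry before calling SeekTo or ResetSeekTo. A minimal helper capturing that shared step is sketched below; the name seekCountFor is mine, and it assumes CommitLogData.MsgCnt counts messages up to and including the one at MsgOffset.

// seekCountFor returns the message count to pass alongside l.MsgOffset,
// i.e. how many messages were logged before that offset.
func seekCountFor(l *CommitLogData) int64 {
	if l.MsgCnt > 0 {
		return l.MsgCnt - 1
	}
	return 0
}

The readTopicRawData path at the bottom of the file simply passes 0, presumably because only the raw bytes at each offset are needed there and not an accurate running count.
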
2 changes: 1 addition & 1 deletion consistence/nsqd_coordinator_cluster_write.go
@@ -376,7 +376,7 @@ func (ncoord *NsqdCoordinator) doSyncOpToCluster(isWrite bool, coord *TopicCoord
needLeaveISR = true

retrysync:
if !halfSuccess && coord.IsExiting() {
if isWrite && !halfSuccess && coord.IsExiting() {
needLeaveISR = true
clusterWriteErr = ErrTopicExiting
goto exitsync
20 changes: 14 additions & 6 deletions consistence/nsqlookup_coordinator.go
@@ -776,6 +776,7 @@ func (nlcoord *NsqLookupCoordinator) doCheckTopics(monitorChan chan struct{}, fa
}
if currentNodesEpoch != atomic.LoadInt64(&nlcoord.nodesEpoch) {
coordLog.Infof("nodes changed while checking topics: %v, %v", currentNodesEpoch, atomic.LoadInt64(&nlcoord.nodesEpoch))
atomic.StoreInt32(&nlcoord.isClusterUnstable, 1)
return
}
// should copy to avoid reused
@@ -814,6 +815,9 @@
// notify the nsqd node to acquire the leader session.
nlcoord.notifyISRTopicMetaInfo(&topicInfo)
nlcoord.notifyAcquireTopicLeader(&topicInfo)
if err == ErrLeaderSessionNotExist {
break
}
time.Sleep(time.Millisecond * 100)
continue
} else {
@@ -841,14 +845,17 @@ func (nlcoord *NsqLookupCoordinator) doCheckTopics(monitorChan chan struct{}, fa
tmpTopicInfo := t
tmpTopicInfo.Leader = leaderSession.LeaderNode.ID
nlcoord.notifyReleaseTopicLeader(&tmpTopicInfo, leaderSession.LeaderEpoch, leaderSession.Session)
err := nlcoord.waitOldLeaderRelease(&tmpTopicInfo)
if err != nil {
coordLog.Warningf("topic %v leader session release failed: %v, force release", t.GetTopicDesp(), err.Error())
err = nlcoord.leadership.ReleaseTopicLeader(topicInfo.Name, topicInfo.Partition, leaderSession)
tmpSession := *leaderSession
go func() {
err := nlcoord.waitOldLeaderRelease(&tmpTopicInfo)
if err != nil {
coordLog.Errorf("release session failed [%s] : %v", topicInfo.GetTopicDesp(), err)
coordLog.Warningf("topic %v leader session release failed: %v, force release", tmpTopicInfo.GetTopicDesp(), err.Error())
err = nlcoord.leadership.ReleaseTopicLeader(tmpTopicInfo.Name, tmpTopicInfo.Partition, &tmpSession)
if err != nil {
coordLog.Errorf("release session failed [%s] : %v", tmpTopicInfo.GetTopicDesp(), err)
}
}
}
}()
nlcoord.notifyISRTopicMetaInfo(&topicInfo)
nlcoord.notifyAcquireTopicLeader(&topicInfo)
continue
@@ -916,6 +923,7 @@

if currentNodesEpoch != atomic.LoadInt64(&nlcoord.nodesEpoch) {
coordLog.Infof("nodes changed while checking topics: %v, %v", currentNodesEpoch, atomic.LoadInt64(&nlcoord.nodesEpoch))
atomic.StoreInt32(&nlcoord.isClusterUnstable, 1)
return
}
// check if write disabled
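
Reading note: because added and removed lines are interleaved in the -841,14 +845,17 hunk above, here is how the new release path reads when assembled from the added and surrounding context lines (indentation and brace placement approximated, not a verified excerpt). The wait-and-force-release work moves into a goroutine that captures copies, so doCheckTopics is no longer blocked while an old leader session is released, and later loop iterations cannot mutate what the background release sees.

tmpTopicInfo := t
tmpTopicInfo.Leader = leaderSession.LeaderNode.ID
nlcoord.notifyReleaseTopicLeader(&tmpTopicInfo, leaderSession.LeaderEpoch, leaderSession.Session)
tmpSession := *leaderSession // copy before launching the goroutine
go func() {
	err := nlcoord.waitOldLeaderRelease(&tmpTopicInfo)
	if err != nil {
		coordLog.Warningf("topic %v leader session release failed: %v, force release",
			tmpTopicInfo.GetTopicDesp(), err.Error())
		err = nlcoord.leadership.ReleaseTopicLeader(tmpTopicInfo.Name, tmpTopicInfo.Partition, &tmpSession)
		if err != nil {
			coordLog.Errorf("release session failed [%s] : %v", tmpTopicInfo.GetTopicDesp(), err)
		}
	}
}()
nlcoord.notifyISRTopicMetaInfo(&topicInfo)
nlcoord.notifyAcquireTopicLeader(&topicInfo)
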
16 changes: 13 additions & 3 deletions consistence/nsqlookup_coordinator_test.go
@@ -1931,9 +1931,19 @@ func checkOrderedMultiTopic(t *testing.T, topic string, expectedPart int, aliveN
nodePartitionNum := make(map[string][]int)
nodeLeaderPartNum := make(map[string][]int)
for i := 0; i < pn; i++ {
t0, err := leadership.GetTopicInfo(topic, i)
test.Nil(t, err)
t.Logf("topic %v isr: %v", t0.GetTopicDesp(), t0.ISR)
var t0 *TopicPartitionMetaInfo
retry := 3
for retry > 0 {
retry--
t0, err = leadership.GetTopicInfo(topic, i)
test.Nil(t, err)
t.Logf("topic %v isr: %v", t0.GetTopicDesp(), t0.ISR)
if len(t0.ISR) > pmeta.Replica {
time.Sleep(time.Second * 3)
continue
}
break
}
test.Equal(t, pmeta.Replica, len(t0.ISR))

if nodeInfoList[t0.Leader] == nil {
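
Reading note: the test now tolerates a briefly oversized ISR by re-reading the topic info a few times before asserting. The retry shape, extracted as a sketch (the helper and its function parameter are illustrative; the real test calls leadership.GetTopicInfo inline as above, and this assumes the test file's existing time import):

// waitISRAtMost polls until the ISR has shrunk to at most want members,
// giving the coordinator a few seconds to finish removing extra replicas.
func waitISRAtMost(getInfo func() (*TopicPartitionMetaInfo, error), want int) (*TopicPartitionMetaInfo, error) {
	var info *TopicPartitionMetaInfo
	var err error
	for retry := 0; retry < 3; retry++ {
		info, err = getInfo()
		if err != nil || len(info.ISR) <= want {
			break
		}
		time.Sleep(time.Second * 3)
	}
	return info, err
}
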
19 changes: 15 additions & 4 deletions nsqd/delay_queue.go
@@ -1119,9 +1119,16 @@ func (q *DelayQueue) ConfirmedMessage(msg *Message) error {
atomic.StoreInt64(&q.changedTs, time.Now().UnixNano())
q.compactMutex.Unlock()
if err != nil {
nsqLog.LogErrorf(
"%s : failed to delete delayed message %v-%v, %v",
q.GetFullName(), msg.DelayedOrigID, msg, err)
if err != errBucketKeyNotFound {
nsqLog.LogErrorf(
"%s : failed to delete delayed message %v-%v, %v",
q.GetFullName(), msg.DelayedOrigID, msg, err)
} else {
nsqLog.Logf(
"%s : failed to delete delayed message %v-%v, %v",
q.GetFullName(), msg.DelayedOrigID, msg, err)
return nil
}
}
return err
}
@@ -1258,7 +1265,11 @@ func (q *DelayQueue) TryCleanOldData(retentionSize int64, noRealClean bool, maxC
}
snapReader := NewDiskQueueSnapshot(getDelayQueueBackendName(q.tname, q.partition), q.dataPath, oldestPos)
snapReader.SetQueueStart(cleanStart)
err := snapReader.SeekTo(cleanStart.Offset())
seekCnt := int64(0)
if cleanStart.TotalMsgCnt() > 0 {
seekCnt = cleanStart.TotalMsgCnt() - 1
}
err := snapReader.SeekTo(cleanStart.Offset(), seekCnt)
if err != nil {
nsqLog.Errorf("topic: %v failed to seek to %v: %v", q.GetFullName(), cleanStart, err)
return nil, err
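
Reading note: confirming a delayed message whose bucket key is already gone is now treated as success rather than an error. The branch above, condensed with the branches flipped for readability (same names as the hunk; a sketch, not a drop-in excerpt):

if err != nil {
	if err == errBucketKeyNotFound {
		// already deleted: log at normal level and report success
		nsqLog.Logf("%s : failed to delete delayed message %v-%v, %v",
			q.GetFullName(), msg.DelayedOrigID, msg, err)
		return nil
	}
	nsqLog.LogErrorf("%s : failed to delete delayed message %v-%v, %v",
		q.GetFullName(), msg.DelayedOrigID, msg, err)
}
return err
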
13 changes: 8 additions & 5 deletions nsqd/disk_queue_snapshot.go
@@ -7,6 +7,7 @@ import (
"io"
"os"
"sync"
"sync/atomic"

"github.com/youzan/nsq/internal/levellogger"
)
@@ -156,15 +157,15 @@ func (d *DiskQueueSnapshot) SkipToNext() error {
}

// this can allow backward seek
func (d *DiskQueueSnapshot) ResetSeekTo(voffset BackendOffset) error {
return d.seekTo(voffset, true)
func (d *DiskQueueSnapshot) ResetSeekTo(voffset BackendOffset, cnt int64) error {
return d.seekTo(voffset, cnt, true)
}

func (d *DiskQueueSnapshot) SeekTo(voffset BackendOffset) error {
return d.seekTo(voffset, false)
func (d *DiskQueueSnapshot) SeekTo(voffset BackendOffset, cnt int64) error {
return d.seekTo(voffset, cnt, false)
}

func (d *DiskQueueSnapshot) seekTo(voffset BackendOffset, allowBackward bool) error {
func (d *DiskQueueSnapshot) seekTo(voffset BackendOffset, cnt int64, allowBackward bool) error {
d.Lock()
defer d.Unlock()
if d.readFile != nil {
@@ -195,6 +196,7 @@ func (d *DiskQueueSnapshot) seekTo(voffset BackendOffset, allowBackward bool) er
d.readPos, newPos, voffset)
d.readPos.EndOffset = newPos
d.readPos.virtualEnd = voffset
d.readPos.totalMsgCnt = cnt
return nil
}

@@ -344,6 +346,7 @@ CheckFileOpen:

oldPos := d.readPos
d.readPos.EndOffset.Pos = d.readPos.EndOffset.Pos + totalBytes
result.CurCnt = atomic.AddInt64(&d.readPos.totalMsgCnt, 1)
d.readPos.virtualEnd += BackendOffset(totalBytes)
nsqLog.LogDebugf("=== read move forward: %v to %v", oldPos,
d.readPos)
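
Reading note: SeekTo and ResetSeekTo now require the caller to state how many messages precede the target offset, and each ReadOne advances an atomic counter that is reported back as CurCnt. A usage sketch under those assumptions, written as if it lived in package nsqd next to the snapshot; the function name readWithCounts is illustrative.

// readWithCounts seeks to (off, msgsBefore) and reads n messages, returning
// the running count ReadOne now reports; the first value should be
// msgsBefore+1 if the seek count was accurate.
func readWithCounts(d *DiskQueueSnapshot, off BackendOffset, msgsBefore int64, n int) ([]int64, error) {
	if err := d.SeekTo(off, msgsBefore); err != nil {
		return nil, err
	}
	counts := make([]int64, 0, n)
	for i := 0; i < n; i++ {
		r := d.ReadOne()
		if r.Err != nil {
			return counts, r.Err
		}
		counts = append(counts, r.CurCnt)
	}
	return counts, nil
}

The updated reader test below checks this relationship between the seek count and CurCnt.
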
4 changes: 3 additions & 1 deletion nsqd/diskqueue_reader_test.go
@@ -374,9 +374,11 @@ func TestDiskQueueSnapshotReader(t *testing.T) {
result := dqReader.ReadOne()
test.Nil(t, result.Err)
test.Equal(t, midEnd2.Offset(), result.Offset)
err = dqReader.SeekTo(midEnd.Offset())
test.Equal(t, midEnd2.TotalMsgCnt()+1, result.CurCnt)
err = dqReader.SeekTo(midEnd.Offset(), midEnd.TotalMsgCnt()-1)
test.Nil(t, err)
test.Equal(t, midEnd.Offset(), dqReader.readPos.virtualEnd)
test.Equal(t, midEnd.TotalMsgCnt()-1, dqReader.readPos.totalMsgCnt)
result = dqReader.ReadOne()
test.Nil(t, result.Err)
test.Equal(t, midEnd.Offset(), result.Offset)
2 changes: 1 addition & 1 deletion nsqd/meta_storage.go
@@ -297,7 +297,7 @@ func (dbs *dbMetaStorage) RetrieveReader(key string) (diskQueueEndInfo, diskQueu
return nil
})
if err != nil {
nsqLog.LogErrorf("failed to read meta key %v from db: %v , %v ", key, dbs.dataPath, err)
nsqLog.Warningf("failed to read meta key %v from db: %v , %v ", key, dbs.dataPath, err)
}
if fallback || err != nil {
nsqLog.Logf("fallback to read meta key %v from file meta", key)
7 changes: 6 additions & 1 deletion nsqd/topic.go
@@ -1379,7 +1379,12 @@ func (t *Topic) TryCleanOldData(retentionSize int64, noRealClean bool, maxCleanO
}
snapReader := NewDiskQueueSnapshot(getBackendName(t.tname, t.partition), t.dataPath, oldestPos)
snapReader.SetQueueStart(cleanStart)
err := snapReader.SeekTo(cleanStart.Offset())

seekCnt := int64(0)
if cleanStart.TotalMsgCnt() > 0 {
seekCnt = cleanStart.TotalMsgCnt() - 1
}
err := snapReader.SeekTo(cleanStart.Offset(), seekCnt)
if err != nil {
nsqLog.Errorf("topic: %v failed to seek to %v: %v", t.GetFullName(), cleanStart, err)
return nil, err