Skip to content

Commit

Permalink
check missing partitions for topic; handle remove node
Browse files Browse the repository at this point in the history
  • Loading branch information
absolute8511 committed Jul 18, 2018
1 parent 78c67e0 commit 60dff75
Show file tree
Hide file tree
Showing 5 changed files with 166 additions and 9 deletions.
1 change: 1 addition & 0 deletions consistence/leadership.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ type NSQLookupdLeadership interface {
WatchNsqdNodes(nsqds chan []NsqdNodeInfo, stop chan struct{})
// get all topics info, should cache the newest to improve performance.
ScanTopics() ([]TopicPartitionMetaInfo, error)
GetAllTopicMetas() (map[string]*TopicMetaInfo, error)
// should return both the meta info for topic and the replica info for topic partition
// epoch should be updated while return
GetTopicInfo(topic string, partition int) (*TopicPartitionMetaInfo, error)
Expand Down
9 changes: 7 additions & 2 deletions consistence/nsq_lookupd_etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,13 @@ func (self *NsqLookupdEtcdMgr) getNsqdNodes() ([]NsqdNodeInfo, error) {
return nsqdNodes, nil
}

func (self *NsqLookupdEtcdMgr) GetAllTopicMetas() (map[string]*TopicMetaInfo, error) {
self.tmisMutex.Lock()
topicMetas := self.topicMetaMap
self.tmisMutex.Unlock()
return topicMetas, nil
}

func (self *NsqLookupdEtcdMgr) ScanTopics() ([]TopicPartitionMetaInfo, error) {
if atomic.LoadInt32(&self.ifTopicChanged) == 1 {
return self.scanTopics()
Expand Down Expand Up @@ -509,9 +516,7 @@ func (self *NsqLookupdEtcdMgr) GetTopicInfo(topic string, partition int) (*Topic
if !ok {
rsp, err := self.client.Get(self.createTopicMetaPath(topic), false, false)
if err != nil {

self.tmiMutex.Unlock()

if client.IsKeyNotFound(err) {
atomic.StoreInt32(&self.ifTopicChanged, 1)
return nil, ErrKeyNotFound
Expand Down
18 changes: 16 additions & 2 deletions consistence/nsqlookup_coord_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,16 @@ func (self *NsqLookupCoordinator) CreateTopic(topic string, meta TopicMetaInfo)
}
} else {
coordLog.Warningf("topic already exist :%v ", topic)
return ErrAlreadyExist
// check if meta is the same, if so we re-create again to make sure partitions are all ready
oldMeta, _, err := self.leadership.GetTopicMetaInfo(topic)
if err != nil {
coordLog.Infof("get topic meta key %v failed :%v", topic, err)
return err
}
meta.MagicCode = oldMeta.MagicCode
if oldMeta != meta {
return ErrAlreadyExist
}
}
coordLog.Infof("create topic: %v, with meta: %v", topic, meta)

Expand All @@ -606,7 +615,7 @@ func (self *NsqLookupCoordinator) checkAndUpdateTopicPartitions(currentNodes map
err := self.leadership.CreateTopicPartition(topic, i)
if err != nil {
coordLog.Warningf("failed to create topic %v-%v: %v", topic, i, err)
// handle already exist
// handle already exist, the partition dir may exist but missing real topic info
t, err := self.leadership.GetTopicInfo(topic, i)
if err != nil {
coordLog.Warningf("exist topic partition failed to get info: %v", err)
Expand All @@ -619,6 +628,11 @@ func (self *NsqLookupCoordinator) checkAndUpdateTopicPartitions(currentNodes map
}
}
}
if len(existPart) == meta.PartitionNum {
coordLog.Infof("topic: %v partitions %v are all ready", topic, existPart)
self.triggerCheckTopics("", 0, time.Millisecond*500)
return nil
}
leaders, isrList, err := self.dpm.allocTopicLeaderAndISR(meta.OrderedMulti, currentNodes, meta.Replica, meta.PartitionNum, existPart)
if err != nil {
coordLog.Infof("failed to alloc nodes for topic: %v", err)
Expand Down
35 changes: 30 additions & 5 deletions consistence/nsqlookup_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ var (
const (
waitMigrateInterval = time.Minute * 10
waitEmergencyMigrateInterval = time.Second * 10
waitRemovingNodeInterval = time.Second * 30
)

type JoinISRState struct {
Expand Down Expand Up @@ -458,7 +459,7 @@ func (self *NsqLookupCoordinator) handleRemovingNodes(monitorChan chan struct{})
defer func() {
coordLog.Infof("stop handle the removing nsqd nodes.")
}()
ticker := time.NewTicker(time.Second * 30)
ticker := time.NewTicker(waitRemovingNodeInterval)
nodeTopicStats := make([]NodeTopicStats, 0, 10)
defer ticker.Stop()
for {
Expand Down Expand Up @@ -644,7 +645,30 @@ func (self *NsqLookupCoordinator) doCheckTopics(monitorChan chan struct{}, faile
}
return
}
coordLog.Debugf("scan found topics: %v", topics)
topicMetas, _ := self.leadership.GetAllTopicMetas()
coordLog.Debugf("scan found topics: %v, %v", topics, topicMetas)
// check partition number for topic, maybe failed to create
// some partition when creating topic.
topicParts := make(map[string]int, len(topicMetas))
for _, t := range topics {
parts, ok := topicParts[t.Name]
if !ok {
parts = 0
topicParts[t.Name] = 0
}
parts++
topicParts[t.Name] = parts
}

for name, meta := range topicMetas {
metaNum := meta.PartitionNum
pnum, _ := topicParts[name]
if pnum >= metaNum {
continue
}
coordLog.Warningf("topic %v partitions not enough : %v, %v", name, pnum, metaNum)
self.CreateTopic(name, *meta)
}
} else {
var err error
coordLog.Infof("check single topic : %v ", failedInfo)
Expand All @@ -657,8 +681,6 @@ func (self *NsqLookupCoordinator) doCheckTopics(monitorChan chan struct{}, faile
topics = append(topics, *t)
}

// TODO: check partition number for topic, maybe failed to create
// some partition when creating topic.
currentNodes, currentNodesEpoch := self.getCurrentNodesWithRemoving()
checkOK := true
for _, t := range topics {
Expand Down Expand Up @@ -863,7 +885,10 @@ func (self *NsqLookupCoordinator) doCheckTopics(monitorChan chan struct{}, faile
delete(lostLeaderSessions, t.GetTopicDesp())
}
}
if aliveCount > t.Replica && atomic.LoadInt32(&self.balanceWaiting) == 0 {
self.nodesMutex.RLock()
hasRemovingNode := len(self.removingNodes) > 0
self.nodesMutex.RUnlock()
if aliveCount > t.Replica && atomic.LoadInt32(&self.balanceWaiting) == 0 && !hasRemovingNode {
//remove the unwanted node in isr, it may happen that the nodes in isr is more than the configured replicator
coordLog.Infof("isr is more than replicator: %v, %v", aliveCount, t.Replica)
removeNode := self.dpm.decideUnwantedISRNode(&topicInfo, currentNodes)
Expand Down
112 changes: 112 additions & 0 deletions consistence/nsqlookup_coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,17 @@ func (self *FakeNsqlookupLeadership) WatchNsqdNodes(nsqds chan []NsqdNodeInfo, s
}
}

func (self *FakeNsqlookupLeadership) GetAllTopicMetas() (map[string]*TopicMetaInfo, error) {
metas := make(map[string]*TopicMetaInfo)
self.dataMutex.Lock()
for n, m := range self.fakeTopicMetaInfo {
tmp := m
metas[n] = &tmp
}
self.dataMutex.Unlock()
return metas, nil
}

func (self *FakeNsqlookupLeadership) ScanTopics() ([]TopicPartitionMetaInfo, error) {
alltopics := make([]TopicPartitionMetaInfo, 0)
self.dataMutex.Lock()
Expand Down Expand Up @@ -1094,6 +1105,107 @@ func TestNsqLookupNsqdCreateTopic(t *testing.T) {
SetCoordLogger(newTestLogger(t), levellogger.LOG_ERR)
}

func TestNsqLookupNsqdCreateTopicFailPartition(t *testing.T) {
// create topic meta success, but failed to create some partitions
if testing.Verbose() {
SetCoordLogger(levellogger.NewSimpleLog(), levellogger.LOG_INFO)
glog.SetFlags(0, "", "", true, true, 1)
glog.StartWorker(time.Second)
} else {
SetCoordLogger(newTestLogger(t), levellogger.LOG_WARN)
}
idList := []string{"id1", "id2", "id3", "id4"}
lookupCoord1, nodeInfoList := prepareCluster(t, idList, false)
for _, n := range nodeInfoList {
defer os.RemoveAll(n.dataPath)
defer n.localNsqd.Exit()
defer n.nsqdCoord.Stop()
}
test.Equal(t, 4, len(nodeInfoList))

topic_p3_r1 := "test-nsqlookup-topic-unit-testcreate-failpart-p3-r1"
topic_p2_r2 := "test-nsqlookup-topic-unit-testcreate-failpart-p2-r2"

lookupLeadership := lookupCoord1.leadership

time.Sleep(time.Second)
checkDeleteErr(t, lookupCoord1.DeleteTopic(topic_p3_r1, "**"))
checkDeleteErr(t, lookupCoord1.DeleteTopic(topic_p2_r2, "**"))
time.Sleep(time.Second * 3)
defer func() {
waitClusterStable(lookupCoord1, time.Second*3)
checkDeleteErr(t, lookupCoord1.DeleteTopic(topic_p3_r1, "**"))
checkDeleteErr(t, lookupCoord1.DeleteTopic(topic_p2_r2, "**"))
time.Sleep(time.Second * 3)

lookupCoord1.Stop()
}()

err := lookupLeadership.CreateTopic(topic_p3_r1, &TopicMetaInfo{3, 1, 0, 0, 0, 0, false, false})
test.Nil(t, err)
waitClusterStable(lookupCoord1, time.Second*2)
waitClusterStable(lookupCoord1, time.Second*5)
pmeta, _, err := lookupLeadership.GetTopicMetaInfo(topic_p3_r1)
pn := pmeta.PartitionNum
test.Nil(t, err)
test.Equal(t, pn, 3)
t0, err := lookupLeadership.GetTopicInfo(topic_p3_r1, 0)
test.Nil(t, err)
test.Equal(t, len(t0.ISR), 1)

t.Logf("t0 leader is: %v", t0.Leader)
if nodeInfoList[t0.Leader] == nil {
t.Fatalf("no leader: %v, %v", t0, nodeInfoList)
}

t0LeaderCoord := nodeInfoList[t0.Leader].nsqdCoord
test.NotNil(t, t0LeaderCoord)
tc0, coordErr := t0LeaderCoord.getTopicCoord(topic_p3_r1, 0)
test.Nil(t, coordErr)
test.Equal(t, tc0.topicInfo.Leader, t0.Leader)
test.Equal(t, len(tc0.topicInfo.ISR), 1)

t1, err := lookupLeadership.GetTopicInfo(topic_p3_r1, 1)
t1LeaderCoord := nodeInfoList[t1.Leader].nsqdCoord
test.NotNil(t, t1LeaderCoord)
tc1, coordErr := t1LeaderCoord.getTopicCoord(topic_p3_r1, 1)
test.Nil(t, coordErr)
test.Equal(t, tc1.topicInfo.Leader, t1.Leader)
test.Equal(t, len(tc1.topicInfo.ISR), 1)

err = lookupLeadership.CreateTopic(topic_p2_r2, &TopicMetaInfo{2, 2, 0, 0, 0, 0, false, false})
test.Nil(t, err)
waitClusterStable(lookupCoord1, time.Second*3)
waitClusterStable(lookupCoord1, time.Second*5)
pmeta, _, err = lookupLeadership.GetTopicMetaInfo(topic_p2_r2)
pn = pmeta.PartitionNum
test.Nil(t, err)
test.Equal(t, pn, 2)
t0, err = lookupLeadership.GetTopicInfo(topic_p2_r2, 0)
test.Nil(t, err)
test.Equal(t, len(t0.ISR), 2)

t.Logf("t0 leader is: %v", t0.Leader)
if nodeInfoList[t0.Leader] == nil {
t.Fatalf("no leader: %v, %v", t0, nodeInfoList)
}

t0LeaderCoord = nodeInfoList[t0.Leader].nsqdCoord
test.NotNil(t, t0LeaderCoord)
tc0, coordErr = t0LeaderCoord.getTopicCoord(topic_p2_r2, 0)
test.Nil(t, coordErr)
test.Equal(t, tc0.topicInfo.Leader, t0.Leader)
test.Equal(t, len(tc0.topicInfo.ISR), 2)

t1, err = lookupLeadership.GetTopicInfo(topic_p2_r2, 1)
t1LeaderCoord = nodeInfoList[t1.Leader].nsqdCoord
test.NotNil(t, t1LeaderCoord)
tc1, coordErr = t1LeaderCoord.getTopicCoord(topic_p2_r2, 1)
test.Nil(t, coordErr)
test.Equal(t, tc1.topicInfo.Leader, t1.Leader)
test.Equal(t, len(tc1.topicInfo.ISR), 2)
}

func TestNsqLookupUpdateTopicMeta(t *testing.T) {
if testing.Verbose() {
SetCoordLogger(levellogger.NewSimpleLog(), levellogger.LOG_INFO)
Expand Down

0 comments on commit 60dff75

Please sign in to comment.