Skip to content

Commit

Permalink
fix: format nodeTaskTopic
Browse files Browse the repository at this point in the history
  • Loading branch information
sunhongtao committed Dec 21, 2023
1 parent 3d5462b commit 3759f3b
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 33 deletions.
61 changes: 31 additions & 30 deletions collect/service/monitor/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import (
)

const (
DateFormat = "20060102"
DateFormat = "20060102"
NodeTaskTopic = "task_%v"
)

type Service struct {
Expand Down Expand Up @@ -82,12 +83,12 @@ func (s *Service) CheckClusterHealth() {
go func() {
for {
<-time.After(1 * time.Minute)
for blockchain, t := range s.taskStore {
mp := make(map[string]int64, 2)
s.rebuildCluster(t, blockchain, "tx", mp)
s.rebuildCluster(t, blockchain, "block", mp)
s.rebuildCluster(t, blockchain, "receipt", mp)
_ = t.StoreClusterHealthStatus(blockchain, mp)
for chainCode, t := range s.taskStore {
mp := make(map[string]int64, 3)
s.rebuildCluster(t, chainCode, "tx", mp)
s.rebuildCluster(t, chainCode, "block", mp)
s.rebuildCluster(t, chainCode, "receipt", mp)
_ = t.StoreClusterHealthStatus(chainCode, mp)
}
}
}()
Expand Down Expand Up @@ -118,17 +119,17 @@ func (s *Service) CheckNodeTask() {

<-time.After(time.Duration(l))

for blockchain, store := range s.taskStore {
for chainCode, store := range s.taskStore {

list, err := store.GetAllKeyForNodeTask(blockchain)
list, err := store.GetAllKeyForNodeTask(chainCode)
if err != nil {
continue
}

tempList := make([]*kafka.Message, 0, 10)

for _, hash := range list {
count, task, err := store.GetNodeTask(blockchain, hash)
count, task, err := store.GetNodeTask(chainCode, hash)
if err != nil || count >= 5 || task == nil {
continue
}
Expand All @@ -142,23 +143,24 @@ func (s *Service) CheckNodeTask() {
task.CreateTime = time.Now()
task.LogTime = time.Now()
if task.BlockChain < 1 {
task.BlockChain = int(blockchain)
task.BlockChain = int(chainCode)
}
task.Id = time.Now().UnixNano()
task.TaskStatus = 0
task.NodeId = s.nodeId
bs, _ := json.Marshal(task)
s.log.Printf("NodeTask|task:%+v", task)
msg := &kafka.Message{Topic: fmt.Sprintf("task_%v", blockchain), Partition: 0, Key: []byte(task.NodeId), Value: bs}

bs, _ := json.Marshal(task)
msg := &kafka.Message{Topic: fmt.Sprintf(NodeTaskTopic, chainCode), Partition: 0, Key: []byte(task.NodeId), Value: bs}
tempList = append(tempList, msg)

if len(tempList) > 10 {
s.kafkaSender[blockchain] <- tempList
s.kafkaSender[chainCode] <- tempList
tempList = tempList[len(tempList):]
}
}

s.kafkaSender[blockchain] <- tempList
s.kafkaSender[chainCode] <- tempList
}

}
Expand All @@ -179,25 +181,23 @@ func (s *Service) CheckContract() {
//l = l * int64(time.Second)

<-time.After(30 * time.Minute)
for chainCode, store := range s.taskStore {

for blockchain, store := range s.taskStore {

list, err := store.GetAllKeyForContract(blockchain)
list, err := store.GetAllKeyForContract(chainCode)
if err != nil {
continue
}

for _, contract := range list {
data, _ := store.GetContract(blockchain, contract)
data, _ := store.GetContract(chainCode, contract)
if len(data) < 1 {
//todo 合约无效,需要刷新
s.getToken(blockchain, contract, contract)
s.getToken(chainCode, contract, contract)
}

}

}

}

}()
Expand All @@ -218,22 +218,22 @@ func (s *Service) CheckErrTx() {

<-time.After(time.Duration(l))

for blockchain, store := range s.taskStore {
for chainCode, store := range s.taskStore {

list, err := store.GetAllKeyForErrTx(blockchain)
list, err := store.GetAllKeyForErrTx(chainCode)
if err != nil {
continue
}

tempList := make([]*kafka.Message, 0, 10)

for _, hash := range list {
count, data, err := store.GetErrTxNodeTask(blockchain, hash)
count, data, err := store.GetErrTxNodeTask(chainCode, hash)
if err != nil || count >= 5 {
continue
}

//todo 重发交易任务
//重发交易任务
var v collect.NodeTask
_ = json.Unmarshal([]byte(data), &v)

Expand All @@ -247,22 +247,23 @@ func (s *Service) CheckErrTx() {
v.LogTime = time.Now()
v.NodeId = s.nodeId
if v.BlockChain < 1 {
v.BlockChain = int(blockchain)
v.BlockChain = int(chainCode)
}
v.Id = time.Now().UnixNano()
v.TaskStatus = 0
bs, _ := json.Marshal(v)
s.log.Printf("ErrTx|task:%+v", v)
msg := &kafka.Message{Topic: fmt.Sprintf("task_%v", v.BlockChain), Partition: 0, Key: []byte(v.NodeId), Value: bs}

bs, _ := json.Marshal(v)
msg := &kafka.Message{Topic: fmt.Sprintf(NodeTaskTopic, chainCode), Partition: 0, Key: []byte(v.NodeId), Value: bs}
tempList = append(tempList, msg)

if len(tempList) > 10 {
s.kafkaSender[blockchain] <- tempList
s.kafkaSender[chainCode] <- tempList
tempList = tempList[len(tempList):]
}
}

s.kafkaSender[blockchain] <- tempList
s.kafkaSender[chainCode] <- tempList
}

}
Expand Down
7 changes: 4 additions & 3 deletions task/service/taskcreate/db/file_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ import (
)

var (
NodeKey = "nodeKey_%v"
BlockChain = "blockChain"
NodeKey = "nodeKey_%v"
BlockChain = "blockChain"
NodeTaskTopic = "task_%v"
)

type TaskCreateFile struct {
Expand Down Expand Up @@ -67,7 +68,7 @@ func (t *TaskCreateFile) ToKafkaMessage(list []*task.NodeTask) ([]*kafka.Message
resultList := make([]*kafka.Message, 0, 5)
for _, v := range list {
bs, _ := json.Marshal(v)
msg := &kafka.Message{Topic: fmt.Sprintf("task_%v", v.BlockChain), Partition: 0, Key: []byte(v.NodeId), Value: bs}
msg := &kafka.Message{Topic: fmt.Sprintf(NodeTaskTopic, v.BlockChain), Partition: 0, Key: []byte(v.NodeId), Value: bs}
resultList = append(resultList, msg)
}
//t.sendCh <- resultList
Expand Down

0 comments on commit 3759f3b

Please sign in to comment.