Skip to content

Commit

Permalink
refact: get monitor address for collect
Browse files Browse the repository at this point in the history
  • Loading branch information
sunhongtao committed Dec 19, 2023
1 parent 8513b44 commit fbb2ce7
Show file tree
Hide file tree
Showing 8 changed files with 300 additions and 139 deletions.
6 changes: 3 additions & 3 deletions blockchain/service/bnb/bnb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (
)

func Init() blockchain.API {
cfg := config.LoadConfig("./../../../cmd/blockchain/config_bnb.json")
return NewBnb(cfg.Cluster[202], 202, xlog.NewXLogger())
cfg := config.LoadConfig("./../../../cmd/blockchain/blockchain_config.json")
return NewBnb(cfg.Cluster[2610], 2610, xlog.NewXLogger())
}

func TestBnb_Token(t *testing.T) {
Expand All @@ -26,7 +26,7 @@ func TestBnb_Token(t *testing.T) {

func TestBnb_TokenBalance(t *testing.T) {
s := Init()
resp, err := s.TokenBalance(202, "0x403f9D1EA51D55d0341ce3c2fBF33E09846F2C74", "0x55d398326f99059fF775485246999027B3197955", "")
resp, err := s.TokenBalance(2610, "0xf0e4939183a76746e602a12c389ab183be4290b1", "0x337610d27c682e347c9cd60bd4b3b107c9d34ddd", "")

if err != nil {
t.Error(err)
Expand Down
78 changes: 51 additions & 27 deletions collect/service/cmd/chain/bnb/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"strconv"
"strings"
"sync"
"time"

"github.com/0xcregis/easynode/blockchain"
Expand All @@ -30,6 +31,8 @@ type Service struct {
txChainClient blockchain.API
blockChainClient blockchain.API
receiptChainClient blockchain.API
monitorAddress map[string]int64
lock sync.RWMutex
}

func (s *Service) GetMultiBlockByNumber(blockNumber string, log *logrus.Entry, flag bool) ([]*collect.BlockInterface, []*collect.TxInterface) {
Expand All @@ -39,7 +42,7 @@ func (s *Service) GetMultiBlockByNumber(blockNumber string, log *logrus.Entry, f
func (s *Service) Monitor() {
go func() {
for {
<-time.After(30 * time.Second)
<-time.After(60 * time.Second)
if s.txChainClient != nil {
txCluster := s.txChainClient.MonitorCluster()
_ = s.store.StoreClusterNode(int64(s.chain.BlockChainCode), "tx", txCluster)
Expand All @@ -54,10 +57,24 @@ func (s *Service) Monitor() {
receiptCluster := s.receiptChainClient.MonitorCluster()
_ = s.store.StoreClusterNode(int64(s.chain.BlockChainCode), "receipt", receiptCluster)
}

// reload monitor address
s.reload()
}
}()
}

func (s *Service) reload() {
addressList, err := s.store.GetMonitorAddress(int64(s.chain.BlockChainCode))
if err != nil {
s.log.Warnf("ReloadMonitorAddress BlockChainName=%v,err=%v", s.chain.BlockChainName, err)
}

s.lock.Lock()
s.monitorAddress = rebuildAddress(addressList)
s.lock.Unlock()
}

func (s *Service) GetBlockByHash(blockHash string, eLog *logrus.Entry, flag bool) (*collect.BlockInterface, []*collect.TxInterface) {
start := time.Now()
defer func() {
Expand Down Expand Up @@ -122,21 +139,21 @@ func (s *Service) GetBlockByHash(blockHash string, eLog *logrus.Entry, flag bool
}

//get monitor address list
addressList, err := s.store.GetMonitorAddress(int64(s.chain.BlockChainCode))
if err != nil {
eLog.Errorf("GetBlockByHash|BlockChainName=%v,err=%v,blockHash=%v", s.chain.BlockChainName, err, blockHash)
}
if len(addressList) < 1 {
eLog.Warnf("the tx is ignored due to monitor address is empty,blockHash=%v", blockHash)
return r, nil
}

//filter tx
addressMp := rebuildAddress(addressList)
//addressList, err := s.store.GetMonitorAddress(int64(s.chain.BlockChainCode))
//if err != nil {
// eLog.Errorf("GetBlockByHash|BlockChainName=%v,err=%v,blockHash=%v", s.chain.BlockChainName, err, blockHash)
//}
//if len(addressList) < 1 {
// eLog.Warnf("the tx is ignored due to monitor address is empty,blockHash=%v", blockHash)
// return r, nil
//}
//
////filter tx
//addressMp := rebuildAddress(addressList)
txs := make([]*collect.TxInterface, 0, len(txList))
for _, tx := range txList {
bs, _ := json.Marshal(tx)
if s.CheckAddress(bs, addressMp) {
if s.CheckAddress(bs, s.monitorAddress) {
t := &collect.TxInterface{TxHash: tx.TxHash, Tx: tx}
txs = append(txs, t)
} else {
Expand Down Expand Up @@ -216,22 +233,22 @@ func (s *Service) GetBlockByNumber(blockNumber string, eLog *logrus.Entry, flag
}

//get monitor address list
addressList, err := s.store.GetMonitorAddress(int64(s.chain.BlockChainCode))
if err != nil {
eLog.Errorf("GetBlockByNumber|BlockChainName=%v,err=%v,blockNumber=%v", s.chain.BlockChainName, err, blockNumber)
}

if len(addressList) < 1 {
eLog.Warnf("the tx is ignored due to monitor address is empty,blockNumber=%v", blockNumber)
return r, nil
}

//filter tx
addressMp := rebuildAddress(addressList)
//addressList, err := s.store.GetMonitorAddress(int64(s.chain.BlockChainCode))
//if err != nil {
// eLog.Errorf("GetBlockByNumber|BlockChainName=%v,err=%v,blockNumber=%v", s.chain.BlockChainName, err, blockNumber)
//}
//
//if len(addressList) < 1 {
// eLog.Warnf("the tx is ignored due to monitor address is empty,blockNumber=%v", blockNumber)
// return r, nil
//}
//
////filter tx
//addressMp := rebuildAddress(addressList)
txs := make([]*collect.TxInterface, 0, len(txList))
for _, tx := range txList {
bs, _ := json.Marshal(tx)
if s.CheckAddress(bs, addressMp) {
if s.CheckAddress(bs, s.monitorAddress) {
t := &collect.TxInterface{TxHash: tx.TxHash, Tx: tx}
txs = append(txs, t)
} else {
Expand Down Expand Up @@ -561,13 +578,16 @@ func (s *Service) CheckAddress(tx []byte, addrList map[string]int64) bool {
//}

has := false
s.lock.RLock()
for k := range txAddressList {
//monitorAddr := getCoreAddr(v)
if _, ok := addrList[k]; ok {
has = true
break
}
}

s.lock.RUnlock()
return has
}

Expand Down Expand Up @@ -616,7 +636,7 @@ func NewService(c *config.Chain, x *xlog.XLog, store collect.StoreTaskInterface,
receiptClient = chainService.NewApi(int64(c.BlockChainCode), list, x)
}

return &Service{
s := &Service{
log: x,
chain: c,
store: store,
Expand All @@ -626,5 +646,9 @@ func NewService(c *config.Chain, x *xlog.XLog, store collect.StoreTaskInterface,
nodeId: nodeId,
transferTopic: transferTopic,
nftTransferSingleTopic: nftTransferSingleTopic,
lock: sync.RWMutex{},
}

s.reload()
return s
}
67 changes: 45 additions & 22 deletions collect/service/cmd/chain/btc/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package btc
import (
"encoding/json"
"strings"
"sync"
"time"

"github.com/0xcregis/easynode/blockchain"
Expand All @@ -23,6 +24,8 @@ type Service struct {
txChainClient blockchain.API
blockChainClient blockchain.API
receiptChainClient blockchain.API
monitorAddress map[string]int64
lock sync.RWMutex
}

func (s *Service) GetMultiBlockByNumber(blockNumber string, log *logrus.Entry, flag bool) ([]*collect.BlockInterface, []*collect.TxInterface) {
Expand All @@ -32,7 +35,7 @@ func (s *Service) GetMultiBlockByNumber(blockNumber string, log *logrus.Entry, f
func (s *Service) Monitor() {
go func() {
for {
<-time.After(30 * time.Second)
<-time.After(60 * time.Second)
if s.txChainClient != nil {
txCluster := s.txChainClient.MonitorCluster()
_ = s.store.StoreClusterNode(int64(s.chain.BlockChainCode), "tx", txCluster)
Expand All @@ -47,10 +50,24 @@ func (s *Service) Monitor() {
receiptCluster := s.receiptChainClient.MonitorCluster()
_ = s.store.StoreClusterNode(int64(s.chain.BlockChainCode), "receipt", receiptCluster)
}

// reload monitor address
s.reload()
}
}()
}

func (s *Service) reload() {
addressList, err := s.store.GetMonitorAddress(int64(s.chain.BlockChainCode))
if err != nil {
s.log.Warnf("ReloadMonitorAddress BlockChainName=%v,err=%v", s.chain.BlockChainName, err)
}

s.lock.Lock()
s.monitorAddress = rebuildAddress(addressList)
s.lock.Unlock()
}

func (s *Service) GetBlockByHash(blockHash string, eLog *logrus.Entry, flag bool) (*collect.BlockInterface, []*collect.TxInterface) {
start := time.Now()
defer func() {
Expand Down Expand Up @@ -84,15 +101,15 @@ func (s *Service) GetBlockByHash(blockHash string, eLog *logrus.Entry, flag bool
return r, nil
}

addressList, err := s.store.GetMonitorAddress(int64(s.chain.BlockChainCode))
if err != nil {
eLog.Errorf("GetBlockByHash|BlockChainName=%v,err=%v,blockHash=%v", s.chain.BlockChainName, err, blockHash)
}
addressMp := rebuildAddress(addressList)
//addressList, err := s.store.GetMonitorAddress(int64(s.chain.BlockChainCode))
//if err != nil {
// eLog.Errorf("GetBlockByHash|BlockChainName=%v,err=%v,blockHash=%v", s.chain.BlockChainName, err, blockHash)
//}
//addressMp := rebuildAddress(addressList)
txs := make([]*collect.TxInterface, 0, len(txList))
for _, tx := range txList {
bs, _ := json.Marshal(tx)
if s.CheckAddress(bs, addressMp) {
if s.CheckAddress(bs, s.monitorAddress) {
t := &collect.TxInterface{TxHash: tx.TxHash, Tx: tx}
txs = append(txs, t)
}
Expand Down Expand Up @@ -137,16 +154,16 @@ func (s *Service) GetBlockByNumber(blockNumber string, eLog *logrus.Entry, flag
return r, nil
}

addressList, err := s.store.GetMonitorAddress(int64(s.chain.BlockChainCode))
if err != nil {
eLog.Warnf("GetBlockByNumber|BlockChainName=%v,err=%v,blockNumber=%v", s.chain.BlockChainName, err, blockNumber)
}

addressMp := rebuildAddress(addressList)
//addressList, err := s.store.GetMonitorAddress(int64(s.chain.BlockChainCode))
//if err != nil {
// eLog.Warnf("GetBlockByNumber|BlockChainName=%v,err=%v,blockNumber=%v", s.chain.BlockChainName, err, blockNumber)
//}
//
//addressMp := rebuildAddress(addressList)
txs := make([]*collect.TxInterface, 0, len(txList))
for _, tx := range txList {
bs, _ := json.Marshal(tx)
if s.CheckAddress(bs, addressMp) {
if s.CheckAddress(bs, s.monitorAddress) {
t := &collect.TxInterface{TxHash: tx.TxHash, Tx: tx}
txs = append(txs, t)
} else {
Expand Down Expand Up @@ -192,14 +209,14 @@ func (s *Service) GetTx(txHash string, eLog *logrus.Entry) *collect.TxInterface
}
}

addressList, err := s.store.GetMonitorAddress(int64(s.chain.BlockChainCode))
if err != nil {
eLog.Errorf("GetTx|BlockChainName=%v,err=%v,txHash=%v", s.chain.BlockChainName, err, txHash)
}

addressMp := rebuildAddress(addressList)
//addressList, err := s.store.GetMonitorAddress(int64(s.chain.BlockChainCode))
//if err != nil {
// eLog.Errorf("GetTx|BlockChainName=%v,err=%v,txHash=%v", s.chain.BlockChainName, err, txHash)
//}
//
//addressMp := rebuildAddress(addressList)
bs, _ := json.Marshal(tx)
if s.CheckAddress(bs, addressMp) {
if s.CheckAddress(bs, s.monitorAddress) {
r := &collect.TxInterface{TxHash: tx.TxHash, Tx: tx}
return r
} else {
Expand Down Expand Up @@ -260,12 +277,14 @@ func (s *Service) CheckAddress(tx []byte, addrList map[string]int64) bool {
}

has := false
s.lock.RLock()
for k := range txAddressList {
if _, ok := addrList[k]; ok {
has = true
break
}
}
s.lock.RUnlock()
return has
}

Expand Down Expand Up @@ -314,13 +333,17 @@ func NewService(c *config.Chain, x *xlog.XLog, store collect.StoreTaskInterface,
receiptClient = chainService.NewApi(int64(c.BlockChainCode), list, x)
}

return &Service{
s := &Service{
log: x,
chain: c,
store: store,
txChainClient: txClient,
blockChainClient: blockClient,
receiptChainClient: receiptClient,
nodeId: nodeId,
lock: sync.RWMutex{},
}

s.reload()
return s
}
Loading

0 comments on commit fbb2ce7

Please sign in to comment.