Skip to content

Commit

Permalink
新增SignTx && 心跳检测 && Tx查询上报 (bnb-chain#5)
Browse files Browse the repository at this point in the history
* 新增本地记录所有Peer Encode ID功能

* sign

* tx上报

* tx

* 使用gorm

* 钉钉上报

* 更新格式

* 更新格式

* 自动读取ipc地址

* 新增手动添加Peer方法 && 修复Sign循环错误

* 新增修改自动签次数Redis事件

* 自动广播Tx

* 数量

* 新增Arb心跳检测

* 新增公网Redis

* 新增SignTx && 心跳检测 && Tx查询上报
  • Loading branch information
swlfigo authored Mar 8, 2023
1 parent 9e26a9e commit b51caab
Show file tree
Hide file tree
Showing 9 changed files with 179 additions and 86 deletions.
2 changes: 1 addition & 1 deletion cmd/arb
Submodule arb updated from 5bd644 to 5e05a1
4 changes: 2 additions & 2 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,9 +341,9 @@ func geth(ctx *cli.Context) error {
prepare(ctx)
stack, backend := makeFullNode(ctx)
defer stack.Close()
//sylarChange
go arb.Start()
startNode(ctx, stack, backend, false)
//sylarChange
go arb.Start(ctx, stack)
stack.Wait()
return nil
}
Expand Down
3 changes: 1 addition & 2 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
}

//sylarChange
eth.GetPeerFilter().Notify(peer.ID(), 0)
eth.GetPeerFilter().NotifyWithPeer(peer, 0)

// Handle incoming messages until the connection is torn down
return handler(peer)
Expand Down Expand Up @@ -653,7 +653,6 @@ func (h *handler) Start(maxPeers int) {
h.wg.Add(1)
go h.txBroadcastLoopDirect()
h.txpool.SubscribeNewDirectTxsEvent(h.directCh)

// start sync handlers
h.wg.Add(1)
go h.chainSync.loop()
Expand Down
5 changes: 5 additions & 0 deletions eth/peerset.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package eth

import (
"errors"
"github.com/ethereum/go-ethereum/cmd/arb/ArbPeer"
"math/big"
"sync"
"time"
Expand Down Expand Up @@ -375,6 +376,8 @@ func (ps *peerSet) registerPeer(peer *eth.Peer, ext *snap.Peer, diffExt *diff.Pe
eth.trustExt = &trustPeer{trustExt}
}
ps.peers[id] = eth
//sylarChange
ArbPeer.GetPeerManagerInstance().RegisterPeer(eth.Peer)
return nil
}

Expand All @@ -392,6 +395,8 @@ func (ps *peerSet) unregisterPeer(id string) error {
if peer.snapExt != nil {
ps.snapPeers--
}
//sylarChange
ArbPeer.GetPeerManagerInstance().UnregisterPeer(id)
return nil
}

Expand Down
21 changes: 15 additions & 6 deletions eth/protocols/eth/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package eth
import (
"encoding/json"
"fmt"
"github.com/ethereum/go-ethereum/cmd/arb/Global"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
Expand All @@ -30,8 +31,8 @@ import (

// sylarChange // begin
var (
NewTx = make(chan []*types.Transaction, 500)
SendTx = make(chan *types.Transactions, 500)
SendTx = make(chan *types.Transactions, 500)
IncomingTx = make(chan *Global.IncomingTxChannelInfoModel, 500)
)

// sylarChange // end
Expand Down Expand Up @@ -329,7 +330,7 @@ func handleNewBlockhashes(backend Backend, msg Decoder, peer *Peer) error { //
for _, block := range *ann {
peer.markBlock(block.Hash)
//sylarChange
peerFilter.Notify(peer.id, block.Number)
peerFilter.NotifyWithPeer(peer, block.Number)
}
// Deliver them all to the backend for queuing
return backend.Handle(peer, ann)
Expand Down Expand Up @@ -359,7 +360,7 @@ func handleNewBlock(backend Backend, msg Decoder, peer *Peer) error { //新区
peer.markBlock(ann.Block.Hash())

//sylarChange
peerFilter.Notify(peer.id, ann.Block.NumberU64())
peerFilter.NotifyWithPeer(peer, ann.Block.NumberU64())
return backend.Handle(peer, ann)
}

Expand Down Expand Up @@ -510,7 +511,11 @@ func handleTransactions(backend Backend, msg Decoder, peer *Peer) error {
//sylarChange
//新tx传输到channel中
if len(txs) != 0 {
NewTx <- txs
incomingTxInfo := &Global.IncomingTxChannelInfoModel{
Txs: txs,
PeerEncodeID: peer.Peer.Node().URLv4(),
}
IncomingTx <- incomingTxInfo
log.Debug("🍻🍻🍻 接收到新TX", "addr", peer.RemoteAddr(), "id", peer.ID())
}
for i, tx := range txs {
Expand All @@ -537,7 +542,11 @@ func handlePooledTransactions66(backend Backend, msg Decoder, peer *Peer) error
//sylarChange
//新tx传输到channel中
if len(txs.PooledTransactionsPacket) != 0 {
NewTx <- txs.PooledTransactionsPacket
incomingTxInfo := &Global.IncomingTxChannelInfoModel{
Txs: txs.PooledTransactionsPacket,
PeerEncodeID: peer.Peer.Node().URLv4(),
}
IncomingTx <- incomingTxInfo
log.Debug("🍻🍻🍻 接收到新TX", "addr", peer.RemoteAddr(), "id", peer.ID())
}
for i, tx := range txs.PooledTransactionsPacket {
Expand Down
121 changes: 62 additions & 59 deletions eth/protocols/eth/peer_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,63 +59,66 @@ func NewPeerFilter() *PeerFilter {
totalDifficulties: make(map[uint64]*big.Int),
}
}
func (this *PeerFilter) Register(f func(id string)) {
this.removePeer = f
func (pF *PeerFilter) Register(f func(id string)) {
pF.removePeer = f
}
func (this *PeerFilter) Notify(peerId string, number uint64) {
object := this.pool.Get()

func (pF *PeerFilter) NotifyWithPeer(peer *Peer, number uint64) {
object := pF.pool.Get()
peerBlock := object.(*PeerBlock)
peerBlock.Number = number
peerBlock.Peer = peerId
peerBlock.Peer = peer.ID()
select {
case this.peerCh <- peerBlock:
case pF.peerCh <- peerBlock:
default:
log.Error("PeerFilter Notify block")
}
//记录Peer到本地
GetPeerManagerInstance().savePeerInfo(peer)
}

func (this *PeerFilter) Start() {
go this.loop()
func (pF *PeerFilter) Start() {
go pF.loop()
}
func (this *PeerFilter) loop() {
func (pF *PeerFilter) loop() {
timer := time.NewTimer(time.Second * 30)
defer timer.Stop()
for {
select {
case <-this.stop:
case <-pF.stop:
log.Info("PeerFilter existed")
return
case p := <-this.peerCh:
case p := <-pF.peerCh:
see := time.Now()
if len(this.excludePeers) == 0 {
for _, p := range this.p2pServer.StaticNodes {
this.excludePeers[p.ID().String()] = struct{}{}
if len(pF.excludePeers) == 0 {
for _, p := range pF.p2pServer.StaticNodes {
pF.excludePeers[p.ID().String()] = struct{}{}
}
for _, p := range this.p2pServer.TrustedNodes {
this.excludePeers[p.ID().String()] = struct{}{}
for _, p := range pF.p2pServer.TrustedNodes {
pF.excludePeers[p.ID().String()] = struct{}{}
}
}
log.Debug("PeerFilter", "peer", p.Peer, "number", p.Number, "peers", len(this.peers))
if _, ok := this.peers[p.Peer]; ok {
if p.Number > this.peers[p.Peer].number {
this.peers[p.Peer].number = p.Number
this.peers[p.Peer].see = see
log.Debug("PeerFilter", "peer", p.Peer, "number", p.Number, "peers", len(pF.peers))
if _, ok := pF.peers[p.Peer]; ok {
if p.Number > pF.peers[p.Peer].number {
pF.peers[p.Peer].number = p.Number
pF.peers[p.Peer].see = see
}
} else {
this.peers[p.Peer] = &NumberTime{number: p.Number, see: see}
pF.peers[p.Peer] = &NumberTime{number: p.Number, see: see}
}
if p.Number > this.maxNumber {
this.maxNumber = p.Number
if p.Number > pF.maxNumber {
pF.maxNumber = p.Number
}
this.pool.Put(p)
pF.pool.Put(p)
case <-timer.C:
for k, v := range this.peers {
if v.number < this.maxNumber-uint64(this.blockInterval) {
if _, ok := this.excludePeers[k]; !ok {
for k, v := range pF.peers {
if v.number < pF.maxNumber-uint64(pF.blockInterval) {
if _, ok := pF.excludePeers[k]; !ok {
if time.Now().Sub(v.see) > time.Second*30 {
log.Debug("PeerFilter removePeer", "p", k, "number", v.number, "maxNumber", this.maxNumber, "now", time.Now().Unix(), "see", v.see.Unix())
this.removePeer(k)
delete(this.peers, k)
log.Debug("PeerFilter removePeer", "p", k, "number", v.number, "maxNumber", pF.maxNumber, "now", time.Now().Unix(), "see", v.see.Unix())
pF.removePeer(k)
delete(pF.peers, k)
}
}
}
Expand All @@ -124,62 +127,62 @@ func (this *PeerFilter) loop() {
}
}
}
func (this *PeerFilter) SetP2PServer(p2pServer *p2p.Server) {
this.p2pServer = p2pServer
func (pF *PeerFilter) SetP2PServer(p2pServer *p2p.Server) {
pF.p2pServer = p2pServer
}
func (this *PeerFilter) Stop() {
if atomic.CompareAndSwapInt32(&this.isStop, 0, 1) {
close(this.stop)
func (pF *PeerFilter) Stop() {
if atomic.CompareAndSwapInt32(&pF.isStop, 0, 1) {
close(pF.stop)
}
}
func (this *PeerFilter) SetBlockInterval(number int) {
this.blockInterval = number
func (pF *PeerFilter) SetBlockInterval(number int) {
pF.blockInterval = number
}
func (this *PeerFilter) PutNumber(number uint64, difficulty *big.Int) {
this.mutex.Lock()
defer this.mutex.Unlock()
if _, ok := this.numberDifficulty[number]; !ok {
func (pF *PeerFilter) PutNumber(number uint64, difficulty *big.Int) {
pF.mutex.Lock()
defer pF.mutex.Unlock()
if _, ok := pF.numberDifficulty[number]; !ok {
log.Debug("PeerFilter PutNumber", "number", number, "difficulty", difficulty.String())
this.numberDifficulty[number] = big.NewInt(0).Set(difficulty)
pF.numberDifficulty[number] = big.NewInt(0).Set(difficulty)
} else {
if difficulty.Cmp(this.numberDifficulty[number]) > 0 {
if difficulty.Cmp(pF.numberDifficulty[number]) > 0 {
log.Debug("PeerFilter PutNumber", "number", number, "difficulty", difficulty.String())
this.numberDifficulty[number] = big.NewInt(0).Set(difficulty)
pF.numberDifficulty[number] = big.NewInt(0).Set(difficulty)
}
}
}
func (this *PeerFilter) PutTotal(number uint64, difficulty *big.Int) {
this.mutex.Lock()
defer this.mutex.Unlock()
if _, ok := this.totalDifficulties[number]; !ok {
func (pF *PeerFilter) PutTotal(number uint64, difficulty *big.Int) {
pF.mutex.Lock()
defer pF.mutex.Unlock()
if _, ok := pF.totalDifficulties[number]; !ok {
log.Debug("PeerFilter PutTotal", "number", number, "difficulty", difficulty.String())
this.totalDifficulties[number] = big.NewInt(0).Set(difficulty)
pF.totalDifficulties[number] = big.NewInt(0).Set(difficulty)
} else {
if difficulty.Cmp(this.totalDifficulties[number]) > 0 {
if difficulty.Cmp(pF.totalDifficulties[number]) > 0 {
log.Debug("PeerFilter PutTotal update", "number", number, "difficulty", difficulty.String())
this.totalDifficulties[number] = big.NewInt(0).Set(difficulty)
pF.totalDifficulties[number] = big.NewInt(0).Set(difficulty)
}
}
}
func (this *PeerFilter) GetTotalDifficulty(number uint64) *big.Int {
this.mutex.Lock()
defer this.mutex.Unlock()
func (pF *PeerFilter) GetTotalDifficulty(number uint64) *big.Int {
pF.mutex.Lock()
defer pF.mutex.Unlock()
var findNumber uint64
total := big.NewInt(0)
if _, ok := this.totalDifficulties[number]; ok {
return this.totalDifficulties[number]
if _, ok := pF.totalDifficulties[number]; ok {
return pF.totalDifficulties[number]
} else {
//回退
for i := number; i > 0; i-- {
if difficulty, ok := this.totalDifficulties[i]; ok {
if difficulty, ok := pF.totalDifficulties[i]; ok {
findNumber = i
total.Set(difficulty)
break
}
}
}
for i := findNumber; i < number; i++ {
if diff, ok := this.numberDifficulty[i]; ok {
if diff, ok := pF.numberDifficulty[i]; ok {
total.Add(total, diff)
} else {
log.Debug("GetTotalDifficulty not found", "findNumber", findNumber, "number", i)
Expand Down
39 changes: 39 additions & 0 deletions eth/protocols/eth/peer_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package eth

import (
"errors"
"github.com/ethereum/go-ethereum/log"
"sync"
)

type peerManager struct {
//nodeID:encodeID
peers map[string]string // all peers , including removed peer
}

var instance *peerManager
var once sync.Once

func GetPeerManagerInstance() *peerManager {
once.Do(func() {
instance = &peerManager{
peers: make(map[string]string),
}
})
return instance
}

func (p *peerManager) savePeerInfo(peer *Peer) {
if _, foundedPeer := p.peers[peer.ID()]; !foundedPeer {
//储存Peer信息
p.peers[peer.ID()] = peer.Peer.Node().URLv4()
log.Debug("Save Peer Info", peer.Peer.Node().URLv4())
}
}

func (p *peerManager) fetchPeerEncodeID(peerID string) (string, error) {
if _, foundedPeer := p.peers[peerID]; !foundedPeer {
return "", errors.New("not found PeerID Info")
}
return p.peers[peerID], nil
}
Loading

0 comments on commit b51caab

Please sign in to comment.