Skip to content

Commit

Permalink
Merge pull request ethereum#108 from thinkAfCod/portal_fix
Browse files Browse the repository at this point in the history
fix: cache node id and add logs
  • Loading branch information
fearlessfe authored May 20, 2024
2 parents 652b087 + 0159c17 commit 4968c89
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 40 deletions.
2 changes: 1 addition & 1 deletion cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -1000,7 +1000,7 @@ Please note that --` + MetricsHTTPFlag.Name + ` must be set to start the server.
PortalLogLevelFlag = &cli.IntFlag{
Name: "loglevel",
Usage: "loglevel of portal network",
Value: node.DetaultLoglevel,
Value: node.DefaultLoglevel,
Category: flags.PortalNetworkCategory,
}

Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ require (
github.com/mattn/go-sqlite3 v1.14.18
github.com/naoina/toml v0.1.2-0.20170918210437-9fafd6967416
github.com/olekukonko/tablewriter v0.0.5
github.com/optimism-java/utp-go v0.0.0-20240309041853-b6b3a0dea581
github.com/optimism-java/utp-go v0.0.0-20240518144144-6560912a0d99
github.com/peterh/liner v1.1.1-0.20190123174540-a2c9a5303de7
github.com/protolambda/bls12-381-util v0.1.0
github.com/protolambda/zrnt v0.32.2
Expand All @@ -75,7 +75,7 @@ require (
golang.org/x/crypto v0.22.0
golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa
golang.org/x/sync v0.7.0
golang.org/x/sys v0.19.0
golang.org/x/sys v0.20.0
golang.org/x/text v0.14.0
golang.org/x/time v0.5.0
golang.org/x/tools v0.20.0
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,8 @@ github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsq
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/optimism-java/utp-go v0.0.0-20240309041853-b6b3a0dea581 h1:ZxgrtI0xIw+clB32iDDDWaiTcCizTeN7rNyzH9YorPI=
github.com/optimism-java/utp-go v0.0.0-20240309041853-b6b3a0dea581/go.mod h1:DZ0jYzLzt4ZsCmhI/iqYgGFoNx45OfpEoKzXB8HVALQ=
github.com/optimism-java/utp-go v0.0.0-20240518144144-6560912a0d99 h1:8NEQQ8KNNUASMBB0OdnfYuxnOIEHJm1NcDiPnMb2Kvk=
github.com/optimism-java/utp-go v0.0.0-20240518144144-6560912a0d99/go.mod h1:DZ0jYzLzt4ZsCmhI/iqYgGFoNx45OfpEoKzXB8HVALQ=
github.com/optimism-java/zrnt v0.32.4-0.20240415084906-d9dbf06b32f7 h1:ZTQWXQ8xblCRUXhZs3h5qrBMSAHe8iNH7BG7a7IVFlI=
github.com/optimism-java/zrnt v0.32.4-0.20240415084906-d9dbf06b32f7/go.mod h1:A0fezkp9Tt3GBLATSPIbuY4ywYESyAuc/FFmPKg8Lqs=
github.com/peterh/liner v1.1.1-0.20190123174540-a2c9a5303de7 h1:oYW+YCJ1pachXTQmzR3rNLYGGz4g/UgFcjb28p/viDM=
Expand Down Expand Up @@ -709,6 +711,8 @@ golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o=
golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
Expand Down
2 changes: 1 addition & 1 deletion node/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ const (
DefaultAuthHost = "localhost" // Default host interface for the authenticated apis
DefaultAuthPort = 8551 // Default port for the authenticated apis
DefaultUDPPort = 9009 // Default UDP port for the p2p network
DetaultLoglevel = 1 // Default loglevel for portal network, which is error level
DefaultLoglevel = 1 // Default loglevel for portal network, which is error level
)

const (
Expand Down
92 changes: 56 additions & 36 deletions p2p/discover/portal_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ const (

portalFindnodesResultLimit = 32

defaultUTPConnectTimeout = 15 * time.Second
defaultUTPConnectTimeout = 60 * time.Second

defaultUTPWriteTimeout = 60 * time.Second

Expand Down Expand Up @@ -159,7 +159,9 @@ func DefaultPortalProtocolConfig() *PortalProtocolConfig {
}

type PortalProtocol struct {
table *Table
table *Table
cachedIdsLock sync.Mutex
cachedIds map[string]enode.ID

protocolId string
protocolName string
Expand Down Expand Up @@ -199,6 +201,7 @@ func NewPortalProtocol(config *PortalProtocolConfig, protocolId string, privateK

protocolName := portalwire.NetworkNameMap[protocolId]
protocol := &PortalProtocol{
cachedIds: make(map[string]enode.ID),
protocolId: protocolId,
protocolName: protocolName,
ListenAddr: config.ListenAddr,
Expand Down Expand Up @@ -232,7 +235,6 @@ func (p *PortalProtocol) Start() error {
if err != nil {
return err
}

p.DiscV5.RegisterTalkHandler(p.protocolId, p.handleTalkRequest)
p.DiscV5.RegisterTalkHandler(string(portalwire.UTPNetwork), p.handleUtpTalkRequest)

Expand Down Expand Up @@ -293,37 +295,28 @@ func (p *PortalProtocol) setupUDPListening() error {
var err error
p.packetRouter = utp.NewPacketRouter(
func(buf []byte, addr *net.UDPAddr) (int, error) {
nodes := p.table.Nodes()
var target *enode.Node
for _, n := range nodes {
if addr.Port != n.UDP() {
continue
}
if addr.IP != nil && addr.IP.To4().String() == n.IP().To4().String() {
target = n

break
}
if addr.IP == nil {
nodeIp := n.IP().To4().String()
if nodeIp == "127.0.0.1" || nodeIp == "0.0.0.0" {
target = n
break
p.Log.Info("will send to target data", "network", string(portalwire.UTPNetwork), "ip", addr.IP.To4().String(), "port", addr.Port, "bufLength", len(buf))

p.cachedIdsLock.Lock()
defer p.cachedIdsLock.Unlock()
if id, ok := p.cachedIds[addr.String()]; ok {
sendToId := id
go func(targetId enode.ID, addr *net.UDPAddr, utpNetwork string, buffer []byte) {
_, err := p.DiscV5.TalkRequestToID(targetId, addr, utpNetwork, buffer)
if err != nil {
p.Log.Error("send utp talk request failed", "err", err)
}
}
}

if target == nil {
p.Log.Warn("not fount target node info", "ip", addr.IP.To4().String(), "port", addr.Port, "bufLength", len(buf))
return 0, fmt.Errorf("not found target node info")
}(sendToId, addr, string(portalwire.UTPNetwork), buf)
return len(buf), nil
} else {
p.Log.Warn("not found target node info", "ip", addr.IP.To4().String(), "port", addr.Port, "bufLength", len(buf))
return 0, fmt.Errorf("not found target node id")
}
p.Log.Trace("send to target data", "ip", addr.IP.To4().String(), "port", addr.Port, "bufLength", len(buf))
_, err := p.DiscV5.TalkRequest(target, string(portalwire.UTPNetwork), buf)
return len(buf), err
})

ctx := context.Background()
var logger *zap.Logger

if p.Log.Enabled(ctx, log.LevelDebug) || p.Log.Enabled(ctx, log.LevelTrace) {
logger, err = zap.NewDevelopmentConfig().Build()
} else {
Expand Down Expand Up @@ -367,6 +360,23 @@ func (p *PortalProtocol) setupDiscV5AndTable() error {
return nil
}

func (p *PortalProtocol) putCacheNodeId(node *enode.Node) {
p.cachedIdsLock.Lock()
defer p.cachedIdsLock.Unlock()
addr := &net.UDPAddr{IP: node.IP(), Port: node.UDP()}
if _, ok := p.cachedIds[addr.String()]; !ok {
p.cachedIds[addr.String()] = node.ID()
}
}

func (p *PortalProtocol) putCacheId(id enode.ID, addr *net.UDPAddr) {
p.cachedIdsLock.Lock()
defer p.cachedIdsLock.Unlock()
if _, ok := p.cachedIds[addr.String()]; !ok {
p.cachedIds[addr.String()] = id
}
}

func (p *PortalProtocol) ping(node *enode.Node) (uint64, error) {
pong, err := p.pingInner(node)
if err != nil {
Expand Down Expand Up @@ -510,6 +520,9 @@ func (p *PortalProtocol) processOffer(target *enode.Node, resp []byte, request *
return nil, fmt.Errorf("invalid accept response")
}

p.Log.Info("will process Offer", "id", target.ID(), "ip", target.IP().To4().String(), "port", target.UDP())
p.putCacheNodeId(target)

accept := &portalwire.Accept{}
err = accept.UnmarshalSSZ(resp[1:])
if err != nil {
Expand Down Expand Up @@ -578,8 +591,8 @@ func (p *PortalProtocol) processOffer(target *enode.Node, resp []byte, request *
connctx, conncancel := context.WithTimeout(ctx, defaultUTPConnectTimeout)
laddr := p.utp.Addr().(*utp.Addr)
raddr := &utp.Addr{IP: target.IP(), Port: target.UDP()}
conn, err = utp.DialUTPOptions("utp", laddr, raddr, utp.WithContext(connctx), utp.WithSocketManager(p.utpSm), utp.WithConnId(uint32(connId)))
p.Log.Info("will connect to: ", "addr", raddr.String(), "connId", connId)
conn, err = utp.DialUTPOptions("utp", laddr, raddr, utp.WithContext(connctx), utp.WithSocketManager(p.utpSm), utp.WithConnId(uint32(connId)))
if err != nil {
conncancel()
p.Log.Error("failed to dial utp connection", "err", err)
Expand Down Expand Up @@ -633,6 +646,9 @@ func (p *PortalProtocol) processContent(target *enode.Node, resp []byte) (byte,
return 0xff, nil, fmt.Errorf("invalid content response")
}

p.Log.Info("will process content", "id", target.ID(), "ip", target.IP().To4().String(), "port", target.UDP())
p.putCacheNodeId(target)

switch resp[1] {
case portalwire.ContentRawSelector:
content := &portalwire.Content{}
Expand All @@ -657,8 +673,8 @@ func (p *PortalProtocol) processContent(target *enode.Node, resp []byte) (byte,
laddr := p.utp.Addr().(*utp.Addr)
raddr := &utp.Addr{IP: target.IP(), Port: target.UDP()}
connId := binary.BigEndian.Uint16(connIdMsg.Id[:])
conn, err := utp.DialUTPOptions("utp", laddr, raddr, utp.WithContext(connctx), utp.WithSocketManager(p.utpSm), utp.WithConnId(uint32(connId)))
p.Log.Info("will connect to: ", "addr", raddr.String(), "connId", connId)
conn, err := utp.DialUTPOptions("utp", laddr, raddr, utp.WithContext(connctx), utp.WithSocketManager(p.utpSm), utp.WithConnId(uint32(connId)))
if err != nil {
conncancel()
return 0xff, nil, err
Expand Down Expand Up @@ -784,16 +800,17 @@ func (p *PortalProtocol) handleUtpTalkRequest(id enode.ID, addr *net.UDPAddr, ms
if n := p.DiscV5.getNode(id); n != nil {
p.table.addSeenNode(wrapNode(n))
}
p.putCacheId(id, addr)
p.Log.Trace("receive utp data", "addr", addr, "msg-length", len(msg))
p.packetRouter.ReceiveMessage(msg, addr)
return []byte("")
}

func (p *PortalProtocol) handleTalkRequest(id enode.ID, addr *net.UDPAddr, msg []byte) []byte {
p.Log.Trace("handleTalkRequest", "id", id, "addr", addr)
if n := p.DiscV5.getNode(id); n != nil {
p.table.addSeenNode(wrapNode(n))
}
p.putCacheId(id, addr)

msgCode := msg[0]

Expand Down Expand Up @@ -958,6 +975,8 @@ func (p *PortalProtocol) handleFindContent(id enode.ID, addr *net.UDPAddr, reque
return nil, err
}

p.putCacheId(id, addr)

if errors.Is(err, ContentNotFound) {
closestNodes := p.findNodesCloseToContent(contentId, portalFindnodesResultLimit)
for i, n := range closestNodes {
Expand Down Expand Up @@ -1027,14 +1046,13 @@ func (p *PortalProtocol) handleFindContent(id enode.ID, addr *net.UDPAddr, reque
default:
ctx, cancel := context.WithTimeout(bctx, defaultUTPConnectTimeout)
var conn *utp.Conn
p.Log.Debug("will accept find content conn from: ", "source", addr, "connId", connId)
conn, err = p.utp.AcceptUTPContext(ctx, connIdSend)
p.Log.Info("will accept from: ", "source", addr, "connId", connId)
if err != nil {
p.Log.Error("failed to accept utp connection", "connId", connIdSend, "err", err)
p.Log.Error("failed to accept utp connection for handle find content", "connId", connIdSend, "err", err)
cancel()
return
}
p.Log.Info("")
cancel()

err = conn.SetWriteDeadline(time.Now().Add(defaultUTPWriteTimeout))
Expand Down Expand Up @@ -1135,6 +1153,8 @@ func (p *PortalProtocol) handleOffer(id enode.ID, addr *net.UDPAddr, request *po
}
}

p.putCacheId(id, addr)

idBuffer := make([]byte, 2)
if contentKeyBitlist.Count() != 0 {
connId := p.connIdGen.GenCid(id, false)
Expand All @@ -1148,10 +1168,10 @@ func (p *PortalProtocol) handleOffer(id enode.ID, addr *net.UDPAddr, request *po
default:
ctx, cancel := context.WithTimeout(bctx, defaultUTPConnectTimeout)
var conn *utp.Conn
p.Log.Debug("will accept offer conn from: ", "source", addr, "connId", connId)
conn, err = p.utp.AcceptUTPContext(ctx, connIdSend)
p.Log.Info("will accept from: ", "source", addr, "connId", connId)
if err != nil {
p.Log.Error("failed to accept utp connection", "connId", connIdSend, "err", err)
p.Log.Error("failed to accept utp connection for handle offer", "connId", connIdSend, "err", err)
cancel()
return
}
Expand Down

0 comments on commit 4968c89

Please sign in to comment.