diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 72d2c47a3445..c91d07343060 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -1000,7 +1000,7 @@ Please note that --` + MetricsHTTPFlag.Name + ` must be set to start the server. PortalLogLevelFlag = &cli.IntFlag{ Name: "loglevel", Usage: "loglevel of portal network", - Value: node.DetaultLoglevel, + Value: node.DefaultLoglevel, Category: flags.PortalNetworkCategory, } diff --git a/go.mod b/go.mod index 57abc1e0e8c4..faa3710c643d 100644 --- a/go.mod +++ b/go.mod @@ -55,7 +55,7 @@ require ( github.com/mattn/go-sqlite3 v1.14.18 github.com/naoina/toml v0.1.2-0.20170918210437-9fafd6967416 github.com/olekukonko/tablewriter v0.0.5 - github.com/optimism-java/utp-go v0.0.0-20240309041853-b6b3a0dea581 + github.com/optimism-java/utp-go v0.0.0-20240518144144-6560912a0d99 github.com/peterh/liner v1.1.1-0.20190123174540-a2c9a5303de7 github.com/protolambda/bls12-381-util v0.1.0 github.com/protolambda/zrnt v0.32.2 @@ -75,7 +75,7 @@ require ( golang.org/x/crypto v0.22.0 golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa golang.org/x/sync v0.7.0 - golang.org/x/sys v0.19.0 + golang.org/x/sys v0.20.0 golang.org/x/text v0.14.0 golang.org/x/time v0.5.0 golang.org/x/tools v0.20.0 diff --git a/go.sum b/go.sum index 98985cb56a7c..cb9ed1295ecb 100644 --- a/go.sum +++ b/go.sum @@ -431,6 +431,8 @@ github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsq github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/optimism-java/utp-go v0.0.0-20240309041853-b6b3a0dea581 h1:ZxgrtI0xIw+clB32iDDDWaiTcCizTeN7rNyzH9YorPI= github.com/optimism-java/utp-go v0.0.0-20240309041853-b6b3a0dea581/go.mod h1:DZ0jYzLzt4ZsCmhI/iqYgGFoNx45OfpEoKzXB8HVALQ= +github.com/optimism-java/utp-go v0.0.0-20240518144144-6560912a0d99 h1:8NEQQ8KNNUASMBB0OdnfYuxnOIEHJm1NcDiPnMb2Kvk= +github.com/optimism-java/utp-go v0.0.0-20240518144144-6560912a0d99/go.mod h1:DZ0jYzLzt4ZsCmhI/iqYgGFoNx45OfpEoKzXB8HVALQ= github.com/optimism-java/zrnt v0.32.4-0.20240415084906-d9dbf06b32f7 h1:ZTQWXQ8xblCRUXhZs3h5qrBMSAHe8iNH7BG7a7IVFlI= github.com/optimism-java/zrnt v0.32.4-0.20240415084906-d9dbf06b32f7/go.mod h1:A0fezkp9Tt3GBLATSPIbuY4ywYESyAuc/FFmPKg8Lqs= github.com/peterh/liner v1.1.1-0.20190123174540-a2c9a5303de7 h1:oYW+YCJ1pachXTQmzR3rNLYGGz4g/UgFcjb28p/viDM= @@ -709,6 +711,8 @@ golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= +golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= diff --git a/node/defaults.go b/node/defaults.go index 47511a98e886..b87ba1d8af5b 100644 --- a/node/defaults.go +++ b/node/defaults.go @@ -35,7 +35,7 @@ const ( DefaultAuthHost = "localhost" // Default host interface for the authenticated apis DefaultAuthPort = 8551 // Default port for the authenticated apis DefaultUDPPort = 9009 // Default UDP port for the p2p network - DetaultLoglevel = 1 // Default loglevel for portal network, which is error level + DefaultLoglevel = 1 // Default loglevel for portal network, which is error level ) const ( diff --git a/p2p/discover/portal_protocol.go b/p2p/discover/portal_protocol.go index 7b5e1a0245c8..79919c3327ac 100644 --- a/p2p/discover/portal_protocol.go +++ b/p2p/discover/portal_protocol.go @@ -53,7 +53,7 @@ const ( portalFindnodesResultLimit = 32 - defaultUTPConnectTimeout = 15 * time.Second + defaultUTPConnectTimeout = 60 * time.Second defaultUTPWriteTimeout = 60 * time.Second @@ -159,7 +159,9 @@ func DefaultPortalProtocolConfig() *PortalProtocolConfig { } type PortalProtocol struct { - table *Table + table *Table + cachedIdsLock sync.Mutex + cachedIds map[string]enode.ID protocolId string protocolName string @@ -199,6 +201,7 @@ func NewPortalProtocol(config *PortalProtocolConfig, protocolId string, privateK protocolName := portalwire.NetworkNameMap[protocolId] protocol := &PortalProtocol{ + cachedIds: make(map[string]enode.ID), protocolId: protocolId, protocolName: protocolName, ListenAddr: config.ListenAddr, @@ -232,7 +235,6 @@ func (p *PortalProtocol) Start() error { if err != nil { return err } - p.DiscV5.RegisterTalkHandler(p.protocolId, p.handleTalkRequest) p.DiscV5.RegisterTalkHandler(string(portalwire.UTPNetwork), p.handleUtpTalkRequest) @@ -293,37 +295,28 @@ func (p *PortalProtocol) setupUDPListening() error { var err error p.packetRouter = utp.NewPacketRouter( func(buf []byte, addr *net.UDPAddr) (int, error) { - nodes := p.table.Nodes() - var target *enode.Node - for _, n := range nodes { - if addr.Port != n.UDP() { - continue - } - if addr.IP != nil && addr.IP.To4().String() == n.IP().To4().String() { - target = n - - break - } - if addr.IP == nil { - nodeIp := n.IP().To4().String() - if nodeIp == "127.0.0.1" || nodeIp == "0.0.0.0" { - target = n - break + p.Log.Info("will send to target data", "network", string(portalwire.UTPNetwork), "ip", addr.IP.To4().String(), "port", addr.Port, "bufLength", len(buf)) + + p.cachedIdsLock.Lock() + defer p.cachedIdsLock.Unlock() + if id, ok := p.cachedIds[addr.String()]; ok { + sendToId := id + go func(targetId enode.ID, addr *net.UDPAddr, utpNetwork string, buffer []byte) { + _, err := p.DiscV5.TalkRequestToID(targetId, addr, utpNetwork, buffer) + if err != nil { + p.Log.Error("send utp talk request failed", "err", err) } - } - } - - if target == nil { - p.Log.Warn("not fount target node info", "ip", addr.IP.To4().String(), "port", addr.Port, "bufLength", len(buf)) - return 0, fmt.Errorf("not found target node info") + }(sendToId, addr, string(portalwire.UTPNetwork), buf) + return len(buf), nil + } else { + p.Log.Warn("not found target node info", "ip", addr.IP.To4().String(), "port", addr.Port, "bufLength", len(buf)) + return 0, fmt.Errorf("not found target node id") } - p.Log.Trace("send to target data", "ip", addr.IP.To4().String(), "port", addr.Port, "bufLength", len(buf)) - _, err := p.DiscV5.TalkRequest(target, string(portalwire.UTPNetwork), buf) - return len(buf), err }) ctx := context.Background() var logger *zap.Logger + if p.Log.Enabled(ctx, log.LevelDebug) || p.Log.Enabled(ctx, log.LevelTrace) { logger, err = zap.NewDevelopmentConfig().Build() } else { @@ -367,6 +360,23 @@ func (p *PortalProtocol) setupDiscV5AndTable() error { return nil } +func (p *PortalProtocol) putCacheNodeId(node *enode.Node) { + p.cachedIdsLock.Lock() + defer p.cachedIdsLock.Unlock() + addr := &net.UDPAddr{IP: node.IP(), Port: node.UDP()} + if _, ok := p.cachedIds[addr.String()]; !ok { + p.cachedIds[addr.String()] = node.ID() + } +} + +func (p *PortalProtocol) putCacheId(id enode.ID, addr *net.UDPAddr) { + p.cachedIdsLock.Lock() + defer p.cachedIdsLock.Unlock() + if _, ok := p.cachedIds[addr.String()]; !ok { + p.cachedIds[addr.String()] = id + } +} + func (p *PortalProtocol) ping(node *enode.Node) (uint64, error) { pong, err := p.pingInner(node) if err != nil { @@ -510,6 +520,9 @@ func (p *PortalProtocol) processOffer(target *enode.Node, resp []byte, request * return nil, fmt.Errorf("invalid accept response") } + p.Log.Info("will process Offer", "id", target.ID(), "ip", target.IP().To4().String(), "port", target.UDP()) + p.putCacheNodeId(target) + accept := &portalwire.Accept{} err = accept.UnmarshalSSZ(resp[1:]) if err != nil { @@ -578,8 +591,8 @@ func (p *PortalProtocol) processOffer(target *enode.Node, resp []byte, request * connctx, conncancel := context.WithTimeout(ctx, defaultUTPConnectTimeout) laddr := p.utp.Addr().(*utp.Addr) raddr := &utp.Addr{IP: target.IP(), Port: target.UDP()} - conn, err = utp.DialUTPOptions("utp", laddr, raddr, utp.WithContext(connctx), utp.WithSocketManager(p.utpSm), utp.WithConnId(uint32(connId))) p.Log.Info("will connect to: ", "addr", raddr.String(), "connId", connId) + conn, err = utp.DialUTPOptions("utp", laddr, raddr, utp.WithContext(connctx), utp.WithSocketManager(p.utpSm), utp.WithConnId(uint32(connId))) if err != nil { conncancel() p.Log.Error("failed to dial utp connection", "err", err) @@ -633,6 +646,9 @@ func (p *PortalProtocol) processContent(target *enode.Node, resp []byte) (byte, return 0xff, nil, fmt.Errorf("invalid content response") } + p.Log.Info("will process content", "id", target.ID(), "ip", target.IP().To4().String(), "port", target.UDP()) + p.putCacheNodeId(target) + switch resp[1] { case portalwire.ContentRawSelector: content := &portalwire.Content{} @@ -657,8 +673,8 @@ func (p *PortalProtocol) processContent(target *enode.Node, resp []byte) (byte, laddr := p.utp.Addr().(*utp.Addr) raddr := &utp.Addr{IP: target.IP(), Port: target.UDP()} connId := binary.BigEndian.Uint16(connIdMsg.Id[:]) - conn, err := utp.DialUTPOptions("utp", laddr, raddr, utp.WithContext(connctx), utp.WithSocketManager(p.utpSm), utp.WithConnId(uint32(connId))) p.Log.Info("will connect to: ", "addr", raddr.String(), "connId", connId) + conn, err := utp.DialUTPOptions("utp", laddr, raddr, utp.WithContext(connctx), utp.WithSocketManager(p.utpSm), utp.WithConnId(uint32(connId))) if err != nil { conncancel() return 0xff, nil, err @@ -784,16 +800,17 @@ func (p *PortalProtocol) handleUtpTalkRequest(id enode.ID, addr *net.UDPAddr, ms if n := p.DiscV5.getNode(id); n != nil { p.table.addSeenNode(wrapNode(n)) } + p.putCacheId(id, addr) p.Log.Trace("receive utp data", "addr", addr, "msg-length", len(msg)) p.packetRouter.ReceiveMessage(msg, addr) return []byte("") } func (p *PortalProtocol) handleTalkRequest(id enode.ID, addr *net.UDPAddr, msg []byte) []byte { - p.Log.Trace("handleTalkRequest", "id", id, "addr", addr) if n := p.DiscV5.getNode(id); n != nil { p.table.addSeenNode(wrapNode(n)) } + p.putCacheId(id, addr) msgCode := msg[0] @@ -958,6 +975,8 @@ func (p *PortalProtocol) handleFindContent(id enode.ID, addr *net.UDPAddr, reque return nil, err } + p.putCacheId(id, addr) + if errors.Is(err, ContentNotFound) { closestNodes := p.findNodesCloseToContent(contentId, portalFindnodesResultLimit) for i, n := range closestNodes { @@ -1027,14 +1046,13 @@ func (p *PortalProtocol) handleFindContent(id enode.ID, addr *net.UDPAddr, reque default: ctx, cancel := context.WithTimeout(bctx, defaultUTPConnectTimeout) var conn *utp.Conn + p.Log.Debug("will accept find content conn from: ", "source", addr, "connId", connId) conn, err = p.utp.AcceptUTPContext(ctx, connIdSend) - p.Log.Info("will accept from: ", "source", addr, "connId", connId) if err != nil { - p.Log.Error("failed to accept utp connection", "connId", connIdSend, "err", err) + p.Log.Error("failed to accept utp connection for handle find content", "connId", connIdSend, "err", err) cancel() return } - p.Log.Info("") cancel() err = conn.SetWriteDeadline(time.Now().Add(defaultUTPWriteTimeout)) @@ -1135,6 +1153,8 @@ func (p *PortalProtocol) handleOffer(id enode.ID, addr *net.UDPAddr, request *po } } + p.putCacheId(id, addr) + idBuffer := make([]byte, 2) if contentKeyBitlist.Count() != 0 { connId := p.connIdGen.GenCid(id, false) @@ -1148,10 +1168,10 @@ func (p *PortalProtocol) handleOffer(id enode.ID, addr *net.UDPAddr, request *po default: ctx, cancel := context.WithTimeout(bctx, defaultUTPConnectTimeout) var conn *utp.Conn + p.Log.Debug("will accept offer conn from: ", "source", addr, "connId", connId) conn, err = p.utp.AcceptUTPContext(ctx, connIdSend) - p.Log.Info("will accept from: ", "source", addr, "connId", connId) if err != nil { - p.Log.Error("failed to accept utp connection", "connId", connIdSend, "err", err) + p.Log.Error("failed to accept utp connection for handle offer", "connId", connIdSend, "err", err) cancel() return }