Skip to content

Commit

Permalink
adapt end2end test to use Dial/Listen API
Browse files Browse the repository at this point in the history
  • Loading branch information
JordiSubira committed May 14, 2024
1 parent 913c83f commit 2e0a8dc
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 126 deletions.
1 change: 0 additions & 1 deletion tools/end2end/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ go_library(
"//pkg/private/serrors:go_default_library",
"//pkg/private/util:go_default_library",
"//pkg/snet:go_default_library",
"//pkg/snet/addrutil:go_default_library",
"//pkg/snet/metrics:go_default_library",
"//pkg/snet/path:go_default_library",
"//private/tracing:go_default_library",
Expand Down
178 changes: 53 additions & 125 deletions tools/end2end/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"flag"
"fmt"
"net"
"net/netip"
"os"
"time"

Expand All @@ -43,7 +42,6 @@ import (
"github.com/scionproto/scion/pkg/private/serrors"
"github.com/scionproto/scion/pkg/private/util"
"github.com/scionproto/scion/pkg/snet"
"github.com/scionproto/scion/pkg/snet/addrutil"
"github.com/scionproto/scion/pkg/snet/metrics"
snetpath "github.com/scionproto/scion/pkg/snet/path"
"github.com/scionproto/scion/private/tracing"
Expand Down Expand Up @@ -144,19 +142,18 @@ func (s server) run() {
PacketConnMetrics: scionPacketConnMetrics,
Topology: sdConn,
}
conn, err := sn.OpenRaw(context.Background(), integration.Local.Host)
conn, err := sn.Listen(context.Background(), "udp", integration.Local.Host)
if err != nil {
integration.LogFatal("Error listening", "err", err)
}
defer conn.Close()
localAddr := conn.LocalAddr().(*net.UDPAddr)
localAddr := conn.LocalAddr().(*snet.UDPAddr)
if len(os.Getenv(libint.GoIntegrationEnv)) > 0 {
// Needed for integration test ready signal.
fmt.Printf("Port=%d\n", localAddr.Port)
fmt.Printf("Port=%d\n", localAddr.Host.Port)
fmt.Printf("%s%s\n\n", libint.ReadySignal, integration.Local.IA)
}
log.Info("Listening", "local", fmt.Sprintf("%v:%d", integration.Local.Host.IP, localAddr.Port))

log.Info("Listening", "local", fmt.Sprintf("%v:%d", localAddr.Host.IP, localAddr.Host.Port))
// Receive ping message
for {
if err := s.handlePing(conn); err != nil {
Expand All @@ -165,26 +162,17 @@ func (s server) run() {
}
}

func (s server) handlePing(conn snet.PacketConn) error {
var p snet.Packet
var ov net.UDPAddr
if err := readFrom(conn, &p, &ov); err != nil {
func (s server) handlePing(conn *snet.Conn) error {
rawPld := make([]byte, common.MaxMTU)
n, clientAddr, err := readFrom(conn, rawPld)
if err != nil {
return serrors.WrapStr("reading packet", err)
}
udp, ok := p.Payload.(snet.UDPPayload)
if !ok {
return serrors.New("unexpected payload received",
"source", p.Source,
"destination", p.Destination,
"type", common.TypeOf(p.Payload),
)
}

var pld Ping
if err := json.Unmarshal(udp.Payload, &pld); err != nil {
if err := json.Unmarshal(rawPld[:n], &pld); err != nil {
return serrors.New("invalid payload contents",
"source", p.Source,
"destination", p.Destination,
"data", string(udp.Payload),
"data", string(rawPld),
)
}

Expand All @@ -205,53 +193,35 @@ func (s server) handlePing(conn snet.PacketConn) error {
tracing.Error(span, err)
return err
}

clientUDPAddr := clientAddr.(*snet.UDPAddr)
if pld.Message != ping || !pld.Server.Equal(integration.Local.IA) {
return withTag(serrors.New("unexpected data in payload",
"source", p.Source,
"destination", p.Destination,
"remote", clientUDPAddr,
"data", pld,
))
}
log.Info(fmt.Sprintf("Ping received from %s:%d, sending pong.", p.Source, udp.SrcPort))
log.Info(fmt.Sprintf("Ping received from %v, sending pong.", clientUDPAddr))
raw, err := json.Marshal(Pong{
Client: p.Source.IA,
Client: clientUDPAddr.IA,
Server: integration.Local.IA,
Message: pong,
Trace: pld.Trace,
})
if err != nil {
return withTag(serrors.WrapStr("packing pong", err))
}

p.Destination, p.Source = p.Source, p.Destination
p.Payload = snet.UDPPayload{
DstPort: udp.SrcPort,
SrcPort: udp.DstPort,
Payload: raw,
}
// reverse path
rpath, ok := p.Path.(snet.RawPath)
if !ok {
return serrors.New("unexpected path", "type", common.TypeOf(p.Path))
}
replypather := snet.DefaultReplyPather{}
replyPath, err := replypather.ReplyPath(rpath)
if err != nil {
return serrors.WrapStr("creating reply path", err)
}
p.Path = replyPath
// Send pong
if err := conn.WriteTo(&p, &ov); err != nil {
if _, err := conn.WriteTo(raw, clientUDPAddr); err != nil {
return withTag(serrors.WrapStr("sending reply", err))
}
log.Info("Sent pong to", "client", p.Destination)
log.Info("Sent pong to", "client", clientUDPAddr)
return nil
}

type client struct {
conn snet.PacketConn
sdConn daemon.Connector
network *snet.SCIONNetwork
conn *snet.Conn
sdConn daemon.Connector

errorPaths map[snet.PathFingerprint]struct{}
}
Expand All @@ -263,24 +233,18 @@ func (c *client) run() int {
defer integration.Done(integration.Local.IA, remote.IA)
c.sdConn = integration.SDConn()
defer c.sdConn.Close()

sn := &snet.SCIONNetwork{
c.network = &snet.SCIONNetwork{
SCMPHandler: snet.DefaultSCMPHandler{
RevocationHandler: daemon.RevHandler{Connector: c.sdConn},
SCMPErrors: scmpErrorsCounter,
},
PacketConnMetrics: scionPacketConnMetrics,
Topology: c.sdConn,
}

var err error
c.conn, err = sn.OpenRaw(context.Background(), integration.Local.Host)
if err != nil {
integration.LogFatal("Unable to listen", "err", err)
}
port := c.conn.LocalAddr().(*net.UDPAddr).Port
log.Info("Send on", "local",
fmt.Sprintf("%v,[%v]:%d", integration.Local.IA, integration.Local.Host.IP, port))
log.Info("Send", "local",
fmt.Sprintf("%v,[%v] -> %v,[%v]",
integration.Local.IA, integration.Local.Host,
remote.IA, remote.Host))
c.errorPaths = make(map[snet.PathFingerprint]struct{})
return integration.AttemptRepeatedly("End2End", c.attemptRequest)
}
Expand Down Expand Up @@ -310,10 +274,12 @@ func (c *client) attemptRequest(n int) bool {
}

// Send ping
if err := c.ping(ctx, n, path); err != nil {
close, err := c.ping(ctx, n, path)
if err != nil {
logger.Error("Could not send packet", "err", withTag(err))
return false
}
defer close()
// Receive pong
if err := c.pong(ctx); err != nil {
logger.Error("Error receiving pong", "err", withTag(err))
Expand All @@ -325,66 +291,33 @@ func (c *client) attemptRequest(n int) bool {
return true
}

func (c *client) ping(ctx context.Context, n int, path snet.Path) error {
func (c *client) ping(ctx context.Context, n int, path snet.Path) (func(), error) {
rawPing, err := json.Marshal(Ping{
Server: remote.IA,
Message: ping,
Trace: tracing.IDFromCtx(ctx),
})
if err != nil {
return serrors.WrapStr("packing ping", err)
}
if err := c.conn.SetWriteDeadline(getDeadline(ctx)); err != nil {
return serrors.WrapStr("setting write deadline", err)
return nil, serrors.WrapStr("packing ping", err)
}
if remote.NextHop == nil {
remote.NextHop = &net.UDPAddr{
IP: remote.Host.IP,
Port: remote.Host.Port,
}
log.FromCtx(ctx).Info("Dialing", "remote", remote)
c.conn, err = c.network.Dial(ctx, "udp", integration.Local.Host, &remote)
if err != nil {
return nil, serrors.WrapStr("dialing conn", err)
}

remoteHostIP, ok := netip.AddrFromSlice(remote.Host.IP)
if !ok {
return serrors.New("invalid remote host IP", "ip", remote.Host.IP)
if err := c.conn.SetWriteDeadline(getDeadline(ctx)); err != nil {
return nil, serrors.WrapStr("setting write deadline", err)
}
localHostIP, ok := netip.AddrFromSlice(integration.Local.Host.IP)
if !ok {
return serrors.New("invalid local host IP", "ip", integration.Local.Host.IP)
log.Info("sending ping", "attempt", n, "remote", c.conn.RemoteAddr())
if _, err := c.conn.Write(rawPing); err != nil {
return nil, err
}
if localHostIP.IsUnspecified() {
resolvedLocal, err := addrutil.ResolveLocal(remote.Host.IP)
if err != nil {
return err
}
localHostIP, ok = netip.AddrFromSlice(resolvedLocal)
if !ok {
return serrors.New("invalid resolved local addr", "ip", resolvedLocal)
closer := func() {
if err := c.conn.Close(); err != nil {
log.Error("Unable to close connection", "err", err)
}
}
pkt := &snet.Packet{
PacketInfo: snet.PacketInfo{
Destination: snet.SCIONAddress{
IA: remote.IA,
Host: addr.HostIP(remoteHostIP),
},
Source: snet.SCIONAddress{
IA: integration.Local.IA,
Host: addr.HostIP(localHostIP),
},
Path: remote.Path,
Payload: snet.UDPPayload{
SrcPort: uint16(c.conn.LocalAddr().(*net.UDPAddr).Port),
DstPort: uint16(remote.Host.Port),
Payload: rawPing,
},
},
}
log.Info("sending ping", "attempt", n, "path", path)
if err := c.conn.WriteTo(pkt, remote.NextHop); err != nil {
return err
}
return nil
return closer, nil
}

func (c *client) getRemote(ctx context.Context, n int) (snet.Path, error) {
Expand Down Expand Up @@ -446,20 +379,15 @@ func (c *client) pong(ctx context.Context) error {
if err := c.conn.SetReadDeadline(getDeadline(ctx)); err != nil {
return serrors.WrapStr("setting read deadline", err)
}
var p snet.Packet
var ov net.UDPAddr
if err := readFrom(c.conn, &p, &ov); err != nil {
rawPld := make([]byte, common.MaxMTU)
n, serverAddr, err := readFrom(c.conn, rawPld)
if err != nil {
return serrors.WrapStr("reading packet", err)
}

udp, ok := p.Payload.(snet.UDPPayload)
if !ok {
return serrors.New("unexpected payload received", "type", common.TypeOf(p.Payload))
}

var pld Pong
if err := json.Unmarshal(udp.Payload, &pld); err != nil {
return serrors.WrapStr("unpacking pong", err, "data", string(udp.Payload))
if err := json.Unmarshal(rawPld[:n], &pld); err != nil {
return serrors.WrapStr("unpacking pong", err, "data", string(rawPld))
}

expected := Pong{
Expand All @@ -470,7 +398,7 @@ func (c *client) pong(ctx context.Context) error {
if pld.Client != expected.Client || pld.Server != expected.Server || pld.Message != pong {
return serrors.New("unexpected contents received", "data", pld, "expected", expected)
}
log.Info("Received pong", "server", p.Source)
log.Info("Received pong", "server", serverAddr)
return nil
}

Expand All @@ -482,14 +410,14 @@ func getDeadline(ctx context.Context) time.Time {
return dl
}

func readFrom(conn snet.PacketConn, pkt *snet.Packet, ov *net.UDPAddr) error {
err := conn.ReadFrom(pkt, ov)
func readFrom(conn *snet.Conn, pld []byte) (int, net.Addr, error) {
n, remoteAddr, err := conn.ReadFrom(pld)
// Attach more context to error
var opErr *snet.OpError
if !(errors.As(err, &opErr) && opErr.RevInfo() != nil) {
return err
return n, remoteAddr, err
}
return serrors.WithCtx(err,
return n, remoteAddr, serrors.WithCtx(err,
"isd_as", opErr.RevInfo().IA(),
"interface", opErr.RevInfo().IfID,
)
Expand Down

0 comments on commit 2e0a8dc

Please sign in to comment.