From e67a1bd54abc77030b78e6a608774eef5020bf74 Mon Sep 17 00:00:00 2001 From: sanaz Date: Tue, 16 Jan 2024 14:54:20 -0800 Subject: [PATCH 01/34] adds tests --- p2p/conn/connection_test.go | 138 ++++++++++++++++++++++++++++++++++++ 1 file changed, 138 insertions(+) diff --git a/p2p/conn/connection_test.go b/p2p/conn/connection_test.go index 1cf1f7a0f4..b93f9bb14d 100644 --- a/p2p/conn/connection_test.go +++ b/p2p/conn/connection_test.go @@ -6,6 +6,10 @@ import ( "fmt" "math" "net" + "os" + "runtime" + "runtime/pprof" + "sync" "testing" "time" @@ -769,3 +773,137 @@ func stopAll(t *testing.T, stoppers ...stopper) func() { } } } + +func fib(n int) int { + if n < 2 { + return n + } + return fib(n-1) + fib(n-2) +} + +// GenerateMessages generates messages of a given size at specified rate `messagingRate` +// for a given duration `totalDuration`. +func GenerateMessages(mc *MConnection, messagingRate time.Duration, + totalDuration time.Duration, totalSize int, msgSize int, chID byte) { + // all messages have an identical content + msg := bytes.Repeat([]byte{'x'}, msgSize) + + // message generation interval ticker + ticker := time.NewTicker(messagingRate) + defer ticker.Stop() + + // timer for the total duration + timer := time.NewTimer(totalDuration) + defer timer.Stop() + + sentBytes := 0 + // generating messages + for { + select { + case <-ticker.C: + // generate message + if mc.Send(chID, msg) { + sentBytes += msgSize + if sentBytes >= totalSize && totalSize > 0 { + fmt.Println("Completed the message generation") + return + } + } + case <-timer.C: + // time's up + fmt.Println("Completed the message generation") + return + } + } +} + +func BenchmarkMConnection(b *testing.B) { + txSize := 10 + totalNumberOfMessages := 100 + var msgList [100][]byte + for i := 0; i < totalNumberOfMessages; i++ { + msgList[i] = bytes.Repeat([]byte{byte(i)}, txSize) + } + chID := byte(0x01) + SendQueueCapacity := 1 + + b.Run("test capacity", func(b *testing.B) { + cpuFile, _ := os.Create("cpu.pprof") + pprof.StartCPUProfile(cpuFile) + defer pprof.StopCPUProfile() + + f, _ := os.Create("block.pprof") + runtime.SetBlockProfileRate(1) + + for n := 0; n < b.N; n++ { + // set up two nodes + //server, client := NetPipe() + server, client := tcpNetPipe() + defer server.Close() + defer client.Close() + + onReceive := func(chID byte, msgBytes []byte) { + log.TestingLogger().Info("onReceive: received message") + } + onError := func(r interface{}) { + log.TestingLogger().Info("onError: received error") + } + + cnfg := DefaultMConnConfig() + cnfg.SendRate = 500_000_000 // 500 MB/s + cnfg.RecvRate = 500_000_000 // 500 MB/s + chDescs := []*ChannelDescriptor{{ID: chID, Priority: 1, + SendQueueCapacity: SendQueueCapacity}} + clientMconn := NewMConnectionWithConfig(client, chDescs, onReceive, + onError, + cnfg) + serverChDescs := []*ChannelDescriptor{{ID: chID, Priority: 1, + SendQueueCapacity: SendQueueCapacity}} + serverMconn := NewMConnectionWithConfig(server, serverChDescs, + onReceive, + onError, + cnfg) + clientMconn.SetLogger(log.TestingLogger()) + serverMconn.SetLogger(log.TestingLogger()) + + err := clientMconn.Start() + require.Nil(b, err) + defer clientMconn.Stop() //nolint:errcheck // ignore for tests + + err = serverMconn.Start() + require.Nil(b, err) + defer serverMconn.Stop() //nolint:errcheck // ignore for tests + + // blocking call to generate messages + GenerateMessages(clientMconn, 1000*time.Millisecond, + 1*time.Minute, + -1, // unlimited + txSize+100, chID) // this mimics network load of 10KB per second + + pprof.Lookup("block").WriteTo(f, 0) + } + //_, err = server.Read(make([]byte, len(msg))) + //require.NoError(b, err) + }) + + //} +} + +// testPipe creates a pair of connected net. +// Conn objects that can be used in tests. +func tcpNetPipe() (net.Conn, net.Conn) { + ln, _ := net.Listen("tcp", "127.0.0.1:0") + var clientConn net.Conn + var wg sync.WaitGroup + wg.Add(1) + go func(c *net.Conn) { + *c, _ = ln.Accept() + wg.Done() + }(&clientConn) + + serverAddr := ln.Addr().String() + serverConn, _ := net.Dial("tcp", serverAddr) + + wg.Wait() + return serverConn, clientConn // sender, receiver +} From 81f8918ae7214bc1e4d2e6d0cc7cd2ca36c781fd Mon Sep 17 00:00:00 2001 From: sanaz Date: Tue, 16 Jan 2024 14:51:41 -0800 Subject: [PATCH 02/34] adds logs --- p2p/conn/connection.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/p2p/conn/connection.go b/p2p/conn/connection.go index 9f3f3a615d..72e2856539 100644 --- a/p2p/conn/connection.go +++ b/p2p/conn/connection.go @@ -228,7 +228,9 @@ func (c *MConnection) OnStart() error { c.quitSendRoutine = make(chan struct{}) c.doneSendRoutine = make(chan struct{}) c.quitRecvRoutine = make(chan struct{}) + c.Logger.Debug("Starting sendRoutine") go c.sendRoutine() + c.Logger.Debug("Starting recvRoutine") go c.recvRoutine() return nil } @@ -564,6 +566,7 @@ func (c *MConnection) recvRoutine() { FOR_LOOP: for { + c.Logger.Debug("recvRoutine", "address", c.conn.RemoteAddr()) // Block until .recvMonitor says we can read. c.recvMonitor.Limit(c._maxPacketMsgSize, atomic.LoadInt64(&c.config.RecvRate), true) @@ -583,7 +586,6 @@ FOR_LOOP: // Read packet type var packet tmp2p.Packet - _n, err := protoReader.ReadMsg(&packet) c.recvMonitor.Update(_n) if err != nil { @@ -606,6 +608,7 @@ FOR_LOOP: break FOR_LOOP } + c.Logger.Debug("Reading packet") // Read more depending on packet type. switch pkt := packet.Sum.(type) { case *tmp2p.Packet_PacketPing: @@ -636,6 +639,7 @@ FOR_LOOP: msgBytes, err := channel.recvPacketMsg(*pkt.PacketMsg) if err != nil { + c.Logger.Debug("error in reading packet message", "err", err) if c.IsRunning() { c.Logger.Debug("Connection failed @ recvRoutine", "conn", c, "err", err) c.stopForError(err) @@ -782,6 +786,8 @@ func (ch *Channel) sendBytes(bytes []byte) bool { select { case ch.sendQueue <- bytes: atomic.AddInt32(&ch.sendQueueSize, 1) + ch.Logger.Debug("send queue size changed", "size", + ch.loadSendQueueSize()) return true case <-time.After(defaultSendTimeout): return false @@ -795,6 +801,7 @@ func (ch *Channel) trySendBytes(bytes []byte) bool { select { case ch.sendQueue <- bytes: atomic.AddInt32(&ch.sendQueueSize, 1) + ch.Logger.Debug("send queue size changed", "size", ch.loadSendQueueSize()) return true default: return false @@ -835,6 +842,7 @@ func (ch *Channel) nextPacketMsg() tmp2p.PacketMsg { packet.EOF = true ch.sending = nil atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize + ch.Logger.Debug("send queue size changed", "size", ch.loadSendQueueSize()) } else { packet.EOF = false ch.sending = ch.sending[cmtmath.MinInt(maxSize, len(ch.sending)):] From 02df8e7e13f3b00f055b2c6d54fc4acd7cd094bf Mon Sep 17 00:00:00 2001 From: sanaz Date: Tue, 16 Jan 2024 17:12:58 -0800 Subject: [PATCH 03/34] includes a lot of updates on the message generation load control and comments --- p2p/conn/connection_test.go | 69 +++++++++++++++++++++---------------- 1 file changed, 39 insertions(+), 30 deletions(-) diff --git a/p2p/conn/connection_test.go b/p2p/conn/connection_test.go index b93f9bb14d..315bcaa88f 100644 --- a/p2p/conn/connection_test.go +++ b/p2p/conn/connection_test.go @@ -6,9 +6,6 @@ import ( "fmt" "math" "net" - "os" - "runtime" - "runtime/pprof" "sync" "testing" "time" @@ -782,9 +779,11 @@ func fib(n int) int { } // GenerateMessages generates messages of a given size at specified rate `messagingRate` -// for a given duration `totalDuration`. +// for a given duration `totalDuration` until the total number of messages +// `totalNum` is reached. If `totalNum` is less than zero, +// then the message generation continues until the `totalDuration` is reached. func GenerateMessages(mc *MConnection, messagingRate time.Duration, - totalDuration time.Duration, totalSize int, msgSize int, chID byte) { + totalDuration time.Duration, totalNum int, msgSize int, chID byte) { // all messages have an identical content msg := bytes.Repeat([]byte{'x'}, msgSize) @@ -796,44 +795,41 @@ func GenerateMessages(mc *MConnection, messagingRate time.Duration, timer := time.NewTimer(totalDuration) defer timer.Stop() - sentBytes := 0 + sentNum := 0 // generating messages for { select { case <-ticker.C: // generate message if mc.Send(chID, msg) { - sentBytes += msgSize - if sentBytes >= totalSize && totalSize > 0 { - fmt.Println("Completed the message generation") + sentNum++ + if totalNum > 0 && sentNum >= totalNum { + log.TestingLogger().Info("Completed the message generation as the" + + " total number of messages is reached") return } } case <-timer.C: // time's up - fmt.Println("Completed the message generation") + log.TestingLogger().Info("Completed the message generation as the total " + "duration is reached") return } } } func BenchmarkMConnection(b *testing.B) { - txSize := 10 - totalNumberOfMessages := 100 - var msgList [100][]byte - for i := 0; i < totalNumberOfMessages; i++ { - msgList[i] = bytes.Repeat([]byte{byte(i)}, txSize) - } + msgSize := 1024 // in bytes + totalMsg := 100 // total number of messages to be sent chID := byte(0x01) - SendQueueCapacity := 1 + SendQueueCapacity := 10 // in messages b.Run("test capacity", func(b *testing.B) { - cpuFile, _ := os.Create("cpu.pprof") - pprof.StartCPUProfile(cpuFile) - defer pprof.StopCPUProfile() + //cpuFile, _ := os.Create("cpu.pprof") + //pprof.StartCPUProfile(cpuFile) + //defer pprof.StopCPUProfile() - f, _ := os.Create("block.pprof") - runtime.SetBlockProfileRate(1) + //f, _ := os.Create("block.pprof") + //runtime.SetBlockProfileRate(1) for n := 0; n < b.N; n++ { // set up two nodes @@ -842,16 +838,24 @@ func BenchmarkMConnection(b *testing.B) { defer server.Close() defer client.Close() + // prepare call backs to receive messages + allReceived := make(chan bool) + receivedLoad := 0 // in messages onReceive := func(chID byte, msgBytes []byte) { + receivedLoad++ log.TestingLogger().Info("onReceive: received message") + if receivedLoad >= totalMsg && totalMsg > 0 { + log.TestingLogger().Info("onReceive: received all messages") + allReceived <- true + } } onError := func(r interface{}) { log.TestingLogger().Info("onError: received error") } cnfg := DefaultMConnConfig() - cnfg.SendRate = 500_000_000 // 500 MB/s - cnfg.RecvRate = 500_000_000 // 500 MB/s + cnfg.SendRate = 500 * 1024 // 500 KB/s + cnfg.RecvRate = 500 * 1024 // 500 KB/s chDescs := []*ChannelDescriptor{{ID: chID, Priority: 1, SendQueueCapacity: SendQueueCapacity}} clientMconn := NewMConnectionWithConfig(client, chDescs, onReceive, @@ -866,6 +870,7 @@ func BenchmarkMConnection(b *testing.B) { clientMconn.SetLogger(log.TestingLogger()) serverMconn.SetLogger(log.TestingLogger()) + //b.ResetTimer() err := clientMconn.Start() require.Nil(b, err) defer clientMconn.Stop() //nolint:errcheck // ignore for tests @@ -874,14 +879,18 @@ func BenchmarkMConnection(b *testing.B) { require.Nil(b, err) defer serverMconn.Stop() //nolint:errcheck // ignore for tests - // blocking call to generate messages - GenerateMessages(clientMconn, 1000*time.Millisecond, - 1*time.Minute, - -1, // unlimited - txSize+100, chID) // this mimics network load of 10KB per second + // generate messages, is a blocking call + go GenerateMessages(clientMconn, + 1*time.Millisecond, // the messaging rate + 1*time.Minute, // the total duration + totalMsg, // unlimited + msgSize, chID) // this mimics network load of 10KB per second + + // wait for all messages to be received + assert.True(b, <-allReceived) - pprof.Lookup("block").WriteTo(f, 0) } + //pprof.Lookup("block").WriteTo(f, 0) //_, err = server.Read(make([]byte, len(msg))) //require.NoError(b, err) }) From 2aee45b923ec0ad999c4601c979e8b036bbf6e53 Mon Sep 17 00:00:00 2001 From: sanaz Date: Tue, 16 Jan 2024 17:46:54 -0800 Subject: [PATCH 04/34] updates the test setting and adds comments --- p2p/conn/connection_test.go | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/p2p/conn/connection_test.go b/p2p/conn/connection_test.go index 315bcaa88f..fc6f351f6d 100644 --- a/p2p/conn/connection_test.go +++ b/p2p/conn/connection_test.go @@ -818,12 +818,12 @@ func GenerateMessages(mc *MConnection, messagingRate time.Duration, } func BenchmarkMConnection(b *testing.B) { - msgSize := 1024 // in bytes - totalMsg := 100 // total number of messages to be sent + msgSize := 50 * 1024 // in bytes + totalMsg := 100 // total number of messages to be sent chID := byte(0x01) - SendQueueCapacity := 10 // in messages + SendQueueCapacity := 100 // in messages - b.Run("test capacity", func(b *testing.B) { + b.Run("benchmark MConnection", func(b *testing.B) { //cpuFile, _ := os.Create("cpu.pprof") //pprof.StartCPUProfile(cpuFile) //defer pprof.StopCPUProfile() @@ -832,7 +832,7 @@ func BenchmarkMConnection(b *testing.B) { //runtime.SetBlockProfileRate(1) for n := 0; n < b.N; n++ { - // set up two nodes + // set up two networked connections //server, client := NetPipe() server, client := tcpNetPipe() defer server.Close() @@ -840,7 +840,7 @@ func BenchmarkMConnection(b *testing.B) { // prepare call backs to receive messages allReceived := make(chan bool) - receivedLoad := 0 // in messages + receivedLoad := 0 // number of messages received onReceive := func(chID byte, msgBytes []byte) { receivedLoad++ log.TestingLogger().Info("onReceive: received message") @@ -870,7 +870,6 @@ func BenchmarkMConnection(b *testing.B) { clientMconn.SetLogger(log.TestingLogger()) serverMconn.SetLogger(log.TestingLogger()) - //b.ResetTimer() err := clientMconn.Start() require.Nil(b, err) defer clientMconn.Stop() //nolint:errcheck // ignore for tests @@ -881,13 +880,13 @@ func BenchmarkMConnection(b *testing.B) { // generate messages, is a blocking call go GenerateMessages(clientMconn, - 1*time.Millisecond, // the messaging rate - 1*time.Minute, // the total duration - totalMsg, // unlimited - msgSize, chID) // this mimics network load of 10KB per second + 1*time.Second, // the messaging rate + 1*time.Minute, // the total duration + totalMsg, // unlimited + msgSize, chID) // this mimics network load of 10KB per second // wait for all messages to be received - assert.True(b, <-allReceived) + <-allReceived } //pprof.Lookup("block").WriteTo(f, 0) @@ -895,7 +894,6 @@ func BenchmarkMConnection(b *testing.B) { //require.NoError(b, err) }) - //} } // testPipe creates a pair of connected net. From e1c3359b06890835e6ed5d9dc233170161451701 Mon Sep 17 00:00:00 2001 From: sanaz Date: Tue, 16 Jan 2024 17:56:40 -0800 Subject: [PATCH 05/34] edits the messaging rate --- p2p/conn/connection_test.go | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/p2p/conn/connection_test.go b/p2p/conn/connection_test.go index fc6f351f6d..d43267ad2e 100644 --- a/p2p/conn/connection_test.go +++ b/p2p/conn/connection_test.go @@ -818,10 +818,10 @@ func GenerateMessages(mc *MConnection, messagingRate time.Duration, } func BenchmarkMConnection(b *testing.B) { - msgSize := 50 * 1024 // in bytes - totalMsg := 100 // total number of messages to be sent + msgSize := 5 * 1024 // in bytes + totalMsg := 100 // total number of messages to be sent chID := byte(0x01) - SendQueueCapacity := 100 // in messages + SendQueueCapacity := 10 // in messages b.Run("benchmark MConnection", func(b *testing.B) { //cpuFile, _ := os.Create("cpu.pprof") @@ -854,8 +854,8 @@ func BenchmarkMConnection(b *testing.B) { } cnfg := DefaultMConnConfig() - cnfg.SendRate = 500 * 1024 // 500 KB/s - cnfg.RecvRate = 500 * 1024 // 500 KB/s + cnfg.SendRate = 500 * 1024 * 1024 // 500 MB/s + cnfg.RecvRate = 500 * 1024 * 1024 // 500 MB/s chDescs := []*ChannelDescriptor{{ID: chID, Priority: 1, SendQueueCapacity: SendQueueCapacity}} clientMconn := NewMConnectionWithConfig(client, chDescs, onReceive, @@ -880,18 +880,17 @@ func BenchmarkMConnection(b *testing.B) { // generate messages, is a blocking call go GenerateMessages(clientMconn, - 1*time.Second, // the messaging rate - 1*time.Minute, // the total duration - totalMsg, // unlimited - msgSize, chID) // this mimics network load of 10KB per second + 1*time.Millisecond, // the messaging rate + 1*time.Minute, // the total duration + totalMsg, // unlimited + msgSize, chID) // this mimics network load of 10KB per second // wait for all messages to be received <-allReceived } //pprof.Lookup("block").WriteTo(f, 0) - //_, err = server.Read(make([]byte, len(msg))) - //require.NoError(b, err) + }) } @@ -900,17 +899,18 @@ func BenchmarkMConnection(b *testing.B) { // Conn objects that can be used in tests. func tcpNetPipe() (net.Conn, net.Conn) { ln, _ := net.Listen("tcp", "127.0.0.1:0") - var clientConn net.Conn + var conn1 net.Conn var wg sync.WaitGroup wg.Add(1) go func(c *net.Conn) { *c, _ = ln.Accept() wg.Done() - }(&clientConn) + }(&conn1) - serverAddr := ln.Addr().String() - serverConn, _ := net.Dial("tcp", serverAddr) + addr := ln.Addr().String() + conn2, _ := net.Dial("tcp", addr) wg.Wait() - return serverConn, clientConn // sender, receiver + + return conn2, conn1 } From 695b49f19ff03f716849669a0f3e7889044d4bf8 Mon Sep 17 00:00:00 2001 From: sanaz Date: Wed, 17 Jan 2024 08:13:40 -0800 Subject: [PATCH 06/34] removes a debugging remnants --- p2p/conn/connection_test.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/p2p/conn/connection_test.go b/p2p/conn/connection_test.go index d43267ad2e..b3f12c088b 100644 --- a/p2p/conn/connection_test.go +++ b/p2p/conn/connection_test.go @@ -771,13 +771,6 @@ func stopAll(t *testing.T, stoppers ...stopper) func() { } } -func fib(n int) int { - if n < 2 { - return n - } - return fib(n-1) + fib(n-2) -} - // GenerateMessages generates messages of a given size at specified rate `messagingRate` // for a given duration `totalDuration` until the total number of messages // `totalNum` is reached. If `totalNum` is less than zero, From 58e7beb877b81c4ddf8d1d2746678c80cf4b9db1 Mon Sep 17 00:00:00 2001 From: sanaz Date: Wed, 17 Jan 2024 08:23:21 -0800 Subject: [PATCH 07/34] removes receiver callback functions from the client conn --- p2p/conn/connection_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/p2p/conn/connection_test.go b/p2p/conn/connection_test.go index b3f12c088b..224ccb5080 100644 --- a/p2p/conn/connection_test.go +++ b/p2p/conn/connection_test.go @@ -851,8 +851,9 @@ func BenchmarkMConnection(b *testing.B) { cnfg.RecvRate = 500 * 1024 * 1024 // 500 MB/s chDescs := []*ChannelDescriptor{{ID: chID, Priority: 1, SendQueueCapacity: SendQueueCapacity}} - clientMconn := NewMConnectionWithConfig(client, chDescs, onReceive, - onError, + clientMconn := NewMConnectionWithConfig(client, chDescs, + func(chID byte, msgBytes []byte) {}, + func(r interface{}) {}, cnfg) serverChDescs := []*ChannelDescriptor{{ID: chID, Priority: 1, SendQueueCapacity: SendQueueCapacity}} From e1dd65ac8ce35a5c4daf97f5e713f61774ea068e Mon Sep 17 00:00:00 2001 From: sanaz Date: Wed, 17 Jan 2024 11:57:07 -0800 Subject: [PATCH 08/34] includes more test cases --- p2p/conn/connection_test.go | 274 ++++++++++++++++++++++++++---------- 1 file changed, 203 insertions(+), 71 deletions(-) diff --git a/p2p/conn/connection_test.go b/p2p/conn/connection_test.go index 224ccb5080..0064f8df33 100644 --- a/p2p/conn/connection_test.go +++ b/p2p/conn/connection_test.go @@ -775,7 +775,8 @@ func stopAll(t *testing.T, stoppers ...stopper) func() { // for a given duration `totalDuration` until the total number of messages // `totalNum` is reached. If `totalNum` is less than zero, // then the message generation continues until the `totalDuration` is reached. -func GenerateMessages(mc *MConnection, messagingRate time.Duration, +func GenerateMessages(mc *MConnection, + messagingRate time.Duration, totalDuration time.Duration, totalNum int, msgSize int, chID byte) { // all messages have an identical content msg := bytes.Repeat([]byte{'x'}, msgSize) @@ -811,81 +812,212 @@ func GenerateMessages(mc *MConnection, messagingRate time.Duration, } func BenchmarkMConnection(b *testing.B) { - msgSize := 5 * 1024 // in bytes - totalMsg := 100 // total number of messages to be sent chID := byte(0x01) - SendQueueCapacity := 10 // in messages - - b.Run("benchmark MConnection", func(b *testing.B) { - //cpuFile, _ := os.Create("cpu.pprof") - //pprof.StartCPUProfile(cpuFile) - //defer pprof.StopCPUProfile() - - //f, _ := os.Create("block.pprof") - //runtime.SetBlockProfileRate(1) - - for n := 0; n < b.N; n++ { - // set up two networked connections - //server, client := NetPipe() - server, client := tcpNetPipe() - defer server.Close() - defer client.Close() - - // prepare call backs to receive messages - allReceived := make(chan bool) - receivedLoad := 0 // number of messages received - onReceive := func(chID byte, msgBytes []byte) { - receivedLoad++ - log.TestingLogger().Info("onReceive: received message") - if receivedLoad >= totalMsg && totalMsg > 0 { - log.TestingLogger().Info("onReceive: received all messages") - allReceived <- true + + tests := []struct { + name string + msgSize int // size of each message in bytes + totalMsg int // total number of messages to be sent + sendQueueCapacity int // send queue capacity + messagingRate time.Duration + totalDuration time.Duration + sendRate int64 + recRate int64 + }{ + { + // in this test case, one message of size 1KB is sent every 20ms, + // resulting in 50 messages per second i.e., 50KB/s + // the time taken to send 50 messages ideally should be 1 seconds + name: "queue capacity = 1, " + + " total load = 50 KB, " + + "traffic rate = send rate", + msgSize: 1 * 1024, + totalMsg: 1 * 50, + sendQueueCapacity: 1, + messagingRate: 20 * time.Millisecond, + totalDuration: 1 * time.Minute, + sendRate: 50 * 1024, + recRate: 50 * 1024, + }, + { + // in this test case, one message of size 1KB is sent every 20ms, + // resulting in 50 messages per second i.e., 50KB/s + // the time taken to send 50 messages ideally should be 1 seconds + name: "queue capacity = 50, " + + " total load = 50 KB, " + + "traffic rate = send rate", + msgSize: 1 * 1024, + totalMsg: 1 * 50, + sendQueueCapacity: 50, + messagingRate: 20 * time.Millisecond, + totalDuration: 1 * time.Minute, + sendRate: 50 * 1024, + recRate: 50 * 1024, + }, + { + // in this test case, one message of size 1KB is sent every 20ms, + // resulting in 50 messages per second i.e., 50KB/s + // the time taken to send 50 messages ideally should be 1 seconds + name: "queue capacity = 100, " + + " total load = 50 KB, " + + "traffic rate = send rate", + msgSize: 1 * 1024, + totalMsg: 1 * 50, + sendQueueCapacity: 100, + messagingRate: 20 * time.Millisecond, + totalDuration: 1 * time.Minute, + sendRate: 50 * 1024, + recRate: 50 * 1024, + }, + { + // in this test case, one message of size 1KB is sent every 20ms, + // resulting in 50 messages per second i.e., 50KB/s + // the sending operation continues for 100 messages + // the time taken to send 100 messages is expected to be 2 times + // more than the first test case, and ideally should be 2 seconds + name: "queue capacity = 100, " + + " total load = 2 * 50 KB, " + + "traffic rate = send rate", + msgSize: 1 * 1024, + totalMsg: 2 * 50, + sendQueueCapacity: 10, + messagingRate: 20 * time.Millisecond, + totalDuration: 1 * time.Minute, + sendRate: 50 * 1024, + recRate: 50 * 1024, + }, + { + // in this test case, one message of size 1KB is sent every 20ms, + // resulting in 50 messages per second i.e., 50KB/s + // the sending operation continues for 400 messages + // the time taken to send 400 messages is expected to be 8 times + // more than the first test case, and ideally should be 8 seconds + name: "queue capacity = 100, " + + " total load = 8 * 50 KB, " + + "traffic rate = send rate", + msgSize: 1 * 1024, + totalMsg: 8 * 50, + sendQueueCapacity: 10, + messagingRate: 20 * time.Millisecond, + totalDuration: 1 * time.Minute, + sendRate: 50 * 1024, + recRate: 50 * 1024, + }, + { + // in this test case, one message of size 1KB is sent every 20ms, + // resulting in 50 messages per second i.e., 50KB/s + // the sending operation continues for 400 messages + // the time taken to send 400 messages is expected to be 8 times + // more than the first test case, and ideally should be 8 seconds + name: "queue capacity = 100, " + + " total load = 8 * 50 KB, " + + "traffic rate = 2 * send rate", + msgSize: 1 * 1024, + totalMsg: 8 * 50, + sendQueueCapacity: 10, + messagingRate: 10 * time.Millisecond, + totalDuration: 1 * time.Minute, + sendRate: 50 * 1024, + recRate: 50 * 1024, + }, + { + // in this test case, one message of size 1KB is sent every 20ms, + // resulting in 50 messages per second i.e., 50KB/s + // the sending operation continues for 400 messages + // the time taken to send 400 messages is expected to be 8 times + // more than the first test case, and ideally should be 8 seconds + name: "queue capacity = 100, " + + " total load = 8 * 50 KB, " + + "traffic rate = 10 * send rate", + msgSize: 1 * 1024, + totalMsg: 8 * 50, + sendQueueCapacity: 10, + messagingRate: 2 * time.Millisecond, + totalDuration: 1 * time.Minute, + sendRate: 50 * 1024, + recRate: 50 * 1024, + }, + } + + for _, tt := range tests { + b.Run(tt.name, func(b *testing.B) { + //cpuFile, _ := os.Create("cpu.pprof") + //pprof.StartCPUProfile(cpuFile) + //defer pprof.StopCPUProfile() + + //f, _ := os.Create("block.pprof") + //runtime.SetBlockProfileRate(1) + + for n := 0; n < b.N; n++ { + // set up two networked connections + //server, client := NetPipe() + server, client := tcpNetPipe() + defer server.Close() + defer client.Close() + + // prepare call backs to receive messages + allReceived := make(chan bool) + receivedLoad := 0 // number of messages received + onReceive := func(chID byte, msgBytes []byte) { + receivedLoad++ + //log.TestingLogger().Info("onReceive: received message") + if receivedLoad >= tt.totalMsg && tt.totalMsg > 0 { + //log.TestingLogger().Info("onReceive: received all messages") + allReceived <- true + } + } + onError := func(r interface{}) { + //log.TestingLogger().Info("onError: received error") } - } - onError := func(r interface{}) { - log.TestingLogger().Info("onError: received error") - } - cnfg := DefaultMConnConfig() - cnfg.SendRate = 500 * 1024 * 1024 // 500 MB/s - cnfg.RecvRate = 500 * 1024 * 1024 // 500 MB/s - chDescs := []*ChannelDescriptor{{ID: chID, Priority: 1, - SendQueueCapacity: SendQueueCapacity}} - clientMconn := NewMConnectionWithConfig(client, chDescs, - func(chID byte, msgBytes []byte) {}, - func(r interface{}) {}, - cnfg) - serverChDescs := []*ChannelDescriptor{{ID: chID, Priority: 1, - SendQueueCapacity: SendQueueCapacity}} - serverMconn := NewMConnectionWithConfig(server, serverChDescs, - onReceive, - onError, - cnfg) - clientMconn.SetLogger(log.TestingLogger()) - serverMconn.SetLogger(log.TestingLogger()) - - err := clientMconn.Start() - require.Nil(b, err) - defer clientMconn.Stop() //nolint:errcheck // ignore for tests - - err = serverMconn.Start() - require.Nil(b, err) - defer serverMconn.Stop() //nolint:errcheck // ignore for tests - - // generate messages, is a blocking call - go GenerateMessages(clientMconn, - 1*time.Millisecond, // the messaging rate - 1*time.Minute, // the total duration - totalMsg, // unlimited - msgSize, chID) // this mimics network load of 10KB per second - - // wait for all messages to be received - <-allReceived + cnfg := DefaultMConnConfig() + cnfg.SendRate = 50 * 1024 // 500 KB/s + cnfg.RecvRate = 50 * 1024 // 500 KB/s + chDescs := []*ChannelDescriptor{{ID: chID, Priority: 1, + SendQueueCapacity: tt.sendQueueCapacity}} + clientMconn := NewMConnectionWithConfig(client, chDescs, + func(chID byte, msgBytes []byte) {}, + func(r interface{}) {}, + cnfg) + serverChDescs := []*ChannelDescriptor{{ID: chID, Priority: 1, + SendQueueCapacity: tt.sendQueueCapacity}} + serverMconn := NewMConnectionWithConfig(server, serverChDescs, + onReceive, + onError, + cnfg) + clientMconn.SetLogger(log.TestingLogger()) + serverMconn.SetLogger(log.TestingLogger()) + + err := clientMconn.Start() + require.Nil(b, err) + defer func() { + _ = clientMconn.Stop() + }() + err = serverMconn.Start() + require.Nil(b, err) + defer func() { + _ = serverMconn.Stop() + }() + + b.StartTimer() + // generate messages, is a blocking call + go GenerateMessages(clientMconn, + tt.messagingRate, // the messaging rate + tt.totalDuration, // the total duration + tt.totalMsg, // unlimited + tt.msgSize, chID) // this mimics network load of 10KB per + // second + + // wait for all messages to be received + <-allReceived + b.StopTimer() - } - //pprof.Lookup("block").WriteTo(f, 0) + } + //pprof.Lookup("block").WriteTo(f, 0) - }) + }) + + } } From 593528a85f96f36e1efe70b664690671218c283a Mon Sep 17 00:00:00 2001 From: sanaz Date: Wed, 17 Jan 2024 15:40:54 -0800 Subject: [PATCH 09/34] adds more documentation pertaining test scenarios --- p2p/conn/connection_test.go | 84 ++++++++++++++++++++++--------------- 1 file changed, 51 insertions(+), 33 deletions(-) diff --git a/p2p/conn/connection_test.go b/p2p/conn/connection_test.go index 0064f8df33..6d6c41e49e 100644 --- a/p2p/conn/connection_test.go +++ b/p2p/conn/connection_test.go @@ -814,23 +814,38 @@ func GenerateMessages(mc *MConnection, func BenchmarkMConnection(b *testing.B) { chID := byte(0x01) + // Testcases 1-3 are designed to assess the impact of send queue capacity on + // message transmission delay. + + // Testcases 3-5 are focused on evaluating whether the maximum bandwidth can + // be fully utilized. This will be tested by increasing the total + // load while maintaining consistent send and receive rates. The expectation is + // that the time taken to transmit the messages will increase proportionally + // with the total load and the exact time should be close to the totalMsg*msgSize/ sendRate. + + // Testcases 7-9 are aimed at verifying that increasing the message rate beyond + // the available bandwidth does not lead to a reduction in transmission + // delay. The delay is expected to be the same for all the testcases. + tests := []struct { name string - msgSize int // size of each message in bytes - totalMsg int // total number of messages to be sent - sendQueueCapacity int // send queue capacity - messagingRate time.Duration - totalDuration time.Duration - sendRate int64 - recRate int64 + msgSize int // size of each message in bytes + totalMsg int // total number of messages to be sent + messagingRate time.Duration // rate at which messages are sent + totalDuration time.Duration // total duration for which messages are sent + sendQueueCapacity int // send queue capacity i.e., the number of messages that can be buffered + sendRate int64 // send rate in bytes per second + recRate int64 // receive rate in bytes per second }{ { + // testcase 1 // in this test case, one message of size 1KB is sent every 20ms, // resulting in 50 messages per second i.e., 50KB/s - // the time taken to send 50 messages ideally should be 1 seconds + // the time taken to send 50 messages ideally should be 1 second + // the sendQueueCapacity should have no impact on the transmission delay name: "queue capacity = 1, " + - " total load = 50 KB, " + - "traffic rate = send rate", + "total load = 50 KB, " + + "msg rate = send rate", msgSize: 1 * 1024, totalMsg: 1 * 50, sendQueueCapacity: 1, @@ -840,11 +855,13 @@ func BenchmarkMConnection(b *testing.B) { recRate: 50 * 1024, }, { + // testcase 2 // in this test case, one message of size 1KB is sent every 20ms, // resulting in 50 messages per second i.e., 50KB/s - // the time taken to send 50 messages ideally should be 1 seconds + // the time taken to send 50 messages ideally should be 1 second + // the sendQueueCapacity should have no impact on the transmission delay name: "queue capacity = 50, " + - " total load = 50 KB, " + + "total load = 50 KB, " + "traffic rate = send rate", msgSize: 1 * 1024, totalMsg: 1 * 50, @@ -855,11 +872,13 @@ func BenchmarkMConnection(b *testing.B) { recRate: 50 * 1024, }, { + // testcase 3 // in this test case, one message of size 1KB is sent every 20ms, // resulting in 50 messages per second i.e., 50KB/s - // the time taken to send 50 messages ideally should be 1 seconds + // the time taken to send 50 messages ideally should be 1 second + // the sendQueueCapacity should have no impact on the transmission delay name: "queue capacity = 100, " + - " total load = 50 KB, " + + "total load = 50 KB, " + "traffic rate = send rate", msgSize: 1 * 1024, totalMsg: 1 * 50, @@ -870,11 +889,11 @@ func BenchmarkMConnection(b *testing.B) { recRate: 50 * 1024, }, { + // testcase 4 // in this test case, one message of size 1KB is sent every 20ms, // resulting in 50 messages per second i.e., 50KB/s // the sending operation continues for 100 messages - // the time taken to send 100 messages is expected to be 2 times - // more than the first test case, and ideally should be 2 seconds + // the time taken to send 100 messages is expected to be 2 seconds name: "queue capacity = 100, " + " total load = 2 * 50 KB, " + "traffic rate = send rate", @@ -887,11 +906,11 @@ func BenchmarkMConnection(b *testing.B) { recRate: 50 * 1024, }, { + // testcase 5 // in this test case, one message of size 1KB is sent every 20ms, // resulting in 50 messages per second i.e., 50KB/s // the sending operation continues for 400 messages - // the time taken to send 400 messages is expected to be 8 times - // more than the first test case, and ideally should be 8 seconds + // the time taken to send 400 messages is expected to be 8 seconds name: "queue capacity = 100, " + " total load = 8 * 50 KB, " + "traffic rate = send rate", @@ -904,11 +923,11 @@ func BenchmarkMConnection(b *testing.B) { recRate: 50 * 1024, }, { - // in this test case, one message of size 1KB is sent every 20ms, - // resulting in 50 messages per second i.e., 50KB/s + // testcase 6 + // in this test case, one message of size 1KB is sent every 10ms, + // resulting in 100 messages per second i.e., 100KB/s // the sending operation continues for 400 messages - // the time taken to send 400 messages is expected to be 8 times - // more than the first test case, and ideally should be 8 seconds + // the time taken to send 400 messages is expected to be 8 seconds name: "queue capacity = 100, " + " total load = 8 * 50 KB, " + "traffic rate = 2 * send rate", @@ -921,11 +940,11 @@ func BenchmarkMConnection(b *testing.B) { recRate: 50 * 1024, }, { - // in this test case, one message of size 1KB is sent every 20ms, - // resulting in 50 messages per second i.e., 50KB/s + // testcase 7 + // in this test case, one message of size 1KB is sent every 2ms, + // resulting in 500 messages per second i.e., 500KB/s // the sending operation continues for 400 messages - // the time taken to send 400 messages is expected to be 8 times - // more than the first test case, and ideally should be 8 seconds + // the time taken to send 400 messages is expected to be 8 seconds name: "queue capacity = 100, " + " total load = 8 * 50 KB, " + "traffic rate = 10 * send rate", @@ -949,8 +968,8 @@ func BenchmarkMConnection(b *testing.B) { //runtime.SetBlockProfileRate(1) for n := 0; n < b.N; n++ { - // set up two networked connections - //server, client := NetPipe() + // set up two networked connections + // server, client := NetPipe() server, client := tcpNetPipe() defer server.Close() defer client.Close() @@ -1002,11 +1021,10 @@ func BenchmarkMConnection(b *testing.B) { b.StartTimer() // generate messages, is a blocking call go GenerateMessages(clientMconn, - tt.messagingRate, // the messaging rate - tt.totalDuration, // the total duration - tt.totalMsg, // unlimited - tt.msgSize, chID) // this mimics network load of 10KB per - // second + tt.messagingRate, + tt.totalDuration, + tt.totalMsg, + tt.msgSize, chID) // wait for all messages to be received <-allReceived From 46359da43a1f3c9d6c731b68a894e22e9ff86d16 Mon Sep 17 00:00:00 2001 From: sanaz Date: Wed, 17 Jan 2024 15:47:44 -0800 Subject: [PATCH 10/34] removes onError func definition --- p2p/conn/connection_test.go | 43 +++++++++++++++++-------------------- 1 file changed, 20 insertions(+), 23 deletions(-) diff --git a/p2p/conn/connection_test.go b/p2p/conn/connection_test.go index 6d6c41e49e..e7b6b3227c 100644 --- a/p2p/conn/connection_test.go +++ b/p2p/conn/connection_test.go @@ -824,8 +824,8 @@ func BenchmarkMConnection(b *testing.B) { // with the total load and the exact time should be close to the totalMsg*msgSize/ sendRate. // Testcases 7-9 are aimed at verifying that increasing the message rate beyond - // the available bandwidth does not lead to a reduction in transmission - // delay. The delay is expected to be the same for all the testcases. + // the available bandwidth does not lead to a reduction or change in + // transmission delay. The delay is expected to be the same for all the testcases. tests := []struct { name string @@ -841,7 +841,7 @@ func BenchmarkMConnection(b *testing.B) { // testcase 1 // in this test case, one message of size 1KB is sent every 20ms, // resulting in 50 messages per second i.e., 50KB/s - // the time taken to send 50 messages ideally should be 1 second + // the time taken to send 50 messages ideally should be ~ 1 second // the sendQueueCapacity should have no impact on the transmission delay name: "queue capacity = 1, " + "total load = 50 KB, " + @@ -858,7 +858,7 @@ func BenchmarkMConnection(b *testing.B) { // testcase 2 // in this test case, one message of size 1KB is sent every 20ms, // resulting in 50 messages per second i.e., 50KB/s - // the time taken to send 50 messages ideally should be 1 second + // the time taken to send 50 messages ideally should be ~ 1 second // the sendQueueCapacity should have no impact on the transmission delay name: "queue capacity = 50, " + "total load = 50 KB, " + @@ -875,7 +875,7 @@ func BenchmarkMConnection(b *testing.B) { // testcase 3 // in this test case, one message of size 1KB is sent every 20ms, // resulting in 50 messages per second i.e., 50KB/s - // the time taken to send 50 messages ideally should be 1 second + // the time taken to send 50 messages ideally should be ~ 1 second // the sendQueueCapacity should have no impact on the transmission delay name: "queue capacity = 100, " + "total load = 50 KB, " + @@ -893,9 +893,9 @@ func BenchmarkMConnection(b *testing.B) { // in this test case, one message of size 1KB is sent every 20ms, // resulting in 50 messages per second i.e., 50KB/s // the sending operation continues for 100 messages - // the time taken to send 100 messages is expected to be 2 seconds + // the time taken to send 100 messages is expected to be ~ 2 seconds name: "queue capacity = 100, " + - " total load = 2 * 50 KB, " + + "total load = 2 * 50 KB, " + "traffic rate = send rate", msgSize: 1 * 1024, totalMsg: 2 * 50, @@ -910,9 +910,9 @@ func BenchmarkMConnection(b *testing.B) { // in this test case, one message of size 1KB is sent every 20ms, // resulting in 50 messages per second i.e., 50KB/s // the sending operation continues for 400 messages - // the time taken to send 400 messages is expected to be 8 seconds + // the time taken to send 400 messages is expected to be ~ 8 seconds name: "queue capacity = 100, " + - " total load = 8 * 50 KB, " + + "total load = 8 * 50 KB, " + "traffic rate = send rate", msgSize: 1 * 1024, totalMsg: 8 * 50, @@ -927,9 +927,9 @@ func BenchmarkMConnection(b *testing.B) { // in this test case, one message of size 1KB is sent every 10ms, // resulting in 100 messages per second i.e., 100KB/s // the sending operation continues for 400 messages - // the time taken to send 400 messages is expected to be 8 seconds + // the time taken to send 400 messages is expected to be ~ 8 seconds name: "queue capacity = 100, " + - " total load = 8 * 50 KB, " + + "total load = 8 * 50 KB, " + "traffic rate = 2 * send rate", msgSize: 1 * 1024, totalMsg: 8 * 50, @@ -944,9 +944,9 @@ func BenchmarkMConnection(b *testing.B) { // in this test case, one message of size 1KB is sent every 2ms, // resulting in 500 messages per second i.e., 500KB/s // the sending operation continues for 400 messages - // the time taken to send 400 messages is expected to be 8 seconds + // the time taken to send 400 messages is expected to be ~ 8 seconds name: "queue capacity = 100, " + - " total load = 8 * 50 KB, " + + "total load = 8 * 50 KB, " + "traffic rate = 10 * send rate", msgSize: 1 * 1024, totalMsg: 8 * 50, @@ -969,25 +969,20 @@ func BenchmarkMConnection(b *testing.B) { for n := 0; n < b.N; n++ { // set up two networked connections - // server, client := NetPipe() + // server, client := NetPipe() // can alternatively use this and comment out the line below server, client := tcpNetPipe() defer server.Close() defer client.Close() - // prepare call backs to receive messages + // prepare callback to receive messages allReceived := make(chan bool) receivedLoad := 0 // number of messages received onReceive := func(chID byte, msgBytes []byte) { receivedLoad++ - //log.TestingLogger().Info("onReceive: received message") if receivedLoad >= tt.totalMsg && tt.totalMsg > 0 { - //log.TestingLogger().Info("onReceive: received all messages") allReceived <- true } } - onError := func(r interface{}) { - //log.TestingLogger().Info("onError: received error") - } cnfg := DefaultMConnConfig() cnfg.SendRate = 50 * 1024 // 500 KB/s @@ -1002,7 +997,7 @@ func BenchmarkMConnection(b *testing.B) { SendQueueCapacity: tt.sendQueueCapacity}} serverMconn := NewMConnectionWithConfig(server, serverChDescs, onReceive, - onError, + func(r interface{}) {}, cnfg) clientMconn.SetLogger(log.TestingLogger()) serverMconn.SetLogger(log.TestingLogger()) @@ -1018,9 +1013,11 @@ func BenchmarkMConnection(b *testing.B) { _ = serverMconn.Stop() }() + // start measuring the time from here to exclude the time + // taken to set up the connections b.StartTimer() - // generate messages, is a blocking call - go GenerateMessages(clientMconn, + // start generating messages, it is a blocking call + GenerateMessages(clientMconn, tt.messagingRate, tt.totalDuration, tt.totalMsg, From c9a2e2d81b7670d16790085df09e87ee84e53bfb Mon Sep 17 00:00:00 2001 From: sanaz Date: Wed, 17 Jan 2024 15:50:22 -0800 Subject: [PATCH 11/34] removes undue line break --- p2p/conn/connection_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/p2p/conn/connection_test.go b/p2p/conn/connection_test.go index e7b6b3227c..f989daa805 100644 --- a/p2p/conn/connection_test.go +++ b/p2p/conn/connection_test.go @@ -1036,8 +1036,7 @@ func BenchmarkMConnection(b *testing.B) { } -// testPipe creates a pair of connected net. -// Conn objects that can be used in tests. +// testPipe creates a pair of connected net.Conn objects that can be used in tests. func tcpNetPipe() (net.Conn, net.Conn) { ln, _ := net.Listen("tcp", "127.0.0.1:0") var conn1 net.Conn From 076877d5d57708ca6d6f1219d556a45576fdb0c7 Mon Sep 17 00:00:00 2001 From: sanaz Date: Wed, 17 Jan 2024 15:54:31 -0800 Subject: [PATCH 12/34] removes the code related to profiling --- p2p/conn/connection_test.go | 21 ++++++--------------- 1 file changed, 6 insertions(+), 15 deletions(-) diff --git a/p2p/conn/connection_test.go b/p2p/conn/connection_test.go index f989daa805..5f5cd064d0 100644 --- a/p2p/conn/connection_test.go +++ b/p2p/conn/connection_test.go @@ -771,11 +771,11 @@ func stopAll(t *testing.T, stoppers ...stopper) func() { } } -// GenerateMessages generates messages of a given size at specified rate `messagingRate` +// generateMessages generates messages of a given size at specified rate `messagingRate` // for a given duration `totalDuration` until the total number of messages // `totalNum` is reached. If `totalNum` is less than zero, // then the message generation continues until the `totalDuration` is reached. -func GenerateMessages(mc *MConnection, +func generateMessages(mc *MConnection, messagingRate time.Duration, totalDuration time.Duration, totalNum int, msgSize int, chID byte) { // all messages have an identical content @@ -805,7 +805,8 @@ func GenerateMessages(mc *MConnection, } case <-timer.C: // time's up - log.TestingLogger().Info("Completed the message generation as the total " + "duration is reached") + log.TestingLogger().Info("Completed the message generation as the total " + + "duration is reached") return } } @@ -960,13 +961,6 @@ func BenchmarkMConnection(b *testing.B) { for _, tt := range tests { b.Run(tt.name, func(b *testing.B) { - //cpuFile, _ := os.Create("cpu.pprof") - //pprof.StartCPUProfile(cpuFile) - //defer pprof.StopCPUProfile() - - //f, _ := os.Create("block.pprof") - //runtime.SetBlockProfileRate(1) - for n := 0; n < b.N; n++ { // set up two networked connections // server, client := NetPipe() // can alternatively use this and comment out the line below @@ -1014,10 +1008,10 @@ func BenchmarkMConnection(b *testing.B) { }() // start measuring the time from here to exclude the time - // taken to set up the connections + // taken to set up the connections b.StartTimer() // start generating messages, it is a blocking call - GenerateMessages(clientMconn, + generateMessages(clientMconn, tt.messagingRate, tt.totalDuration, tt.totalMsg, @@ -1026,10 +1020,7 @@ func BenchmarkMConnection(b *testing.B) { // wait for all messages to be received <-allReceived b.StopTimer() - } - //pprof.Lookup("block").WriteTo(f, 0) - }) } From 923069252a5edb13a57ded6c83575334546a4061 Mon Sep 17 00:00:00 2001 From: sanaz Date: Wed, 17 Jan 2024 15:55:36 -0800 Subject: [PATCH 13/34] minor edit --- p2p/conn/connection_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/conn/connection_test.go b/p2p/conn/connection_test.go index 5f5cd064d0..d066603b91 100644 --- a/p2p/conn/connection_test.go +++ b/p2p/conn/connection_test.go @@ -822,7 +822,7 @@ func BenchmarkMConnection(b *testing.B) { // be fully utilized. This will be tested by increasing the total // load while maintaining consistent send and receive rates. The expectation is // that the time taken to transmit the messages will increase proportionally - // with the total load and the exact time should be close to the totalMsg*msgSize/ sendRate. + // with the total load and the exact time should be close to the totalMsg*msgSize/sendRate. // Testcases 7-9 are aimed at verifying that increasing the message rate beyond // the available bandwidth does not lead to a reduction or change in From 86dad6a3fb6c639d3dd4bb7cbcbeb683ac1b3ef7 Mon Sep 17 00:00:00 2001 From: sanaz Date: Wed, 17 Jan 2024 17:46:27 -0800 Subject: [PATCH 14/34] removes debugging logs --- p2p/conn/connection.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/p2p/conn/connection.go b/p2p/conn/connection.go index 72e2856539..fd6be3806c 100644 --- a/p2p/conn/connection.go +++ b/p2p/conn/connection.go @@ -228,9 +228,7 @@ func (c *MConnection) OnStart() error { c.quitSendRoutine = make(chan struct{}) c.doneSendRoutine = make(chan struct{}) c.quitRecvRoutine = make(chan struct{}) - c.Logger.Debug("Starting sendRoutine") go c.sendRoutine() - c.Logger.Debug("Starting recvRoutine") go c.recvRoutine() return nil } @@ -566,7 +564,6 @@ func (c *MConnection) recvRoutine() { FOR_LOOP: for { - c.Logger.Debug("recvRoutine", "address", c.conn.RemoteAddr()) // Block until .recvMonitor says we can read. c.recvMonitor.Limit(c._maxPacketMsgSize, atomic.LoadInt64(&c.config.RecvRate), true) @@ -608,7 +605,6 @@ FOR_LOOP: break FOR_LOOP } - c.Logger.Debug("Reading packet") // Read more depending on packet type. switch pkt := packet.Sum.(type) { case *tmp2p.Packet_PacketPing: @@ -639,7 +635,6 @@ FOR_LOOP: msgBytes, err := channel.recvPacketMsg(*pkt.PacketMsg) if err != nil { - c.Logger.Debug("error in reading packet message", "err", err) if c.IsRunning() { c.Logger.Debug("Connection failed @ recvRoutine", "conn", c, "err", err) c.stopForError(err) From 9fb1853d890b2fb10105a9776500185528a36ea5 Mon Sep 17 00:00:00 2001 From: sanaz Date: Wed, 17 Jan 2024 17:51:45 -0800 Subject: [PATCH 15/34] cleans up debugging logs related to channel send queue size --- p2p/conn/connection.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/p2p/conn/connection.go b/p2p/conn/connection.go index fd6be3806c..b20fc13bad 100644 --- a/p2p/conn/connection.go +++ b/p2p/conn/connection.go @@ -781,8 +781,6 @@ func (ch *Channel) sendBytes(bytes []byte) bool { select { case ch.sendQueue <- bytes: atomic.AddInt32(&ch.sendQueueSize, 1) - ch.Logger.Debug("send queue size changed", "size", - ch.loadSendQueueSize()) return true case <-time.After(defaultSendTimeout): return false @@ -796,7 +794,6 @@ func (ch *Channel) trySendBytes(bytes []byte) bool { select { case ch.sendQueue <- bytes: atomic.AddInt32(&ch.sendQueueSize, 1) - ch.Logger.Debug("send queue size changed", "size", ch.loadSendQueueSize()) return true default: return false @@ -837,7 +834,6 @@ func (ch *Channel) nextPacketMsg() tmp2p.PacketMsg { packet.EOF = true ch.sending = nil atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize - ch.Logger.Debug("send queue size changed", "size", ch.loadSendQueueSize()) } else { packet.EOF = false ch.sending = ch.sending[cmtmath.MinInt(maxSize, len(ch.sending)):] From 3435ad4cc003e221bd23e8b74dc126912777cfbb Mon Sep 17 00:00:00 2001 From: sanaz Date: Wed, 17 Jan 2024 17:52:50 -0800 Subject: [PATCH 16/34] brings back the empty line --- p2p/conn/connection.go | 1 + 1 file changed, 1 insertion(+) diff --git a/p2p/conn/connection.go b/p2p/conn/connection.go index b20fc13bad..1ee489e73d 100644 --- a/p2p/conn/connection.go +++ b/p2p/conn/connection.go @@ -583,6 +583,7 @@ FOR_LOOP: // Read packet type var packet tmp2p.Packet + _n, err := protoReader.ReadMsg(&packet) c.recvMonitor.Update(_n) if err != nil { From 568bee469406164cc1b549d51a330c4acb8c12c7 Mon Sep 17 00:00:00 2001 From: sanaz Date: Wed, 17 Jan 2024 17:54:03 -0800 Subject: [PATCH 17/34] deletes empty line --- p2p/conn/connection.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/conn/connection.go b/p2p/conn/connection.go index 1ee489e73d..9f3f3a615d 100644 --- a/p2p/conn/connection.go +++ b/p2p/conn/connection.go @@ -583,7 +583,7 @@ FOR_LOOP: // Read packet type var packet tmp2p.Packet - + _n, err := protoReader.ReadMsg(&packet) c.recvMonitor.Update(_n) if err != nil { From 58fc3eb5e119f711ef78157d7d55e557779e074b Mon Sep 17 00:00:00 2001 From: sanaz Date: Wed, 17 Jan 2024 18:08:11 -0800 Subject: [PATCH 18/34] shortens descriptions --- p2p/conn/connection_test.go | 75 ++++++++++++++++--------------------- 1 file changed, 33 insertions(+), 42 deletions(-) diff --git a/p2p/conn/connection_test.go b/p2p/conn/connection_test.go index d066603b91..0dfa8b4e4a 100644 --- a/p2p/conn/connection_test.go +++ b/p2p/conn/connection_test.go @@ -771,10 +771,11 @@ func stopAll(t *testing.T, stoppers ...stopper) func() { } } -// generateMessages generates messages of a given size at specified rate `messagingRate` -// for a given duration `totalDuration` until the total number of messages -// `totalNum` is reached. If `totalNum` is less than zero, -// then the message generation continues until the `totalDuration` is reached. +// generateMessages sends a sequence of messages to the specified multiplex connection `mc`. +// Each message has the given size and is sent at the specified rate +// `messagingRate`. This process continues for the duration `totalDuration` or +// until `totalNum` messages are sent. If `totalNum` is negative, +// messaging persists for the entire `totalDuration`. func generateMessages(mc *MConnection, messagingRate time.Duration, totalDuration time.Duration, totalNum int, msgSize int, chID byte) { @@ -815,18 +816,14 @@ func generateMessages(mc *MConnection, func BenchmarkMConnection(b *testing.B) { chID := byte(0x01) - // Testcases 1-3 are designed to assess the impact of send queue capacity on - // message transmission delay. + // Testcases 1-3 evaluate the effect of send queue capacity on message transmission delay. - // Testcases 3-5 are focused on evaluating whether the maximum bandwidth can - // be fully utilized. This will be tested by increasing the total - // load while maintaining consistent send and receive rates. The expectation is - // that the time taken to transmit the messages will increase proportionally - // with the total load and the exact time should be close to the totalMsg*msgSize/sendRate. + // Testcases 3-5 assess the full utilization of maximum bandwidth by + // incrementally increasing the total load while keeping send and receive + // rates constant. The transmission time should be ~ totalMsg*msgSize/sendRate. - // Testcases 7-9 are aimed at verifying that increasing the message rate beyond - // the available bandwidth does not lead to a reduction or change in - // transmission delay. The delay is expected to be the same for all the testcases. + // Testcases 5-7 verify that exceeding available bandwidth does not affect + // transmission delay, expecting consistent delay across all testcases. tests := []struct { name string @@ -840,10 +837,9 @@ func BenchmarkMConnection(b *testing.B) { }{ { // testcase 1 - // in this test case, one message of size 1KB is sent every 20ms, - // resulting in 50 messages per second i.e., 50KB/s - // the time taken to send 50 messages ideally should be ~ 1 second - // the sendQueueCapacity should have no impact on the transmission delay + // Sends one 1KB message every 20ms, totaling 50 messages (50KB/s) per second. + // Ideal transmission time for 50 messages is ~1 second. + // SendQueueCapacity should not affect transmission delay. name: "queue capacity = 1, " + "total load = 50 KB, " + "msg rate = send rate", @@ -857,10 +853,10 @@ func BenchmarkMConnection(b *testing.B) { }, { // testcase 2 - // in this test case, one message of size 1KB is sent every 20ms, - // resulting in 50 messages per second i.e., 50KB/s - // the time taken to send 50 messages ideally should be ~ 1 second - // the sendQueueCapacity should have no impact on the transmission delay + // Sends one 1KB message every 20ms, totaling 50 messages (50KB/s) per second. + // Ideal transmission time for 50 messages is ~1 second. + // Increasing SendQueueCapacity should not affect transmission + // delay. name: "queue capacity = 50, " + "total load = 50 KB, " + "traffic rate = send rate", @@ -874,10 +870,10 @@ func BenchmarkMConnection(b *testing.B) { }, { // testcase 3 - // in this test case, one message of size 1KB is sent every 20ms, - // resulting in 50 messages per second i.e., 50KB/s - // the time taken to send 50 messages ideally should be ~ 1 second - // the sendQueueCapacity should have no impact on the transmission delay + // Sends one 1KB message every 20ms, totaling 50 messages (50KB/s) per second. + // Ideal transmission time for 50 messages is ~1 second. + // Increasing SendQueueCapacity should not affect transmission + // delay. name: "queue capacity = 100, " + "total load = 50 KB, " + "traffic rate = send rate", @@ -891,10 +887,8 @@ func BenchmarkMConnection(b *testing.B) { }, { // testcase 4 - // in this test case, one message of size 1KB is sent every 20ms, - // resulting in 50 messages per second i.e., 50KB/s - // the sending operation continues for 100 messages - // the time taken to send 100 messages is expected to be ~ 2 seconds + // Sends one 1KB message every 20ms, totaling 50 messages (50KB/s) per second. + // The test runs for 100 messages, expecting a total transmission time of ~2 seconds. name: "queue capacity = 100, " + "total load = 2 * 50 KB, " + "traffic rate = send rate", @@ -908,10 +902,9 @@ func BenchmarkMConnection(b *testing.B) { }, { // testcase 5 - // in this test case, one message of size 1KB is sent every 20ms, - // resulting in 50 messages per second i.e., 50KB/s - // the sending operation continues for 400 messages - // the time taken to send 400 messages is expected to be ~ 8 seconds + // Sends one 1KB message every 20ms, totaling 50 messages (50KB/s) per second. + // The test runs for 400 messages, + // expecting a total transmission time of ~8 seconds. name: "queue capacity = 100, " + "total load = 8 * 50 KB, " + "traffic rate = send rate", @@ -925,10 +918,9 @@ func BenchmarkMConnection(b *testing.B) { }, { // testcase 6 - // in this test case, one message of size 1KB is sent every 10ms, - // resulting in 100 messages per second i.e., 100KB/s - // the sending operation continues for 400 messages - // the time taken to send 400 messages is expected to be ~ 8 seconds + // Sends one 1KB message every 10ms, totaling 100 messages (100KB/s) per second. + // The test runs for 400 messages, + // expecting a total transmission time of ~8 seconds. name: "queue capacity = 100, " + "total load = 8 * 50 KB, " + "traffic rate = 2 * send rate", @@ -942,10 +934,9 @@ func BenchmarkMConnection(b *testing.B) { }, { // testcase 7 - // in this test case, one message of size 1KB is sent every 2ms, - // resulting in 500 messages per second i.e., 500KB/s - // the sending operation continues for 400 messages - // the time taken to send 400 messages is expected to be ~ 8 seconds + // Sends one 1KB message every 2ms, totaling 500 messages (500KB/s) per second. + // The test runs for 400 messages, + // expecting a total transmission time of ~8 seconds. name: "queue capacity = 100, " + "total load = 8 * 50 KB, " + "traffic rate = 10 * send rate", From c40f4d812adf7394f6c0bf91d5ad818c07c6eb7f Mon Sep 17 00:00:00 2001 From: sanaz Date: Wed, 17 Jan 2024 18:14:54 -0800 Subject: [PATCH 19/34] clarifies testcases 5-7 --- p2p/conn/connection_test.go | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/p2p/conn/connection_test.go b/p2p/conn/connection_test.go index 0dfa8b4e4a..40da85bea7 100644 --- a/p2p/conn/connection_test.go +++ b/p2p/conn/connection_test.go @@ -819,11 +819,16 @@ func BenchmarkMConnection(b *testing.B) { // Testcases 1-3 evaluate the effect of send queue capacity on message transmission delay. // Testcases 3-5 assess the full utilization of maximum bandwidth by - // incrementally increasing the total load while keeping send and receive - // rates constant. The transmission time should be ~ totalMsg*msgSize/sendRate. - - // Testcases 5-7 verify that exceeding available bandwidth does not affect - // transmission delay, expecting consistent delay across all testcases. + // incrementally increasing the total load ( to higher than the bandwidth) + // while keeping send and receive rates constant. + // The transmission time should be ~ totalMsg*msgSize/sendRate, + // indicating that the actual sendRate is in effect and has been + // utilized. + + // Testcases 5-7 + // Assess if surpassing available bandwidth maintains consistent transmission + // delay without congestion or performance degradation. + // A uniform delay across these testcases is expected. tests := []struct { name string From 4b59722eb43b28b4c0a55f77570b9be58fbc6c1f Mon Sep 17 00:00:00 2001 From: sanaz Date: Wed, 17 Jan 2024 18:15:54 -0800 Subject: [PATCH 20/34] linter fix --- p2p/conn/connection_test.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/p2p/conn/connection_test.go b/p2p/conn/connection_test.go index 40da85bea7..fa4cb2822c 100644 --- a/p2p/conn/connection_test.go +++ b/p2p/conn/connection_test.go @@ -825,10 +825,9 @@ func BenchmarkMConnection(b *testing.B) { // indicating that the actual sendRate is in effect and has been // utilized. - // Testcases 5-7 - // Assess if surpassing available bandwidth maintains consistent transmission - // delay without congestion or performance degradation. - // A uniform delay across these testcases is expected. + // Testcases 5-7 assess if surpassing available bandwidth maintains + // consistent transmission delay without congestion or performance + // degradation. A uniform delay across these testcases is expected. tests := []struct { name string From cfd525ac9c274c9368b58c293d3d32290d782d53 Mon Sep 17 00:00:00 2001 From: sanaz Date: Thu, 18 Jan 2024 11:51:43 -0800 Subject: [PATCH 21/34] fixed send queue capacity in testcases 3-7 --- p2p/conn/connection_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/p2p/conn/connection_test.go b/p2p/conn/connection_test.go index fa4cb2822c..c61b91a1fe 100644 --- a/p2p/conn/connection_test.go +++ b/p2p/conn/connection_test.go @@ -819,8 +819,8 @@ func BenchmarkMConnection(b *testing.B) { // Testcases 1-3 evaluate the effect of send queue capacity on message transmission delay. // Testcases 3-5 assess the full utilization of maximum bandwidth by - // incrementally increasing the total load ( to higher than the bandwidth) - // while keeping send and receive rates constant. + // incrementally increasing the total load while keeping send and + // receive rates constant. // The transmission time should be ~ totalMsg*msgSize/sendRate, // indicating that the actual sendRate is in effect and has been // utilized. @@ -898,7 +898,7 @@ func BenchmarkMConnection(b *testing.B) { "traffic rate = send rate", msgSize: 1 * 1024, totalMsg: 2 * 50, - sendQueueCapacity: 10, + sendQueueCapacity: 100, messagingRate: 20 * time.Millisecond, totalDuration: 1 * time.Minute, sendRate: 50 * 1024, @@ -914,7 +914,7 @@ func BenchmarkMConnection(b *testing.B) { "traffic rate = send rate", msgSize: 1 * 1024, totalMsg: 8 * 50, - sendQueueCapacity: 10, + sendQueueCapacity: 100, messagingRate: 20 * time.Millisecond, totalDuration: 1 * time.Minute, sendRate: 50 * 1024, @@ -930,7 +930,7 @@ func BenchmarkMConnection(b *testing.B) { "traffic rate = 2 * send rate", msgSize: 1 * 1024, totalMsg: 8 * 50, - sendQueueCapacity: 10, + sendQueueCapacity: 100, messagingRate: 10 * time.Millisecond, totalDuration: 1 * time.Minute, sendRate: 50 * 1024, @@ -946,7 +946,7 @@ func BenchmarkMConnection(b *testing.B) { "traffic rate = 10 * send rate", msgSize: 1 * 1024, totalMsg: 8 * 50, - sendQueueCapacity: 10, + sendQueueCapacity: 100, messagingRate: 2 * time.Millisecond, totalDuration: 1 * time.Minute, sendRate: 50 * 1024, From 4034ebed32cd5f79b3c3fb84b7ac53cd7e0ee3a0 Mon Sep 17 00:00:00 2001 From: sanaz Date: Thu, 18 Jan 2024 11:54:10 -0800 Subject: [PATCH 22/34] updates tcpNetPipe func description --- p2p/conn/connection_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/conn/connection_test.go b/p2p/conn/connection_test.go index c61b91a1fe..b9d8a5fbbd 100644 --- a/p2p/conn/connection_test.go +++ b/p2p/conn/connection_test.go @@ -1022,7 +1022,7 @@ func BenchmarkMConnection(b *testing.B) { } -// testPipe creates a pair of connected net.Conn objects that can be used in tests. +// tcpNetPipe creates a pair of connected net.Conn objects that can be used in tests. func tcpNetPipe() (net.Conn, net.Conn) { ln, _ := net.Listen("tcp", "127.0.0.1:0") var conn1 net.Conn From 9ff6fad7233c1df54ce83f5dcc89ef60b9c03e9c Mon Sep 17 00:00:00 2001 From: sanaz Date: Thu, 18 Jan 2024 12:35:00 -0800 Subject: [PATCH 23/34] fixes a func doc --- libs/log/testing_logger.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libs/log/testing_logger.go b/libs/log/testing_logger.go index 7c6f661a74..5dbb381007 100644 --- a/libs/log/testing_logger.go +++ b/libs/log/testing_logger.go @@ -23,7 +23,7 @@ func TestingLogger() Logger { return TestingLoggerWithOutput(os.Stdout) } -// TestingLoggerWOutput returns a TMLogger which writes to (w io.Writer) if testing being run +// TestingLoggerWithOutput returns a TMLogger which writes to (w io.Writer) if testing being run // with the verbose (-v) flag, NopLogger otherwise. // // Note that the call to TestingLoggerWithOutput(w io.Writer) must be made From 78e4d75bf2d56d41850234821a9849451c83ad9f Mon Sep 17 00:00:00 2001 From: sanaz Date: Fri, 19 Jan 2024 12:55:33 -0800 Subject: [PATCH 24/34] adds two more benchmarks --- p2p/conn/connection_test.go | 272 ++++++++++++++++++++++++++++++++++-- 1 file changed, 264 insertions(+), 8 deletions(-) diff --git a/p2p/conn/connection_test.go b/p2p/conn/connection_test.go index b9d8a5fbbd..0421f62109 100644 --- a/p2p/conn/connection_test.go +++ b/p2p/conn/connection_test.go @@ -771,16 +771,22 @@ func stopAll(t *testing.T, stoppers ...stopper) func() { } } -// generateMessages sends a sequence of messages to the specified multiplex connection `mc`. +// generateAndSendMessages sends a sequence of messages to the specified multiplex connection `mc`. // Each message has the given size and is sent at the specified rate // `messagingRate`. This process continues for the duration `totalDuration` or // until `totalNum` messages are sent. If `totalNum` is negative, // messaging persists for the entire `totalDuration`. -func generateMessages(mc *MConnection, +func generateAndSendMessages(mc *MConnection, messagingRate time.Duration, - totalDuration time.Duration, totalNum int, msgSize int, chID byte) { + totalDuration time.Duration, totalNum int, msgSize int, + msgContnet []byte, chID byte) { + var msg []byte // all messages have an identical content - msg := bytes.Repeat([]byte{'x'}, msgSize) + if msgContnet == nil { + msg = bytes.Repeat([]byte{'x'}, msgSize) + } else { + msg = msgContnet + } // message generation interval ticker ticker := time.NewTicker(messagingRate) @@ -974,8 +980,8 @@ func BenchmarkMConnection(b *testing.B) { } cnfg := DefaultMConnConfig() - cnfg.SendRate = 50 * 1024 // 500 KB/s - cnfg.RecvRate = 50 * 1024 // 500 KB/s + cnfg.SendRate = tt.sendRate + cnfg.RecvRate = tt.recRate chDescs := []*ChannelDescriptor{{ID: chID, Priority: 1, SendQueueCapacity: tt.sendQueueCapacity}} clientMconn := NewMConnectionWithConfig(client, chDescs, @@ -1006,11 +1012,12 @@ func BenchmarkMConnection(b *testing.B) { // taken to set up the connections b.StartTimer() // start generating messages, it is a blocking call - generateMessages(clientMconn, + generateAndSendMessages(clientMconn, tt.messagingRate, tt.totalDuration, tt.totalMsg, - tt.msgSize, chID) + tt.msgSize, + nil, chID) // wait for all messages to be received <-allReceived @@ -1040,3 +1047,252 @@ func tcpNetPipe() (net.Conn, net.Conn) { return conn2, conn1 } + +// generateExponentialSizedMessages creates a series of messages with sizes +// increasing exponentially. +// Each message is filled with the byte 'x'. +// The size of each message doubles, starting from 1 up to maxSize. +func generateExponentialSizedMessages(maxSize int, divisor int) [][]byte { + iterations := math.Ceil(math.Log2(float64(maxSize / divisor))) + msgs := make([][]byte, int(iterations)) + + for i := 0; i <= int(iterations); i++ { + size := int(math.Pow(2, + float64(i))) * divisor // calculate the size of the message for this + // iteration + msgs[i] = bytes.Repeat([]byte{'x'}, size) // create a message of the calculated size + } + + return msgs +} + +func BenchmarkMConnection_ScalingPayloadSizes_HighSendRate( (b *testing.B) { + // One aspect that could impact the performance of MConnection and the + // transmission rate is the size of the messages sent over the network, + // especially when they exceed the MConnection.MaxPacketMsgPayloadSize ( + // messages are sent in packets of maximum size MConnection. + // MaxPacketMsgPayloadSize). + // The test cases in this benchmark involve sending messages with sizes + // ranging exponentially from 1KB to 4096KB ( + // the max value of 4096KB is inspired by the largest possible PFB in a + // Celestia block with 128*18 number of 512-byte shares) + // The bandwidth is set significantly higher than the message load to ensure + // it does not become a limiting factor. + // All test cases are expected to complete in less than one second, + // indicating a healthy performance. + + type testCase struct { + name string + msgSize int // size of each message in bytes + msg []byte // message to be sent + totalMsg int // total number of messages to be sent + messagingRate time.Duration // rate at which messages are sent + totalDuration time.Duration // total duration for which messages are sent + sendQueueCapacity int // send queue capacity i.e., the number of messages that can be buffered + sendRate int64 // send rate in bytes per second + recRate int64 // receive rate in bytes per second + } + + squareSize := 128 // number of shares in a row/column + shareSize := 512 // bytes + maxSize := squareSize * squareSize * shareSize // bytes + msgs := generateExponentialSizedMessages(maxSize, 1024) + + // create test cases for each message size + var testCases []testCase + for _, msg := range msgs { + testCases = append(testCases, testCase{ + name: fmt.Sprintf("msgSize = %d KB", len(msg)/1024), + msgSize: len(msg), + msg: msg, + totalMsg: 10, + messagingRate: time.Millisecond, + totalDuration: 1 * time.Minute, + sendQueueCapacity: 100, + sendRate: 512 * 1024 * 1024, + recRate: 512 * 1024 * 1024, + }) + } + + chID := byte(0x01) + + for _, tt := range testCases { + b.Run(tt.name, func(b *testing.B) { + for n := 0; n < b.N; n++ { + // set up two networked connections + // server, client := NetPipe() // can alternatively use this and comment out the line below + server, client := tcpNetPipe() + defer server.Close() + defer client.Close() + + // prepare callback to receive messages + allReceived := make(chan bool) + receivedLoad := 0 // number of messages received + onReceive := func(chID byte, msgBytes []byte) { + receivedLoad++ + if receivedLoad >= tt.totalMsg && tt.totalMsg > 0 { + allReceived <- true + } + } + + cnfg := DefaultMConnConfig() + cnfg.SendRate = tt.sendRate // 500 KB/s + cnfg.RecvRate = tt.recRate // 500 KB/s + chDescs := []*ChannelDescriptor{{ID: chID, Priority: 1, + SendQueueCapacity: tt.sendQueueCapacity}} + clientMconn := NewMConnectionWithConfig(client, chDescs, + func(chID byte, msgBytes []byte) {}, + func(r interface{}) {}, + cnfg) + serverChDescs := []*ChannelDescriptor{{ID: chID, Priority: 1, + SendQueueCapacity: tt.sendQueueCapacity}} + serverMconn := NewMConnectionWithConfig(server, serverChDescs, + onReceive, + func(r interface{}) {}, + cnfg) + clientMconn.SetLogger(log.TestingLogger()) + serverMconn.SetLogger(log.TestingLogger()) + + err := clientMconn.Start() + require.Nil(b, err) + defer func() { + _ = clientMconn.Stop() + }() + err = serverMconn.Start() + require.Nil(b, err) + defer func() { + _ = serverMconn.Stop() + }() + + // start measuring the time from here to exclude the time + // taken to set up the connections + b.StartTimer() + // start generating messages, it is a blocking call + generateAndSendMessages(clientMconn, + tt.messagingRate, + tt.totalDuration, + tt.totalMsg, + tt.msgSize, + tt.msg, + chID) + + // wait for all messages to be received + <-allReceived + b.StopTimer() + } + }) + + } +} + +func BenchmarkMConnection_ScalingPayloadSizes_LowSendRate(b *testing.B) { + // This benchmark test builds upon the previous one i.e., + // BenchmarkMConnection_ScalingPayloadSizes_HighSendRate + // by setting the send/and receive rates lower than the message load. + // Test cases involve sending the same load of messages but with different message sizes. + // Since the message load and bandwidth are consistent across all test cases, + // they are expected to complete in the same amount of time. + + type testCase struct { + name string + msgSize int // size of each message in bytes + msg []byte // message to be sent + totalMsg int // total number of messages to be sent + messagingRate time.Duration // rate at which messages are sent + totalDuration time.Duration // total duration for which messages are sent + sendQueueCapacity int // send queue capacity i.e., the number of messages that can be buffered + sendRate int64 // send rate in bytes per second + recRate int64 // receive rate in bytes per second + } + + maxSize := 32 * 1024 // 32KB + msgs := generateExponentialSizedMessages(maxSize, 1024) + totalLoad := float64(maxSize) + // create test cases for each message size + var testCases []testCase + for _, msg := range msgs { + msgSize := len(msg) + totalMsg := int(math.Ceil(totalLoad / float64(msgSize))) + testCases = append(testCases, testCase{ + name: fmt.Sprintf("msgSize = %d KB", msgSize/1024), + msgSize: msgSize, + msg: msg, + totalMsg: totalMsg, + messagingRate: time.Millisecond, + totalDuration: 1 * time.Minute, + sendQueueCapacity: 100, + sendRate: 4 * 1024, + recRate: 4 * 1024, + }) + } + + chID := byte(0x01) + + for _, tt := range testCases { + b.Run(tt.name, func(b *testing.B) { + for n := 0; n < b.N; n++ { + // set up two networked connections + // server, client := NetPipe() // can alternatively use this and comment out the line below + server, client := tcpNetPipe() + defer server.Close() + defer client.Close() + + // prepare callback to receive messages + allReceived := make(chan bool) + receivedLoad := 0 // number of messages received + onReceive := func(chID byte, msgBytes []byte) { + receivedLoad++ + if receivedLoad >= tt.totalMsg && tt.totalMsg > 0 { + allReceived <- true + } + } + + cnfg := DefaultMConnConfig() + cnfg.SendRate = tt.sendRate // 500 KB/s + cnfg.RecvRate = tt.recRate // 500 KB/s + chDescs := []*ChannelDescriptor{{ID: chID, Priority: 1, + SendQueueCapacity: tt.sendQueueCapacity}} + clientMconn := NewMConnectionWithConfig(client, chDescs, + func(chID byte, msgBytes []byte) {}, + func(r interface{}) {}, + cnfg) + serverChDescs := []*ChannelDescriptor{{ID: chID, Priority: 1, + SendQueueCapacity: tt.sendQueueCapacity}} + serverMconn := NewMConnectionWithConfig(server, serverChDescs, + onReceive, + func(r interface{}) {}, + cnfg) + clientMconn.SetLogger(log.TestingLogger()) + serverMconn.SetLogger(log.TestingLogger()) + + err := clientMconn.Start() + require.Nil(b, err) + defer func() { + _ = clientMconn.Stop() + }() + err = serverMconn.Start() + require.Nil(b, err) + defer func() { + _ = serverMconn.Stop() + }() + + // start measuring the time from here to exclude the time + // taken to set up the connections + b.StartTimer() + // start generating messages, it is a blocking call + generateAndSendMessages(clientMconn, + tt.messagingRate, + tt.totalDuration, + tt.totalMsg, + tt.msgSize, + tt.msg, + chID) + + // wait for all messages to be received + <-allReceived + b.StopTimer() + } + }) + + } +} From 280fc25e698b22e2d94dffce38e155a372f4564b Mon Sep 17 00:00:00 2001 From: sanaz Date: Fri, 19 Jan 2024 12:58:49 -0800 Subject: [PATCH 25/34] makes some cleanup --- p2p/conn/connection_test.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/p2p/conn/connection_test.go b/p2p/conn/connection_test.go index 25e615374d..aaf8cf58c5 100644 --- a/p2p/conn/connection_test.go +++ b/p2p/conn/connection_test.go @@ -1066,7 +1066,7 @@ func generateExponentialSizedMessages(maxSize int, divisor int) [][]byte { return msgs } -func BenchmarkMConnection_ScalingPayloadSizes_HighSendRate( (b *testing.B) { +func BenchmarkMConnection_ScalingPayloadSizes_HighSendRate(b *testing.B) { // One aspect that could impact the performance of MConnection and the // transmission rate is the size of the messages sent over the network, // especially when they exceed the MConnection.MaxPacketMsgPayloadSize ( @@ -1115,7 +1115,7 @@ func BenchmarkMConnection_ScalingPayloadSizes_HighSendRate( (b *testing.B) { } chID := byte(0x01) - + for _, tt := range testCases { b.Run(tt.name, func(b *testing.B) { for n := 0; n < b.N; n++ { @@ -1296,5 +1296,3 @@ func BenchmarkMConnection_ScalingPayloadSizes_LowSendRate(b *testing.B) { } } - -// generateMessages sends a sequence of messages to the specified multiplex connection `mc`. \ No newline at end of file From 6f24c5f62e45f4c32e721a25aa1e7d41311ffbd1 Mon Sep 17 00:00:00 2001 From: sanaz Date: Fri, 19 Jan 2024 14:21:08 -0800 Subject: [PATCH 26/34] allocates space for the testcases --- p2p/conn/connection_test.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/p2p/conn/connection_test.go b/p2p/conn/connection_test.go index aaf8cf58c5..97178f8c2e 100644 --- a/p2p/conn/connection_test.go +++ b/p2p/conn/connection_test.go @@ -1099,9 +1099,9 @@ func BenchmarkMConnection_ScalingPayloadSizes_HighSendRate(b *testing.B) { msgs := generateExponentialSizedMessages(maxSize, 1024) // create test cases for each message size - var testCases []testCase - for _, msg := range msgs { - testCases = append(testCases, testCase{ + var testCases = make([]testCase, len(msgs)) + for i, msg := range msgs { + testCases[i] = testCase{ name: fmt.Sprintf("msgSize = %d KB", len(msg)/1024), msgSize: len(msg), msg: msg, @@ -1111,7 +1111,7 @@ func BenchmarkMConnection_ScalingPayloadSizes_HighSendRate(b *testing.B) { sendQueueCapacity: 100, sendRate: 512 * 1024 * 1024, recRate: 512 * 1024 * 1024, - }) + } } chID := byte(0x01) @@ -1209,11 +1209,11 @@ func BenchmarkMConnection_ScalingPayloadSizes_LowSendRate(b *testing.B) { msgs := generateExponentialSizedMessages(maxSize, 1024) totalLoad := float64(maxSize) // create test cases for each message size - var testCases []testCase - for _, msg := range msgs { + var testCases = make([]testCase, len(msgs)) + for i, msg := range msgs { msgSize := len(msg) totalMsg := int(math.Ceil(totalLoad / float64(msgSize))) - testCases = append(testCases, testCase{ + testCases[i] = testCase{ name: fmt.Sprintf("msgSize = %d KB", msgSize/1024), msgSize: msgSize, msg: msg, @@ -1223,7 +1223,7 @@ func BenchmarkMConnection_ScalingPayloadSizes_LowSendRate(b *testing.B) { sendQueueCapacity: 100, sendRate: 4 * 1024, recRate: 4 * 1024, - }) + } } chID := byte(0x01) From aaae3d0b37f9757d7fb379a26aecce202eb3b164 Mon Sep 17 00:00:00 2001 From: sanaz Date: Mon, 22 Jan 2024 17:26:49 -0800 Subject: [PATCH 27/34] refactors the bench tests to consolidate duplicate parts --- p2p/conn/connection_test.go | 235 +++++++++++++----------------------- 1 file changed, 82 insertions(+), 153 deletions(-) diff --git a/p2p/conn/connection_test.go b/p2p/conn/connection_test.go index 97178f8c2e..34ab539702 100644 --- a/p2p/conn/connection_test.go +++ b/p2p/conn/connection_test.go @@ -1081,22 +1081,11 @@ func BenchmarkMConnection_ScalingPayloadSizes_HighSendRate(b *testing.B) { // All test cases are expected to complete in less than one second, // indicating a healthy performance. - type testCase struct { - name string - msgSize int // size of each message in bytes - msg []byte // message to be sent - totalMsg int // total number of messages to be sent - messagingRate time.Duration // rate at which messages are sent - totalDuration time.Duration // total duration for which messages are sent - sendQueueCapacity int // send queue capacity i.e., the number of messages that can be buffered - sendRate int64 // send rate in bytes per second - recRate int64 // receive rate in bytes per second - } - squareSize := 128 // number of shares in a row/column shareSize := 512 // bytes maxSize := squareSize * squareSize * shareSize // bytes msgs := generateExponentialSizedMessages(maxSize, 1024) + chID := byte(0x01) // create test cases for each message size var testCases = make([]testCase, len(msgs)) @@ -1111,78 +1100,94 @@ func BenchmarkMConnection_ScalingPayloadSizes_HighSendRate(b *testing.B) { sendQueueCapacity: 100, sendRate: 512 * 1024 * 1024, recRate: 512 * 1024 * 1024, + chID: chID, } } - chID := byte(0x01) - for _, tt := range testCases { - b.Run(tt.name, func(b *testing.B) { - for n := 0; n < b.N; n++ { - // set up two networked connections - // server, client := NetPipe() // can alternatively use this and comment out the line below - server, client := tcpNetPipe() - defer server.Close() - defer client.Close() - - // prepare callback to receive messages - allReceived := make(chan bool) - receivedLoad := 0 // number of messages received - onReceive := func(chID byte, msgBytes []byte) { - receivedLoad++ - if receivedLoad >= tt.totalMsg && tt.totalMsg > 0 { - allReceived <- true - } - } - - cnfg := DefaultMConnConfig() - cnfg.SendRate = tt.sendRate // 500 KB/s - cnfg.RecvRate = tt.recRate // 500 KB/s - chDescs := []*ChannelDescriptor{{ID: chID, Priority: 1, - SendQueueCapacity: tt.sendQueueCapacity}} - clientMconn := NewMConnectionWithConfig(client, chDescs, - func(chID byte, msgBytes []byte) {}, - func(r interface{}) {}, - cnfg) - serverChDescs := []*ChannelDescriptor{{ID: chID, Priority: 1, - SendQueueCapacity: tt.sendQueueCapacity}} - serverMconn := NewMConnectionWithConfig(server, serverChDescs, - onReceive, - func(r interface{}) {}, - cnfg) - clientMconn.SetLogger(log.TestingLogger()) - serverMconn.SetLogger(log.TestingLogger()) + runBenchmarkTest(b, tt) - err := clientMconn.Start() - require.Nil(b, err) - defer func() { - _ = clientMconn.Stop() - }() - err = serverMconn.Start() - require.Nil(b, err) - defer func() { - _ = serverMconn.Stop() - }() + } +} - // start measuring the time from here to exclude the time - // taken to set up the connections - b.StartTimer() - // start generating messages, it is a blocking call - generateAndSendMessages(clientMconn, - tt.messagingRate, - tt.totalDuration, - tt.totalMsg, - tt.msgSize, - tt.msg, - chID) +type testCase struct { + name string + msgSize int // size of each message in bytes + msg []byte // message to be sent + totalMsg int // total number of messages to be sent + messagingRate time.Duration // rate at which messages are sent + totalDuration time.Duration // total duration for which messages are sent + sendQueueCapacity int // send queue capacity i.e., the number of messages that can be buffered + sendRate int64 // send rate in bytes per second + recRate int64 // receive rate in bytes per second + chID byte // channel ID +} - // wait for all messages to be received - <-allReceived - b.StopTimer() +func runBenchmarkTest(b *testing.B, tt testCase) { + b.Run(tt.name, func(b *testing.B) { + for n := 0; n < b.N; n++ { + // set up two networked connections + // server, client := NetPipe() // can alternatively use this and comment out the line below + server, client := tcpNetPipe() + defer server.Close() + defer client.Close() + + // prepare callback to receive messages + allReceived := make(chan bool) + receivedLoad := 0 // number of messages received + onReceive := func(chID byte, msgBytes []byte) { + receivedLoad++ + if receivedLoad >= tt.totalMsg && tt.totalMsg > 0 { + allReceived <- true + } } - }) - } + cnfg := DefaultMConnConfig() + cnfg.SendRate = tt.sendRate // 500 KB/s + cnfg.RecvRate = tt.recRate // 500 KB/s + chDescs := []*ChannelDescriptor{{ID: tt.chID, Priority: 1, + SendQueueCapacity: tt.sendQueueCapacity}} + clientMconn := NewMConnectionWithConfig(client, chDescs, + func(chID byte, msgBytes []byte) {}, + func(r interface{}) {}, + cnfg) + serverChDescs := []*ChannelDescriptor{{ID: tt.chID, Priority: 1, + SendQueueCapacity: tt.sendQueueCapacity}} + serverMconn := NewMConnectionWithConfig(server, serverChDescs, + onReceive, + func(r interface{}) {}, + cnfg) + clientMconn.SetLogger(log.TestingLogger()) + serverMconn.SetLogger(log.TestingLogger()) + + err := clientMconn.Start() + require.Nil(b, err) + defer func() { + _ = clientMconn.Stop() + }() + err = serverMconn.Start() + require.Nil(b, err) + defer func() { + _ = serverMconn.Stop() + }() + + // start measuring the time from here to exclude the time + // taken to set up the connections + b.StartTimer() + // start generating messages, it is a blocking call + generateAndSendMessages(clientMconn, + tt.messagingRate, + tt.totalDuration, + tt.totalMsg, + tt.msgSize, + tt.msg, + tt.chID) + + // wait for all messages to be received + <-allReceived + b.StopTimer() + } + }) } func BenchmarkMConnection_ScalingPayloadSizes_LowSendRate(b *testing.B) { @@ -1193,18 +1198,6 @@ func BenchmarkMConnection_ScalingPayloadSizes_LowSendRate(b *testing.B) { // Since the message load and bandwidth are consistent across all test cases, // they are expected to complete in the same amount of time. - type testCase struct { - name string - msgSize int // size of each message in bytes - msg []byte // message to be sent - totalMsg int // total number of messages to be sent - messagingRate time.Duration // rate at which messages are sent - totalDuration time.Duration // total duration for which messages are sent - sendQueueCapacity int // send queue capacity i.e., the number of messages that can be buffered - sendRate int64 // send rate in bytes per second - recRate int64 // receive rate in bytes per second - } - maxSize := 32 * 1024 // 32KB msgs := generateExponentialSizedMessages(maxSize, 1024) totalLoad := float64(maxSize) @@ -1223,76 +1216,12 @@ func BenchmarkMConnection_ScalingPayloadSizes_LowSendRate(b *testing.B) { sendQueueCapacity: 100, sendRate: 4 * 1024, recRate: 4 * 1024, + chID: byte(0x01), } } - chID := byte(0x01) - for _, tt := range testCases { - b.Run(tt.name, func(b *testing.B) { - for n := 0; n < b.N; n++ { - // set up two networked connections - // server, client := NetPipe() // can alternatively use this and comment out the line below - server, client := tcpNetPipe() - defer server.Close() - defer client.Close() - - // prepare callback to receive messages - allReceived := make(chan bool) - receivedLoad := 0 // number of messages received - onReceive := func(chID byte, msgBytes []byte) { - receivedLoad++ - if receivedLoad >= tt.totalMsg && tt.totalMsg > 0 { - allReceived <- true - } - } - - cnfg := DefaultMConnConfig() - cnfg.SendRate = tt.sendRate // 500 KB/s - cnfg.RecvRate = tt.recRate // 500 KB/s - chDescs := []*ChannelDescriptor{{ID: chID, Priority: 1, - SendQueueCapacity: tt.sendQueueCapacity}} - clientMconn := NewMConnectionWithConfig(client, chDescs, - func(chID byte, msgBytes []byte) {}, - func(r interface{}) {}, - cnfg) - serverChDescs := []*ChannelDescriptor{{ID: chID, Priority: 1, - SendQueueCapacity: tt.sendQueueCapacity}} - serverMconn := NewMConnectionWithConfig(server, serverChDescs, - onReceive, - func(r interface{}) {}, - cnfg) - clientMconn.SetLogger(log.TestingLogger()) - serverMconn.SetLogger(log.TestingLogger()) - - err := clientMconn.Start() - require.Nil(b, err) - defer func() { - _ = clientMconn.Stop() - }() - err = serverMconn.Start() - require.Nil(b, err) - defer func() { - _ = serverMconn.Stop() - }() - - // start measuring the time from here to exclude the time - // taken to set up the connections - b.StartTimer() - // start generating messages, it is a blocking call - generateAndSendMessages(clientMconn, - tt.messagingRate, - tt.totalDuration, - tt.totalMsg, - tt.msgSize, - tt.msg, - chID) - - // wait for all messages to be received - <-allReceived - b.StopTimer() - } - }) + runBenchmarkTest(b, tt) } } From 570defb63013b72586e3bb0d9b88d05ffa32d696 Mon Sep 17 00:00:00 2001 From: sanaz Date: Mon, 22 Jan 2024 17:28:15 -0800 Subject: [PATCH 28/34] fixes boundary checks --- p2p/conn/connection_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/conn/connection_test.go b/p2p/conn/connection_test.go index 34ab539702..aa7427527a 100644 --- a/p2p/conn/connection_test.go +++ b/p2p/conn/connection_test.go @@ -1056,7 +1056,7 @@ func generateExponentialSizedMessages(maxSize int, divisor int) [][]byte { iterations := math.Ceil(math.Log2(float64(maxSize / divisor))) msgs := make([][]byte, int(iterations)) - for i := 0; i <= int(iterations); i++ { + for i := 0; i < int(iterations); i++ { size := int(math.Pow(2, float64(i))) * divisor // calculate the size of the message for this // iteration From 2f16bc935304c2e9ace6b2e98356a70b2e6761b9 Mon Sep 17 00:00:00 2001 From: sanaz Date: Mon, 22 Jan 2024 17:31:31 -0800 Subject: [PATCH 29/34] reorganizes the code --- p2p/conn/connection_test.go | 92 ++++++++++++++++++------------------- 1 file changed, 46 insertions(+), 46 deletions(-) diff --git a/p2p/conn/connection_test.go b/p2p/conn/connection_test.go index aa7427527a..bd996ff439 100644 --- a/p2p/conn/connection_test.go +++ b/p2p/conn/connection_test.go @@ -1066,50 +1066,6 @@ func generateExponentialSizedMessages(maxSize int, divisor int) [][]byte { return msgs } -func BenchmarkMConnection_ScalingPayloadSizes_HighSendRate(b *testing.B) { - // One aspect that could impact the performance of MConnection and the - // transmission rate is the size of the messages sent over the network, - // especially when they exceed the MConnection.MaxPacketMsgPayloadSize ( - // messages are sent in packets of maximum size MConnection. - // MaxPacketMsgPayloadSize). - // The test cases in this benchmark involve sending messages with sizes - // ranging exponentially from 1KB to 4096KB ( - // the max value of 4096KB is inspired by the largest possible PFB in a - // Celestia block with 128*18 number of 512-byte shares) - // The bandwidth is set significantly higher than the message load to ensure - // it does not become a limiting factor. - // All test cases are expected to complete in less than one second, - // indicating a healthy performance. - - squareSize := 128 // number of shares in a row/column - shareSize := 512 // bytes - maxSize := squareSize * squareSize * shareSize // bytes - msgs := generateExponentialSizedMessages(maxSize, 1024) - chID := byte(0x01) - - // create test cases for each message size - var testCases = make([]testCase, len(msgs)) - for i, msg := range msgs { - testCases[i] = testCase{ - name: fmt.Sprintf("msgSize = %d KB", len(msg)/1024), - msgSize: len(msg), - msg: msg, - totalMsg: 10, - messagingRate: time.Millisecond, - totalDuration: 1 * time.Minute, - sendQueueCapacity: 100, - sendRate: 512 * 1024 * 1024, - recRate: 512 * 1024 * 1024, - chID: chID, - } - } - - for _, tt := range testCases { - runBenchmarkTest(b, tt) - - } -} - type testCase struct { name string msgSize int // size of each message in bytes @@ -1196,11 +1152,13 @@ func BenchmarkMConnection_ScalingPayloadSizes_LowSendRate(b *testing.B) { // by setting the send/and receive rates lower than the message load. // Test cases involve sending the same load of messages but with different message sizes. // Since the message load and bandwidth are consistent across all test cases, - // they are expected to complete in the same amount of time. + // they are expected to complete in the same amount of time. i.e., + //totalLoad/sendRate. maxSize := 32 * 1024 // 32KB msgs := generateExponentialSizedMessages(maxSize, 1024) totalLoad := float64(maxSize) + chID := byte(0x01) // create test cases for each message size var testCases = make([]testCase, len(msgs)) for i, msg := range msgs { @@ -1216,12 +1174,54 @@ func BenchmarkMConnection_ScalingPayloadSizes_LowSendRate(b *testing.B) { sendQueueCapacity: 100, sendRate: 4 * 1024, recRate: 4 * 1024, - chID: byte(0x01), + chID: chID, } } for _, tt := range testCases { runBenchmarkTest(b, tt) + } +} + +func BenchmarkMConnection_ScalingPayloadSizes_HighSendRate(b *testing.B) { + // One aspect that could impact the performance of MConnection and the + // transmission rate is the size of the messages sent over the network, + // especially when they exceed the MConnection.MaxPacketMsgPayloadSize ( + // messages are sent in packets of maximum size MConnection. + // MaxPacketMsgPayloadSize). + // The test cases in this benchmark involve sending messages with sizes + // ranging exponentially from 1KB to 4096KB ( + // the max value of 4096KB is inspired by the largest possible PFB in a + // Celestia block with 128*18 number of 512-byte shares) + // The bandwidth is set significantly higher than the message load to ensure + // it does not become a limiting factor. + // All test cases are expected to complete in less than one second, + // indicating a healthy performance. + + squareSize := 128 // number of shares in a row/column + shareSize := 512 // bytes + maxSize := squareSize * squareSize * shareSize // bytes + msgs := generateExponentialSizedMessages(maxSize, 1024) + chID := byte(0x01) + + // create test cases for each message size + var testCases = make([]testCase, len(msgs)) + for i, msg := range msgs { + testCases[i] = testCase{ + name: fmt.Sprintf("msgSize = %d KB", len(msg)/1024), + msgSize: len(msg), + msg: msg, + totalMsg: 10, + messagingRate: time.Millisecond, + totalDuration: 1 * time.Minute, + sendQueueCapacity: 100, + sendRate: 512 * 1024 * 1024, + recRate: 512 * 1024 * 1024, + chID: chID, + } + } + for _, tt := range testCases { + runBenchmarkTest(b, tt) } } From ec0501f6e30733306c25944f3bc5686a1d8fd405 Mon Sep 17 00:00:00 2001 From: sanaz Date: Mon, 22 Jan 2024 17:43:07 -0800 Subject: [PATCH 30/34] refactors the generateExponentialSizedMessages func --- p2p/conn/connection_test.go | 24 ++++++++++-------------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/p2p/conn/connection_test.go b/p2p/conn/connection_test.go index bd996ff439..bb6be10a17 100644 --- a/p2p/conn/connection_test.go +++ b/p2p/conn/connection_test.go @@ -1048,21 +1048,17 @@ func tcpNetPipe() (net.Conn, net.Conn) { return conn2, conn1 } -// generateExponentialSizedMessages creates a series of messages with sizes -// increasing exponentially. -// Each message is filled with the byte 'x'. -// The size of each message doubles, starting from 1 up to maxSize. -func generateExponentialSizedMessages(maxSize int, divisor int) [][]byte { - iterations := math.Ceil(math.Log2(float64(maxSize / divisor))) - msgs := make([][]byte, int(iterations)) - - for i := 0; i < int(iterations); i++ { - size := int(math.Pow(2, - float64(i))) * divisor // calculate the size of the message for this - // iteration - msgs[i] = bytes.Repeat([]byte{'x'}, size) // create a message of the calculated size +// generateExponentialSizedMessages creates and returns a series of messages +// with sizes (in the specified unit) increasing exponentially. +// The size of each message doubles, starting from 1 up to maxSizeBytes. +// unit is expected to be a power of 2. +func generateExponentialSizedMessages(maxSizeBytes int, unit int) [][]byte { + maxSizeToUnit := int(math.Ceil(float64(maxSizeBytes / unit))) + msgs := make([][]byte, 0) + + for size := 1; size <= maxSizeToUnit; size *= 2 { + msgs = append(msgs, bytes.Repeat([]byte{'x'}, size*unit)) // create a message of the calculated size } - return msgs } From 00c48d2448c1d8091236bd9f9ac5dd11e5483229 Mon Sep 17 00:00:00 2001 From: sanaz Date: Mon, 22 Jan 2024 17:53:54 -0800 Subject: [PATCH 31/34] fixes a linter issue --- p2p/conn/connection_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/conn/connection_test.go b/p2p/conn/connection_test.go index bb6be10a17..8e61256089 100644 --- a/p2p/conn/connection_test.go +++ b/p2p/conn/connection_test.go @@ -1053,7 +1053,7 @@ func tcpNetPipe() (net.Conn, net.Conn) { // The size of each message doubles, starting from 1 up to maxSizeBytes. // unit is expected to be a power of 2. func generateExponentialSizedMessages(maxSizeBytes int, unit int) [][]byte { - maxSizeToUnit := int(math.Ceil(float64(maxSizeBytes / unit))) + maxSizeToUnit := maxSizeBytes / unit msgs := make([][]byte, 0) for size := 1; size <= maxSizeToUnit; size *= 2 { From 85974953d7679449389f5da8e1c3f3c6bea8dac9 Mon Sep 17 00:00:00 2001 From: sanaz Date: Mon, 22 Jan 2024 18:05:34 -0800 Subject: [PATCH 32/34] removes an undo comment --- p2p/conn/connection_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/p2p/conn/connection_test.go b/p2p/conn/connection_test.go index 8e61256089..e3f6b2dcf0 100644 --- a/p2p/conn/connection_test.go +++ b/p2p/conn/connection_test.go @@ -1095,8 +1095,8 @@ func runBenchmarkTest(b *testing.B, tt testCase) { } cnfg := DefaultMConnConfig() - cnfg.SendRate = tt.sendRate // 500 KB/s - cnfg.RecvRate = tt.recRate // 500 KB/s + cnfg.SendRate = tt.sendRate + cnfg.RecvRate = tt.recRate chDescs := []*ChannelDescriptor{{ID: tt.chID, Priority: 1, SendQueueCapacity: tt.sendQueueCapacity}} clientMconn := NewMConnectionWithConfig(client, chDescs, From 71ba8507f230c0d0714eeb5b0ff43f600ee4dfdc Mon Sep 17 00:00:00 2001 From: sanaz Date: Mon, 22 Jan 2024 18:12:24 -0800 Subject: [PATCH 33/34] switches the order of benchmarks --- p2p/conn/connection_test.go | 74 ++++++++++++++++++------------------- 1 file changed, 37 insertions(+), 37 deletions(-) diff --git a/p2p/conn/connection_test.go b/p2p/conn/connection_test.go index e3f6b2dcf0..7c2a231f21 100644 --- a/p2p/conn/connection_test.go +++ b/p2p/conn/connection_test.go @@ -1142,43 +1142,6 @@ func runBenchmarkTest(b *testing.B, tt testCase) { }) } -func BenchmarkMConnection_ScalingPayloadSizes_LowSendRate(b *testing.B) { - // This benchmark test builds upon the previous one i.e., - // BenchmarkMConnection_ScalingPayloadSizes_HighSendRate - // by setting the send/and receive rates lower than the message load. - // Test cases involve sending the same load of messages but with different message sizes. - // Since the message load and bandwidth are consistent across all test cases, - // they are expected to complete in the same amount of time. i.e., - //totalLoad/sendRate. - - maxSize := 32 * 1024 // 32KB - msgs := generateExponentialSizedMessages(maxSize, 1024) - totalLoad := float64(maxSize) - chID := byte(0x01) - // create test cases for each message size - var testCases = make([]testCase, len(msgs)) - for i, msg := range msgs { - msgSize := len(msg) - totalMsg := int(math.Ceil(totalLoad / float64(msgSize))) - testCases[i] = testCase{ - name: fmt.Sprintf("msgSize = %d KB", msgSize/1024), - msgSize: msgSize, - msg: msg, - totalMsg: totalMsg, - messagingRate: time.Millisecond, - totalDuration: 1 * time.Minute, - sendQueueCapacity: 100, - sendRate: 4 * 1024, - recRate: 4 * 1024, - chID: chID, - } - } - - for _, tt := range testCases { - runBenchmarkTest(b, tt) - } -} - func BenchmarkMConnection_ScalingPayloadSizes_HighSendRate(b *testing.B) { // One aspect that could impact the performance of MConnection and the // transmission rate is the size of the messages sent over the network, @@ -1221,3 +1184,40 @@ func BenchmarkMConnection_ScalingPayloadSizes_HighSendRate(b *testing.B) { runBenchmarkTest(b, tt) } } + +func BenchmarkMConnection_ScalingPayloadSizes_LowSendRate(b *testing.B) { + // This benchmark test builds upon the previous one i.e., + // BenchmarkMConnection_ScalingPayloadSizes_HighSendRate + // by setting the send/and receive rates lower than the message load. + // Test cases involve sending the same load of messages but with different message sizes. + // Since the message load and bandwidth are consistent across all test cases, + // they are expected to complete in the same amount of time. i.e., + //totalLoad/sendRate. + + maxSize := 32 * 1024 // 32KB + msgs := generateExponentialSizedMessages(maxSize, 1024) + totalLoad := float64(maxSize) + chID := byte(0x01) + // create test cases for each message size + var testCases = make([]testCase, len(msgs)) + for i, msg := range msgs { + msgSize := len(msg) + totalMsg := int(math.Ceil(totalLoad / float64(msgSize))) + testCases[i] = testCase{ + name: fmt.Sprintf("msgSize = %d KB", msgSize/1024), + msgSize: msgSize, + msg: msg, + totalMsg: totalMsg, + messagingRate: time.Millisecond, + totalDuration: 1 * time.Minute, + sendQueueCapacity: 100, + sendRate: 4 * 1024, + recRate: 4 * 1024, + chID: chID, + } + } + + for _, tt := range testCases { + runBenchmarkTest(b, tt) + } +} From 91265109e6225e34a9f2955d6f52c12c9c2b4f0e Mon Sep 17 00:00:00 2001 From: sanaz Date: Tue, 23 Jan 2024 09:48:01 -0800 Subject: [PATCH 34/34] addresses comments --- p2p/conn/connection_test.go | 72 +++++++++++++++++++------------------ 1 file changed, 38 insertions(+), 34 deletions(-) diff --git a/p2p/conn/connection_test.go b/p2p/conn/connection_test.go index 7c2a231f21..23cdb29ee9 100644 --- a/p2p/conn/connection_test.go +++ b/p2p/conn/connection_test.go @@ -771,6 +771,11 @@ func stopAll(t *testing.T, stoppers ...stopper) func() { } } +const ( + kibibyte = 1024 + mebibyte = 1024 * 1024 +) + // generateAndSendMessages sends a sequence of messages to the specified multiplex connection `mc`. // Each message has the given size and is sent at the specified rate // `messagingRate`. This process continues for the duration `totalDuration` or @@ -781,7 +786,6 @@ func generateAndSendMessages(mc *MConnection, totalDuration time.Duration, totalNum int, msgSize int, msgContnet []byte, chID byte) { var msg []byte - // all messages have an identical content if msgContnet == nil { msg = bytes.Repeat([]byte{'x'}, msgSize) } else { @@ -853,13 +857,13 @@ func BenchmarkMConnection(b *testing.B) { name: "queue capacity = 1, " + "total load = 50 KB, " + "msg rate = send rate", - msgSize: 1 * 1024, + msgSize: 1 * kibibyte, totalMsg: 1 * 50, sendQueueCapacity: 1, messagingRate: 20 * time.Millisecond, totalDuration: 1 * time.Minute, - sendRate: 50 * 1024, - recRate: 50 * 1024, + sendRate: 50 * kibibyte, + recRate: 50 * kibibyte, }, { // testcase 2 @@ -870,13 +874,13 @@ func BenchmarkMConnection(b *testing.B) { name: "queue capacity = 50, " + "total load = 50 KB, " + "traffic rate = send rate", - msgSize: 1 * 1024, + msgSize: 1 * kibibyte, totalMsg: 1 * 50, sendQueueCapacity: 50, messagingRate: 20 * time.Millisecond, totalDuration: 1 * time.Minute, - sendRate: 50 * 1024, - recRate: 50 * 1024, + sendRate: 50 * kibibyte, + recRate: 50 * kibibyte, }, { // testcase 3 @@ -887,13 +891,13 @@ func BenchmarkMConnection(b *testing.B) { name: "queue capacity = 100, " + "total load = 50 KB, " + "traffic rate = send rate", - msgSize: 1 * 1024, + msgSize: 1 * kibibyte, totalMsg: 1 * 50, sendQueueCapacity: 100, messagingRate: 20 * time.Millisecond, totalDuration: 1 * time.Minute, - sendRate: 50 * 1024, - recRate: 50 * 1024, + sendRate: 50 * kibibyte, + recRate: 50 * kibibyte, }, { // testcase 4 @@ -902,13 +906,13 @@ func BenchmarkMConnection(b *testing.B) { name: "queue capacity = 100, " + "total load = 2 * 50 KB, " + "traffic rate = send rate", - msgSize: 1 * 1024, + msgSize: 1 * kibibyte, totalMsg: 2 * 50, sendQueueCapacity: 100, messagingRate: 20 * time.Millisecond, totalDuration: 1 * time.Minute, - sendRate: 50 * 1024, - recRate: 50 * 1024, + sendRate: 50 * kibibyte, + recRate: 50 * kibibyte, }, { // testcase 5 @@ -918,13 +922,13 @@ func BenchmarkMConnection(b *testing.B) { name: "queue capacity = 100, " + "total load = 8 * 50 KB, " + "traffic rate = send rate", - msgSize: 1 * 1024, + msgSize: 1 * kibibyte, totalMsg: 8 * 50, sendQueueCapacity: 100, messagingRate: 20 * time.Millisecond, totalDuration: 1 * time.Minute, - sendRate: 50 * 1024, - recRate: 50 * 1024, + sendRate: 50 * kibibyte, + recRate: 50 * kibibyte, }, { // testcase 6 @@ -934,13 +938,13 @@ func BenchmarkMConnection(b *testing.B) { name: "queue capacity = 100, " + "total load = 8 * 50 KB, " + "traffic rate = 2 * send rate", - msgSize: 1 * 1024, + msgSize: 1 * kibibyte, totalMsg: 8 * 50, sendQueueCapacity: 100, messagingRate: 10 * time.Millisecond, totalDuration: 1 * time.Minute, - sendRate: 50 * 1024, - recRate: 50 * 1024, + sendRate: 50 * kibibyte, + recRate: 50 * kibibyte, }, { // testcase 7 @@ -950,13 +954,13 @@ func BenchmarkMConnection(b *testing.B) { name: "queue capacity = 100, " + "total load = 8 * 50 KB, " + "traffic rate = 10 * send rate", - msgSize: 1 * 1024, + msgSize: 1 * kibibyte, totalMsg: 8 * 50, sendQueueCapacity: 100, messagingRate: 2 * time.Millisecond, totalDuration: 1 * time.Minute, - sendRate: 50 * 1024, - recRate: 50 * 1024, + sendRate: 50 * kibibyte, + recRate: 50 * kibibyte, }, } @@ -1149,9 +1153,9 @@ func BenchmarkMConnection_ScalingPayloadSizes_HighSendRate(b *testing.B) { // messages are sent in packets of maximum size MConnection. // MaxPacketMsgPayloadSize). // The test cases in this benchmark involve sending messages with sizes - // ranging exponentially from 1KB to 4096KB ( - // the max value of 4096KB is inspired by the largest possible PFB in a - // Celestia block with 128*18 number of 512-byte shares) + // ranging exponentially from 1KB to 8192KB ( + // the max value of 8192KB is inspired by the largest possible PFB in a + // Celestia block with 128*128 number of 512-byte shares) // The bandwidth is set significantly higher than the message load to ensure // it does not become a limiting factor. // All test cases are expected to complete in less than one second, @@ -1160,22 +1164,22 @@ func BenchmarkMConnection_ScalingPayloadSizes_HighSendRate(b *testing.B) { squareSize := 128 // number of shares in a row/column shareSize := 512 // bytes maxSize := squareSize * squareSize * shareSize // bytes - msgs := generateExponentialSizedMessages(maxSize, 1024) + msgs := generateExponentialSizedMessages(maxSize, kibibyte) chID := byte(0x01) // create test cases for each message size var testCases = make([]testCase, len(msgs)) for i, msg := range msgs { testCases[i] = testCase{ - name: fmt.Sprintf("msgSize = %d KB", len(msg)/1024), + name: fmt.Sprintf("msgSize = %d KB", len(msg)/kibibyte), msgSize: len(msg), msg: msg, totalMsg: 10, messagingRate: time.Millisecond, totalDuration: 1 * time.Minute, sendQueueCapacity: 100, - sendRate: 512 * 1024 * 1024, - recRate: 512 * 1024 * 1024, + sendRate: 512 * mebibyte, + recRate: 512 * mebibyte, chID: chID, } } @@ -1194,8 +1198,8 @@ func BenchmarkMConnection_ScalingPayloadSizes_LowSendRate(b *testing.B) { // they are expected to complete in the same amount of time. i.e., //totalLoad/sendRate. - maxSize := 32 * 1024 // 32KB - msgs := generateExponentialSizedMessages(maxSize, 1024) + maxSize := 32 * kibibyte // 32KB + msgs := generateExponentialSizedMessages(maxSize, kibibyte) totalLoad := float64(maxSize) chID := byte(0x01) // create test cases for each message size @@ -1204,15 +1208,15 @@ func BenchmarkMConnection_ScalingPayloadSizes_LowSendRate(b *testing.B) { msgSize := len(msg) totalMsg := int(math.Ceil(totalLoad / float64(msgSize))) testCases[i] = testCase{ - name: fmt.Sprintf("msgSize = %d KB", msgSize/1024), + name: fmt.Sprintf("msgSize = %d KB", msgSize/kibibyte), msgSize: msgSize, msg: msg, totalMsg: totalMsg, messagingRate: time.Millisecond, totalDuration: 1 * time.Minute, sendQueueCapacity: 100, - sendRate: 4 * 1024, - recRate: 4 * 1024, + sendRate: 4 * kibibyte, + recRate: 4 * kibibyte, chID: chID, } }