Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: Benchmark MConnection with large size messages #1179

Merged
merged 36 commits into from
Jan 24, 2024
Merged
Changes from 34 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
e67a1bd
adds tests
staheri14 Jan 16, 2024
81f8918
adds logs
staheri14 Jan 16, 2024
02df8e7
includes a lot of updates on the message generation load control and …
staheri14 Jan 17, 2024
2aee45b
updates the test setting and adds comments
staheri14 Jan 17, 2024
e1c3359
edits the messaging rate
staheri14 Jan 17, 2024
695b49f
removes a debugging remnants
staheri14 Jan 17, 2024
58e7beb
removes receiver callback functions from the client conn
staheri14 Jan 17, 2024
e1dd65a
includes more test cases
staheri14 Jan 17, 2024
593528a
adds more documentation pertaining test scenarios
staheri14 Jan 17, 2024
46359da
removes onError func definition
staheri14 Jan 17, 2024
c9a2e2d
removes undue line break
staheri14 Jan 17, 2024
076877d
removes the code related to profiling
staheri14 Jan 17, 2024
9230692
minor edit
staheri14 Jan 17, 2024
86dad6a
removes debugging logs
staheri14 Jan 18, 2024
9fb1853
cleans up debugging logs related to channel send queue size
staheri14 Jan 18, 2024
3435ad4
brings back the empty line
staheri14 Jan 18, 2024
568bee4
deletes empty line
staheri14 Jan 18, 2024
58fc3eb
shortens descriptions
staheri14 Jan 18, 2024
c40f4d8
clarifies testcases 5-7
staheri14 Jan 18, 2024
4b59722
linter fix
staheri14 Jan 18, 2024
cfd525a
fixed send queue capacity in testcases 3-7
staheri14 Jan 18, 2024
4034ebe
updates tcpNetPipe func description
staheri14 Jan 18, 2024
9ff6fad
fixes a func doc
staheri14 Jan 18, 2024
78e4d75
adds two more benchmarks
staheri14 Jan 19, 2024
ce4efe3
Merge remote-tracking branch 'origin/main' into sanaz/mconnection-lar…
staheri14 Jan 19, 2024
280fc25
makes some cleanup
staheri14 Jan 19, 2024
6f24c5f
allocates space for the testcases
staheri14 Jan 19, 2024
aaae3d0
refactors the bench tests to consolidate duplicate parts
staheri14 Jan 23, 2024
570defb
fixes boundary checks
staheri14 Jan 23, 2024
2f16bc9
reorganizes the code
staheri14 Jan 23, 2024
ec0501f
refactors the generateExponentialSizedMessages func
staheri14 Jan 23, 2024
00c48d2
fixes a linter issue
staheri14 Jan 23, 2024
8597495
removes an undo comment
staheri14 Jan 23, 2024
71ba850
switches the order of benchmarks
staheri14 Jan 23, 2024
9126510
addresses comments
staheri14 Jan 23, 2024
bdb800f
Merge remote-tracking branch 'origin/main' into sanaz/mconnection-lar…
staheri14 Jan 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
197 changes: 189 additions & 8 deletions p2p/conn/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -771,16 +771,22 @@ func stopAll(t *testing.T, stoppers ...stopper) func() {
}
}

// generateMessages sends a sequence of messages to the specified multiplex connection `mc`.
// generateAndSendMessages sends a sequence of messages to the specified multiplex connection `mc`.
// Each message has the given size and is sent at the specified rate
// `messagingRate`. This process continues for the duration `totalDuration` or
// until `totalNum` messages are sent. If `totalNum` is negative,
// messaging persists for the entire `totalDuration`.
func generateMessages(mc *MConnection,
func generateAndSendMessages(mc *MConnection,
messagingRate time.Duration,
totalDuration time.Duration, totalNum int, msgSize int, chID byte) {
totalDuration time.Duration, totalNum int, msgSize int,
msgContnet []byte, chID byte) {
var msg []byte
// all messages have an identical content
staheri14 marked this conversation as resolved.
Show resolved Hide resolved
msg := bytes.Repeat([]byte{'x'}, msgSize)
if msgContnet == nil {
msg = bytes.Repeat([]byte{'x'}, msgSize)
} else {
msg = msgContnet
}

// message generation interval ticker
ticker := time.NewTicker(messagingRate)
Expand Down Expand Up @@ -974,8 +980,8 @@ func BenchmarkMConnection(b *testing.B) {
}

cnfg := DefaultMConnConfig()
cnfg.SendRate = 50 * 1024 // 500 KB/s
cnfg.RecvRate = 50 * 1024 // 500 KB/s
cnfg.SendRate = tt.sendRate
cnfg.RecvRate = tt.recRate
chDescs := []*ChannelDescriptor{{ID: chID, Priority: 1,
SendQueueCapacity: tt.sendQueueCapacity}}
clientMconn := NewMConnectionWithConfig(client, chDescs,
Expand Down Expand Up @@ -1006,11 +1012,12 @@ func BenchmarkMConnection(b *testing.B) {
// taken to set up the connections
b.StartTimer()
// start generating messages, it is a blocking call
generateMessages(clientMconn,
generateAndSendMessages(clientMconn,
tt.messagingRate,
tt.totalDuration,
tt.totalMsg,
tt.msgSize, chID)
tt.msgSize,
nil, chID)

// wait for all messages to be received
<-allReceived
Expand Down Expand Up @@ -1040,3 +1047,177 @@ func tcpNetPipe() (net.Conn, net.Conn) {

return conn2, conn1
}

// generateExponentialSizedMessages creates and returns a series of messages
// with sizes (in the specified unit) increasing exponentially.
// The size of each message doubles, starting from 1 up to maxSizeBytes.
// unit is expected to be a power of 2.
func generateExponentialSizedMessages(maxSizeBytes int, unit int) [][]byte {
maxSizeToUnit := maxSizeBytes / unit
msgs := make([][]byte, 0)

for size := 1; size <= maxSizeToUnit; size *= 2 {
msgs = append(msgs, bytes.Repeat([]byte{'x'}, size*unit)) // create a message of the calculated size
}
return msgs
}

type testCase struct {
name string
msgSize int // size of each message in bytes
msg []byte // message to be sent
totalMsg int // total number of messages to be sent
messagingRate time.Duration // rate at which messages are sent
totalDuration time.Duration // total duration for which messages are sent
sendQueueCapacity int // send queue capacity i.e., the number of messages that can be buffered
sendRate int64 // send rate in bytes per second
recRate int64 // receive rate in bytes per second
chID byte // channel ID
}

func runBenchmarkTest(b *testing.B, tt testCase) {
b.Run(tt.name, func(b *testing.B) {
for n := 0; n < b.N; n++ {
// set up two networked connections
// server, client := NetPipe() // can alternatively use this and comment out the line below
server, client := tcpNetPipe()
defer server.Close()
defer client.Close()

// prepare callback to receive messages
allReceived := make(chan bool)
receivedLoad := 0 // number of messages received
onReceive := func(chID byte, msgBytes []byte) {
receivedLoad++
if receivedLoad >= tt.totalMsg && tt.totalMsg > 0 {
allReceived <- true
}
}

cnfg := DefaultMConnConfig()
cnfg.SendRate = tt.sendRate
cnfg.RecvRate = tt.recRate
chDescs := []*ChannelDescriptor{{ID: tt.chID, Priority: 1,
SendQueueCapacity: tt.sendQueueCapacity}}
clientMconn := NewMConnectionWithConfig(client, chDescs,
func(chID byte, msgBytes []byte) {},
func(r interface{}) {},
cnfg)
serverChDescs := []*ChannelDescriptor{{ID: tt.chID, Priority: 1,
SendQueueCapacity: tt.sendQueueCapacity}}
serverMconn := NewMConnectionWithConfig(server, serverChDescs,
onReceive,
func(r interface{}) {},
cnfg)
clientMconn.SetLogger(log.TestingLogger())
serverMconn.SetLogger(log.TestingLogger())

err := clientMconn.Start()
require.Nil(b, err)
defer func() {
_ = clientMconn.Stop()
}()
err = serverMconn.Start()
require.Nil(b, err)
defer func() {
_ = serverMconn.Stop()
}()

// start measuring the time from here to exclude the time
// taken to set up the connections
b.StartTimer()
// start generating messages, it is a blocking call
generateAndSendMessages(clientMconn,
tt.messagingRate,
tt.totalDuration,
tt.totalMsg,
tt.msgSize,
tt.msg,
tt.chID)

// wait for all messages to be received
<-allReceived
b.StopTimer()
}
})
}

func BenchmarkMConnection_ScalingPayloadSizes_HighSendRate(b *testing.B) {
// One aspect that could impact the performance of MConnection and the
// transmission rate is the size of the messages sent over the network,
// especially when they exceed the MConnection.MaxPacketMsgPayloadSize (
// messages are sent in packets of maximum size MConnection.
// MaxPacketMsgPayloadSize).
// The test cases in this benchmark involve sending messages with sizes
// ranging exponentially from 1KB to 4096KB (
// the max value of 4096KB is inspired by the largest possible PFB in a
staheri14 marked this conversation as resolved.
Show resolved Hide resolved
// Celestia block with 128*18 number of 512-byte shares)
staheri14 marked this conversation as resolved.
Show resolved Hide resolved
// The bandwidth is set significantly higher than the message load to ensure
// it does not become a limiting factor.
// All test cases are expected to complete in less than one second,
// indicating a healthy performance.

squareSize := 128 // number of shares in a row/column
shareSize := 512 // bytes
maxSize := squareSize * squareSize * shareSize // bytes
msgs := generateExponentialSizedMessages(maxSize, 1024)
chID := byte(0x01)

// create test cases for each message size
var testCases = make([]testCase, len(msgs))
for i, msg := range msgs {
testCases[i] = testCase{
name: fmt.Sprintf("msgSize = %d KB", len(msg)/1024),
msgSize: len(msg),
msg: msg,
totalMsg: 10,
messagingRate: time.Millisecond,
totalDuration: 1 * time.Minute,
sendQueueCapacity: 100,
sendRate: 512 * 1024 * 1024,
recRate: 512 * 1024 * 1024,
chID: chID,
}
}

for _, tt := range testCases {
runBenchmarkTest(b, tt)
}
}

func BenchmarkMConnection_ScalingPayloadSizes_LowSendRate(b *testing.B) {
// This benchmark test builds upon the previous one i.e.,
// BenchmarkMConnection_ScalingPayloadSizes_HighSendRate
// by setting the send/and receive rates lower than the message load.
// Test cases involve sending the same load of messages but with different message sizes.
// Since the message load and bandwidth are consistent across all test cases,
// they are expected to complete in the same amount of time. i.e.,
//totalLoad/sendRate.

maxSize := 32 * 1024 // 32KB
staheri14 marked this conversation as resolved.
Show resolved Hide resolved
msgs := generateExponentialSizedMessages(maxSize, 1024)
totalLoad := float64(maxSize)
chID := byte(0x01)
// create test cases for each message size
var testCases = make([]testCase, len(msgs))
for i, msg := range msgs {
msgSize := len(msg)
totalMsg := int(math.Ceil(totalLoad / float64(msgSize)))
testCases[i] = testCase{
name: fmt.Sprintf("msgSize = %d KB", msgSize/1024),
msgSize: msgSize,
msg: msg,
totalMsg: totalMsg,
messagingRate: time.Millisecond,
totalDuration: 1 * time.Minute,
sendQueueCapacity: 100,
sendRate: 4 * 1024,
recRate: 4 * 1024,
chID: chID,
}
}

for _, tt := range testCases {
runBenchmarkTest(b, tt)
}
}
Loading