diff --git a/p2p/conn/connection_bench_test.go b/p2p/conn/connection_bench_test.go index c8ab74bdfe..5fa6a62942 100644 --- a/p2p/conn/connection_bench_test.go +++ b/p2p/conn/connection_bench_test.go @@ -91,6 +91,8 @@ func sendMessages(mc *MConnection, case <-ticker.C: // generate message if mc.Send(chIDs[i], msgs[i]) { + log.TestingLogger().Info("Sent message ", i, " on channel ", + chIDs[i]) i++ if i >= total { log.TestingLogger().Info("Completed the message generation as the" + @@ -717,3 +719,97 @@ func TestMConnection_Message_Order_ChannelID(t *testing.T) { require.Equal(t, chIDs, recvChIds) // assert that the order of received messages is the same as the order of sent messages } + +func TestMConnection_Failing_Large_Messages(t *testing.T) { + // This test evaluates how MConnection handles messages exceeding channel + // ID's receive message capacity i.e., `RecvMessageCapacity`. + // It involves two connections, each with two channels: Channel ID 1 ( + // capacity 1024 bytes) and Channel ID 2 (capacity 1023 bytes). + // All the other channel ID's and MConnection's configurations are set high + // enough to not be a limiting factor. + // A 1KB message is sent over the first and second channels in succession. + // Message on Channel ID 1 (capacity equal to message size) is received, + // while the message on Channel ID 2 (capacity less than message size) is dropped. + + totalMsgs := 2 + msgSize := 1 * kibibyte + sendRate := 50 * kibibyte + recRate := 50 * kibibyte + chDesc := []*ChannelDescriptor{ + {ID: 0x01, Priority: 1, SendQueueCapacity: 50, + RecvMessageCapacity: msgSize, + RecvBufferCapacity: defaultRecvBufferCapacity}, + {ID: 0x02, Priority: 1, SendQueueCapacity: 50, + RecvMessageCapacity: msgSize - 1, + RecvBufferCapacity: defaultRecvBufferCapacity}, + } + + // prepare messages and channel IDs + // 1 message on channel ID 1 and 1 message on channel ID 2 + msgs := make([][]byte, totalMsgs) + chIDs := make([]byte, totalMsgs) + msgs[0] = bytes.Repeat([]byte{'x'}, msgSize) + chIDs[0] = 0x01 + msgs[1] = bytes.Repeat([]byte{'y'}, msgSize) + chIDs[1] = 0x02 + + // set up two networked connections + // server, client := NetPipe() // can alternatively use this and comment out the line below + server, client := tcpNetPipe() + defer server.Close() + defer client.Close() + + // prepare callback to receive messages + allReceived := make(chan bool) + recvChIds := make(chan byte, totalMsgs) + onReceive := func(chID byte, msgBytes []byte) { + recvChIds <- chID + if len(recvChIds) >= totalMsgs { + allReceived <- true + } + } + + cnfg := DefaultMConnConfig() + cnfg.SendRate = int64(sendRate) + cnfg.RecvRate = int64(recRate) + + // mount the channel descriptors to the connections + clientMconn := NewMConnectionWithConfig(client, chDesc, + func(chID byte, msgBytes []byte) {}, + func(r interface{}) {}, + cnfg) + serverMconn := NewMConnectionWithConfig(server, chDesc, + onReceive, + func(r interface{}) {}, + cnfg) + clientMconn.SetLogger(log.TestingLogger()) + serverMconn.SetLogger(log.TestingLogger()) + + err := clientMconn.Start() + require.Nil(t, err) + defer func() { + _ = clientMconn.Stop() + }() + err = serverMconn.Start() + require.Nil(t, err) + defer func() { + _ = serverMconn.Stop() + }() + + // start sending messages + go sendMessages(clientMconn, + time.Millisecond, + 1*time.Second, + msgs, chIDs) + + // wait for messages to be received + select { + case <-allReceived: + require.Fail(t, "All messages should not have been received") // the message sent + // on channel ID 2 should have been dropped + case <-time.After(500 * time.Millisecond): + require.Equal(t, 1, len(recvChIds)) + require.Equal(t, chIDs[0], <-recvChIds) // the first message should be received + require.True(t, !serverMconn.IsRunning()) // the serverMconn should have stopped due to the error + } +}