diff --git a/test/test_buffer.cpp b/test/test_buffer.cpp index 851fe1167..896303d59 100644 --- a/test/test_buffer.cpp +++ b/test/test_buffer.cpp @@ -2,6 +2,7 @@ #include #include "gtest/gtest.h" #include "buffer.h" +#include "buffer_rcv.h" using namespace srt; using namespace std; @@ -28,7 +29,14 @@ class CRcvBufferReadMsg m_unit_queue = unique_ptr(new CUnitQueue); ASSERT_NE(m_unit_queue.get(), nullptr); m_unit_queue->init(m_buff_size_pkts, 1500, AF_INET); + +#if ENABLE_NEW_RCVBUFFER + const bool enable_msg_api = true; + const bool enable_peer_rexmit = true; + m_rcv_buffer = unique_ptr(new CRcvBufferNew(m_init_seqno, m_buff_size_pkts, m_unit_queue.get(), enable_peer_rexmit, enable_msg_api)); +#else m_rcv_buffer = unique_ptr(new CRcvBuffer(m_unit_queue.get(), m_buff_size_pkts)); +#endif ASSERT_NE(m_rcv_buffer.get(), nullptr); } @@ -36,8 +44,8 @@ class CRcvBufferReadMsg { // Code here will be called just after the test completes. // OK to throw exceptions from here if needed. - m_unit_queue.reset(); m_rcv_buffer.reset(); + m_unit_queue.reset(); } public: @@ -68,8 +76,12 @@ class CRcvBufferReadMsg EXPECT_TRUE(packet.getMsgOrderFlag()); } +#if ENABLE_NEW_RCVBUFFER + return m_rcv_buffer->insert(unit); +#else const int offset = CSeqNo::seqoff(m_first_unack_seqno, seqno); return m_rcv_buffer->addData(unit, offset); +#endif } /// @returns 0 on success, the result of rcv_buffer::insert(..) once it failed @@ -104,31 +116,54 @@ class CRcvBufferReadMsg int ackPackets(int num_pkts) { m_first_unack_seqno = CSeqNo::incseq(m_first_unack_seqno, num_pkts); +#if ENABLE_NEW_RCVBUFFER + return 0; +#else return m_rcv_buffer->ackData(num_pkts); +#endif } int getAvailBufferSize() { +#if ENABLE_NEW_RCVBUFFER + return m_rcv_buffer->getAvailSize(m_first_unack_seqno); +#else return m_rcv_buffer->getAvailBufSize(); +#endif } int readMessage(char* data, size_t len) { +#if ENABLE_NEW_RCVBUFFER + return m_rcv_buffer->readMessage(data, len); +#else return m_rcv_buffer->readMsg(data, len); +#endif } bool hasAvailablePackets() { +#if ENABLE_NEW_RCVBUFFER + return m_rcv_buffer->hasAvailablePackets(); +#else return m_rcv_buffer->isRcvDataAvailable(); +#endif } protected: unique_ptr m_unit_queue; +#if ENABLE_NEW_RCVBUFFER + unique_ptr m_rcv_buffer; +#else unique_ptr m_rcv_buffer; +#endif const int m_buff_size_pkts = 16; const int m_init_seqno = 1000; int m_first_unack_seqno = m_init_seqno; static const size_t m_payload_sz = 1456; + + const sync::steady_clock::time_point m_tsbpd_base = sync::steady_clock::now(); // now() - HS.timestamp, microseconds + const sync::steady_clock::duration m_delay = sync::milliseconds_from(200); }; // Check the available size of the receiver buffer. @@ -137,6 +172,19 @@ TEST_F(CRcvBufferReadMsg, Create) EXPECT_EQ(getAvailBufferSize(), m_buff_size_pkts - 1); } +// Check that destroying the buffer also frees memory units. +TEST_F(CRcvBufferReadMsg, Destroy) +{ + EXPECT_EQ(getAvailBufferSize(), m_buff_size_pkts - 1); + // Add a number of units (packets) to the buffer + // equal to the buffer size in packets + for (int i = 0; i < getAvailBufferSize(); ++i) + EXPECT_EQ(addMessage(1, CSeqNo::incseq(m_init_seqno, i)), 0); + + m_rcv_buffer.reset(); + EXPECT_EQ(m_unit_queue->size(), m_unit_queue->capacity()); +} + // Fill the buffer full, and check adding more data results in an error. TEST_F(CRcvBufferReadMsg, FullBuffer) { @@ -163,9 +211,11 @@ TEST_F(CRcvBufferReadMsg, FullBuffer) EXPECT_TRUE(size_t(res) == m_payload_sz); EXPECT_TRUE(verifyPayload(buff.data(), res, CSeqNo::incseq(m_init_seqno, i))); } + + EXPECT_EQ(m_unit_queue->size(), m_unit_queue->capacity()); } -// BUG in the new RCV buffer!!! +// BUG in the old RCV buffer!!! // In this test case a packet is added to receiver buffer with offset 1, // thus leaving offset 0 with an empty pointer. // The buffer says it is not empty, and the data is available @@ -187,42 +237,112 @@ TEST_F(CRcvBufferReadMsg, OnePacketGap) EXPECT_EQ(res, 0); // BUG. Acknowledging an empty position must not result in a read-readiness. + // TODO: Actually we should not acknowledge, but must drop instead. ackPackets(1); - // Wrong behavior (BUG) +#if ENABLE_NEW_RCVBUFFER // Expected behavior + EXPECT_FALSE(hasAvailablePackets()); + EXPECT_FALSE(rcv_buffer.isRcvDataReady()); + + const auto next_packet = m_rcv_buffer->getFirstValidPacketInfo(); + EXPECT_EQ(next_packet.seqno, m_init_seqno + 1); +#else // Wrong behavior (BUG) EXPECT_TRUE(hasAvailablePackets()); EXPECT_TRUE(rcv_buffer.isRcvDataReady()); +#endif EXPECT_EQ(getAvailBufferSize(), m_buff_size_pkts - 2); +#if ENABLE_NEW_RCVBUFFER + // The new buffer will return 0 as reading is not available. + res = rcv_buffer.readBuffer(buff.data(), buff.size()); + EXPECT_EQ(res, 0); +#else cerr << "Expecting IPE from readBuffer(..): \n"; res = rcv_buffer.readBuffer(buff.data(), buff.size()); EXPECT_EQ(res, -1); +#endif res = readMessage(buff.data(), buff.size()); EXPECT_EQ(res, 0); + +#if ENABLE_NEW_RCVBUFFER + // Add a missing packet (can't add before an acknowledged position in the old buffer). + EXPECT_EQ(addMessage(1, m_init_seqno), 0); + + for (int pktno = 0; pktno < 2; ++pktno) + { + const size_t msg_bytelen = m_payload_sz; + EXPECT_TRUE(rcv_buffer.isRcvDataReady()); + EXPECT_EQ(readMessage(buff.data(), buff.size()), msg_bytelen); + EXPECT_TRUE(verifyPayload(buff.data(), msg_bytelen, CSeqNo::incseq(m_init_seqno, pktno))); + } + EXPECT_EQ(m_unit_queue->size(), m_unit_queue->capacity()); +#endif + + // Further read is not possible + EXPECT_FALSE(rcv_buffer.isRcvDataReady()); +} + +/// One packet is added to the buffer after 1-packet gap. Should be read only after ACK. +/// 1. insert (1) +/// | +/// +---+---+ ---+---+---+---+ +---+ +/// | 0 | 1 | 0 | 0 | 0 | 0 |...| 0 | m_pUnit[] +/// +---+---+ ---+---+---+---+ +---+ +/// 2. drop (0) +/// 2. read (1) +/// +TEST_F(CRcvBufferReadMsg, OnePacketGapDrop) +{ + // Add one packet message to to the buffer + // with a gap of one packet. + EXPECT_EQ(addMessage(1, CSeqNo::incseq(m_init_seqno)), 0); + auto& rcv_buffer = *m_rcv_buffer.get(); + EXPECT_FALSE(hasAvailablePackets()); + EXPECT_FALSE(rcv_buffer.isRcvDataReady()); +#if ENABLE_NEW_RCVBUFFER + rcv_buffer.dropUpTo(CSeqNo::incseq(m_init_seqno)); +#else + rcv_buffer.dropData(1); +#endif + + EXPECT_TRUE(hasAvailablePackets()); + EXPECT_TRUE(rcv_buffer.isRcvDataReady()); + array buff; + EXPECT_TRUE(readMessage(buff.data(), buff.size()) == m_payload_sz); + EXPECT_TRUE(verifyPayload(buff.data(), m_payload_sz, CSeqNo::incseq(m_init_seqno))); + EXPECT_EQ(m_unit_queue->size(), m_unit_queue->capacity()); } // Add one packet to the buffer and read it once it is acknowledged. // Confirm the data read is valid. +// Don't allow to add packet with the same sequence number. TEST_F(CRcvBufferReadMsg, OnePacket) { const size_t msg_pkts = 1; // Adding one message without acknowledging - addMessage(msg_pkts, m_init_seqno, false); + EXPECT_EQ(addMessage(msg_pkts, m_init_seqno, false), 0); + // Adding a packet into the same position must return an error. + EXPECT_EQ(addMessage(msg_pkts, m_init_seqno, false), -1); const size_t msg_bytelen = msg_pkts * m_payload_sz; array buff; + // The new receiver buffer allows reading without ACK. +#if !ENABLE_NEW_RCVBUFFER EXPECT_FALSE(hasAvailablePackets()); + const int res1 = readMessage(buff.data(), buff.size()); EXPECT_EQ(res1, 0); // Full ACK ackPackets(msg_pkts); +#endif EXPECT_TRUE(hasAvailablePackets()); const int res2 = readMessage(buff.data(), buff.size()); EXPECT_EQ(res2, msg_bytelen); EXPECT_TRUE(verifyPayload(buff.data(), res2, m_init_seqno)); + EXPECT_EQ(m_unit_queue->size(), m_unit_queue->capacity()); } // Add ten packets to the buffer, acknowledge and read some of them. @@ -243,7 +363,12 @@ TEST_F(CRcvBufferReadMsg, AddData) // The value is reported by SRT receiver like this: // data[ACKD_BUFFERLEFT] = m_pRcvBuffer->getAvailBufSize(); EXPECT_EQ(getAvailBufferSize(), m_buff_size_pkts - 1); +#if ENABLE_NEW_RCVBUFFER + // The new receiver buffer does not need ACK to allow reading. + EXPECT_TRUE(hasAvailablePackets()); +#else EXPECT_FALSE(hasAvailablePackets()); +#endif // Now acknowledge two packets const int ack_pkts = 2; @@ -261,11 +386,24 @@ TEST_F(CRcvBufferReadMsg, AddData) } // Add packet to the position of oackets already read. - // Can't check, as negative offset is an error not handled by the receiver buffer. - //EXPECT_EQ(addPacket(m_init_seqno), -1); + // Can't check the old buffer, as it does not handle a negative offset. +#if ENABLE_NEW_RCVBUFFER + EXPECT_EQ(addPacket(m_init_seqno), -2); +#endif // Add packet to a non-empty position. EXPECT_EQ(addPacket(CSeqNo::incseq(m_init_seqno, ack_pkts)), -1); + + const int num_pkts_left = num_pkts - ack_pkts; + ackPackets(num_pkts_left); + for (int i = 0; i < num_pkts_left; ++i) + { + const int res = readMessage(buff.data(), buff.size()); + EXPECT_TRUE(size_t(res) == m_payload_sz); + EXPECT_EQ(getAvailBufferSize(), m_buff_size_pkts - num_pkts_left + i); + EXPECT_TRUE(verifyPayload(buff.data(), res, CSeqNo::incseq(m_init_seqno, ack_pkts + i))); + } + EXPECT_EQ(m_unit_queue->size(), m_unit_queue->capacity()); } // Check reading the whole message (consisting of several packets) from the buffer. @@ -291,6 +429,7 @@ TEST_F(CRcvBufferReadMsg, MsgAcked) const ptrdiff_t offset = i * m_payload_sz; EXPECT_TRUE(verifyPayload(buff.data() + offset, m_payload_sz, CSeqNo::incseq(m_init_seqno, i))); } + EXPECT_EQ(m_unit_queue->size(), m_unit_queue->capacity()); } // Check reading the whole message (consisting of several packets) into @@ -320,6 +459,7 @@ TEST_F(CRcvBufferReadMsg, SmallReadBuffer) EXPECT_FALSE(m_rcv_buffer->isRcvDataReady()); EXPECT_FALSE(hasAvailablePackets()); EXPECT_EQ(getAvailBufferSize(), m_buff_size_pkts - 1); + EXPECT_EQ(m_unit_queue->size(), m_unit_queue->capacity()); } // BUG!!! @@ -336,6 +476,19 @@ TEST_F(CRcvBufferReadMsg, MsgHalfAck) // Nothing to read (0 for zero bytes read). const size_t msg_bytelen = msg_pkts * m_payload_sz; array buff; +#if ENABLE_NEW_RCVBUFFER + // The new receiver buffer does not care about ACK. + EXPECT_TRUE(m_rcv_buffer->isRcvDataReady()); + EXPECT_TRUE(hasAvailablePackets()); + + const int res = readMessage(buff.data(), buff.size()); + EXPECT_EQ(res, msg_bytelen); + for (size_t i = 0; i < msg_pkts; ++i) + { + const ptrdiff_t offset = i * m_payload_sz; + EXPECT_TRUE(verifyPayload(buff.data() + offset, m_payload_sz, CSeqNo::incseq(m_init_seqno, i))); + } +#else EXPECT_FALSE(m_rcv_buffer->isRcvDataReady()); EXPECT_FALSE(hasAvailablePackets()); const int res = readMessage(buff.data(), buff.size()); @@ -344,14 +497,24 @@ TEST_F(CRcvBufferReadMsg, MsgHalfAck) // ACK half of the message and check read-readiness. ackPackets(2); // FIXME: Sadly RCV buffer says the data is ready to be read. - // EXPECT_FALSE(m_rcv_buffer->isRcvDataReady()); - // EXPECT_FALSE(hasAvailablePackets()); EXPECT_TRUE(m_rcv_buffer->isRcvDataReady()); EXPECT_TRUE(hasAvailablePackets()); // Actually must be nothing to read (can't read half a message). const int res2 = readMessage(buff.data(), buff.size()); EXPECT_EQ(res2, 0); + + // ACK the remaining half of the message and check read-readiness. + ackPackets(2); + const int res3 = readMessage(buff.data(), buff.size()); + EXPECT_EQ(res3, msg_bytelen); + for (size_t i = 0; i < msg_pkts; ++i) + { + const ptrdiff_t offset = i * m_payload_sz; + EXPECT_TRUE(verifyPayload(buff.data() + offset, m_payload_sz, CSeqNo::incseq(m_init_seqno, i))); + } +#endif + EXPECT_EQ(m_unit_queue->size(), m_unit_queue->capacity()); } // BUG!!! @@ -363,12 +526,39 @@ TEST_F(CRcvBufferReadMsg, OutOfOrderMsgNoACK) // Adding one message with the Out-Of-Order flag set, but without acknowledging. addMessage(msg_pkts, m_init_seqno, true); +#if ENABLE_NEW_RCVBUFFER + EXPECT_TRUE(m_rcv_buffer->isRcvDataReady()); + EXPECT_TRUE(hasAvailablePackets()); +#else EXPECT_FALSE(m_rcv_buffer->isRcvDataReady()); EXPECT_FALSE(hasAvailablePackets()); +#endif const size_t msg_bytelen = msg_pkts * m_payload_sz; array buff; const int res = readMessage(buff.data(), buff.size()); EXPECT_EQ(res, msg_bytelen); + for (size_t i = 0; i < msg_pkts; ++i) + { + const ptrdiff_t offset = i * m_payload_sz; + EXPECT_TRUE(verifyPayload(buff.data() + offset, m_payload_sz, CSeqNo::incseq(m_init_seqno, i))); + } + + EXPECT_FALSE(m_rcv_buffer->isRcvDataReady()); + EXPECT_FALSE(hasAvailablePackets()); + +#if ENABLE_NEW_RCVBUFFER + EXPECT_EQ(m_unit_queue->size(), m_unit_queue->capacity()); +#else + ackPackets(msg_pkts); + // The old buffer still does not free units. + EXPECT_NE(m_unit_queue->size(), m_unit_queue->capacity()); + // BUG: wrong read-ready state. + EXPECT_TRUE(m_rcv_buffer->isRcvDataReady()); + EXPECT_TRUE(hasAvailablePackets()); + // Nothing read, but empty units are freed. + EXPECT_EQ(readMessage(buff.data(), buff.size()), 0); + EXPECT_EQ(m_unit_queue->size(), m_unit_queue->capacity()); +#endif } // Adding a message with the out-of-order flag set. @@ -379,8 +569,13 @@ TEST_F(CRcvBufferReadMsg, OutOfOrderMsgGap) // Adding one message with the Out-Of-Order flag set, but without acknowledging. addMessage(msg_pkts, CSeqNo::incseq(m_init_seqno, 1), true); +#if ENABLE_NEW_RCVBUFFER + EXPECT_TRUE(m_rcv_buffer->isRcvDataReady()); + EXPECT_TRUE(hasAvailablePackets()); +#else EXPECT_FALSE(m_rcv_buffer->isRcvDataReady()); EXPECT_FALSE(hasAvailablePackets()); +#endif const size_t msg_bytelen = msg_pkts * m_payload_sz; array buff; const int res = readMessage(buff.data(), buff.size()); @@ -407,10 +602,15 @@ TEST_F(CRcvBufferReadMsg, OutOfOrderMsgGap) EXPECT_TRUE(res3 == m_payload_sz); EXPECT_TRUE(verifyPayload(buff.data(), m_payload_sz, m_init_seqno)); - // Only "passack" packets remain in the buffer. + // Only "passack" or EntryState_Read packets remain in the buffer. // They are falsely signalled as read-ready. +#if ENABLE_NEW_RCVBUFFER + EXPECT_FALSE(m_rcv_buffer->isRcvDataReady()); + EXPECT_FALSE(hasAvailablePackets()); +#else EXPECT_TRUE(m_rcv_buffer->isRcvDataReady()); // BUG: nothing to read. EXPECT_TRUE(hasAvailablePackets()); // BUG: nothing to read. +#endif // Adding a packet right after the EntryState_Read packets. const int seqno = CSeqNo::incseq(m_init_seqno, msg_pkts + 1); @@ -422,4 +622,216 @@ TEST_F(CRcvBufferReadMsg, OutOfOrderMsgGap) EXPECT_TRUE(verifyPayload(buff.data(), m_payload_sz, seqno)); EXPECT_FALSE(m_rcv_buffer->isRcvDataReady()); EXPECT_FALSE(hasAvailablePackets()); + EXPECT_EQ(m_unit_queue->size(), m_unit_queue->capacity()); +} + +// One message (4 packets) are added to the buffer. +// Check if reading is only possible once the whole message is present in the buffer. +TEST_F(CRcvBufferReadMsg, LongMsgReadReady) +{ + const size_t msg_pkts = 4; + const size_t msg_bytelen = msg_pkts * m_payload_sz; + array buff; + for (size_t i = 0; i < msg_pkts; ++i) + { + // int addPacket(int seqno, bool pb_first = true, bool pb_last = true, bool out_of_order = false, int ts = 0) + const bool pb_first = (i == 0); + const bool pb_last = (i == (msg_pkts - 1)); + EXPECT_EQ(addPacket(CSeqNo::incseq(m_init_seqno, i), pb_first, pb_last), 0); + ackPackets(1); + if (!pb_last) + { +#if ENABLE_NEW_RCVBUFFER + EXPECT_FALSE(m_rcv_buffer->isRcvDataReady()); + EXPECT_FALSE(hasAvailablePackets()); +#else + // BUG: The old buffer returns true (read-readiness). + EXPECT_TRUE(m_rcv_buffer->isRcvDataReady()); + EXPECT_TRUE(hasAvailablePackets()); +#endif + EXPECT_EQ(readMessage(buff.data(), buff.size()), 0); + } + } + + // Read the whole message. + EXPECT_TRUE(m_rcv_buffer->isRcvDataReady()); + EXPECT_TRUE(hasAvailablePackets()); + + const int res = readMessage(buff.data(), buff.size()); + EXPECT_EQ(res, msg_bytelen); + for (size_t i = 0; i < msg_pkts; ++i) + { + const ptrdiff_t offset = i * m_payload_sz; + EXPECT_TRUE(verifyPayload(buff.data() + offset, m_payload_sz, CSeqNo::incseq(m_init_seqno, i))); + } + EXPECT_EQ(m_unit_queue->size(), m_unit_queue->capacity()); +} + +#if ENABLE_NEW_RCVBUFFER +// One message (4 packets) is added to the buffer. Can be read out of order. +// Reading should be possible even before the missing packet is dropped. +TEST_F(CRcvBufferReadMsg, MsgOutOfOrderDrop) +{ + const size_t msg_pkts = 4; + // 1. Add one message (4 packets) without acknowledging + const int msg_seqno = m_init_seqno + 1; // seqno of the first packet in the message + EXPECT_EQ(addMessage(msg_pkts, msg_seqno, true), 0); + EXPECT_TRUE(m_rcv_buffer->isRcvDataReady()); + + // 2. Read full message after gap. + const size_t msg_bytelen = msg_pkts * m_payload_sz; + array buff; + int res = m_rcv_buffer->readMessage(buff.data(), buff.size()); + EXPECT_EQ(res, msg_bytelen); + for (size_t i = 0; i < msg_pkts; ++i) + { + EXPECT_TRUE(verifyPayload(buff.data() + i * m_payload_sz, m_payload_sz, msg_seqno + i)); + } + + EXPECT_FALSE(m_rcv_buffer->isRcvDataReady()); + + // Can't add to the same message + EXPECT_EQ(addMessage(msg_pkts, msg_seqno, true), -1); + + const auto pkt_info = m_rcv_buffer->getFirstValidPacketInfo(); + EXPECT_EQ(pkt_info.seqno, -1); // Nothing to read + EXPECT_TRUE(srt::sync::is_zero(pkt_info.tsbpd_time)); + + // Drop missing packet + m_rcv_buffer->dropUpTo(msg_seqno); + EXPECT_FALSE(m_rcv_buffer->isRcvDataReady()); + // All memory units are expected to be freed. + EXPECT_EQ(m_unit_queue->size(), m_unit_queue->capacity()); +} + +// One message (4 packets) is added to the buffer after a message with "in order" flag. +// Read in order +TEST_F(CRcvBufferReadMsg, MsgOutOfOrderAfterInOrder) +{ + const size_t msg_pkts = 4; + // 1. Add one packet with inOrder=true and one message (4 packets) with inOrder=false + EXPECT_EQ(addMessage(msg_pkts, m_init_seqno + 2 * msg_pkts, true), 0); + EXPECT_EQ(addMessage(msg_pkts, m_init_seqno, false), 0); + EXPECT_EQ(addMessage(msg_pkts, m_init_seqno + msg_pkts, true), 0); + EXPECT_TRUE(m_rcv_buffer->isRcvDataReady()); + + // 2. Read messages in order + const size_t msg_bytelen = msg_pkts * m_payload_sz; + std::array buff; + for (int msg_i = 0; msg_i < 3; ++msg_i) + { + EXPECT_TRUE(m_rcv_buffer->isRcvDataReady()); + EXPECT_EQ(m_rcv_buffer->readMessage(buff.data(), buff.size()), msg_bytelen); + for (size_t i = 0; i < msg_pkts; ++i) + { + EXPECT_TRUE(verifyPayload(buff.data() + i * m_payload_sz, m_payload_sz, m_init_seqno + msg_i * msg_pkts + i)); + } + } + + EXPECT_FALSE(m_rcv_buffer->isRcvDataReady()); } + +/// One packet is added to the buffer. Can be read on TSBPD-readiness. +/// +/// 1. insert +/// | +/// +---+ ---+---+---+---+---+ +---+ +/// | 1 | 0 | 0 | 0 | 0 | 0 |...| 0 | m_pUnit[] +/// +---+ ---+---+---+---+---+ +---+ +/// | +/// 2. read +/// +TEST_F(CRcvBufferReadMsg, OnePacketTSBPD) +{ + const size_t msg_pkts = 1; + + m_rcv_buffer->setTsbPdMode(m_tsbpd_base, false, m_delay); + + const int packet_ts = 0; + // Adding one message. Note that all packets have the out of order flag + // set to false by default in TSBPD mode, but this flag is ignored. + EXPECT_EQ(addMessage(msg_pkts, m_init_seqno, true, packet_ts), 0); + + const size_t msg_bytelen = msg_pkts * m_payload_sz; + array buff; + + // Confirm adding to the same location returns an error. + EXPECT_EQ(addMessage(msg_pkts, m_init_seqno, true, packet_ts), -1); + + // There is one packet in the buffer, but not ready to read after delay/2 + EXPECT_FALSE(m_rcv_buffer->isRcvDataReady(m_tsbpd_base + (m_delay / 2))); + EXPECT_FALSE(m_rcv_buffer->isRcvDataReady(m_tsbpd_base + m_delay - sync::microseconds_from(1))); + // There is one packet in the buffer ready to read after delay + EXPECT_TRUE(m_rcv_buffer->isRcvDataReady(m_tsbpd_base + m_delay)); + EXPECT_TRUE(m_rcv_buffer->isRcvDataReady(m_tsbpd_base + m_delay + sync::microseconds_from(1))); + + // Read out the first message + const int read_len = m_rcv_buffer->readMessage(buff.data(), buff.size()); + EXPECT_EQ(read_len, msg_bytelen); + EXPECT_TRUE(verifyPayload(buff.data(), read_len, m_init_seqno)); + + // Check the state after a packet was read + EXPECT_FALSE(m_rcv_buffer->isRcvDataReady(m_tsbpd_base + m_delay)); + EXPECT_EQ(addMessage(msg_pkts, m_init_seqno, false), -2); + + EXPECT_FALSE(m_rcv_buffer->isRcvDataReady(m_tsbpd_base + m_delay)); +} + +/// TSBPD = ON, a ready to play packet is preceeded by a missing packet. +/// The read-rediness must be signalled, and a packet must be read after the missing +/// one is dropped. +/// The TSBPD delay is set to 200 ms. This means, that the packet can be played +/// not earlier than after 200200 microseconds from the peer start time. +/// The peer start time is set to 100000 us. +/// +/// +/// || +/// | / +/// | / +/// | | +/// +---+---+---+---+---+---+ +---+ +/// | 0 | 1 | 0 | 0 | 0 | 0 |...| 0 | m_pUnit[] +/// +---+---+---+---+---+---+ +---+ +/// | | +/// | \__last pkt received +/// | +/// \___ m_iStartPos: first message to read +/// \___ m_iLastAckPos: last ack sent +/// +/// m_pUnit[i]->m_iFlag: 0:free, 1:good, 2:passack, 3:dropped +/// +TEST_F(CRcvBufferReadMsg, TSBPDGapBeforeValid) +{ + m_rcv_buffer->setTsbPdMode(m_tsbpd_base, false, m_delay); + // Add a solo packet to position m_init_seqno + 1 with timestamp 200 us + const int seqno = m_init_seqno + 1; + const int32_t pkt_ts = 200; + EXPECT_EQ(addMessage(1, seqno, false, pkt_ts), 0); + + const auto readready_timestamp = m_tsbpd_base + sync::microseconds_from(pkt_ts) + m_delay; + // Check that getFirstValidPacketInfo() returns first valid packet. + const auto pkt_info = m_rcv_buffer->getFirstValidPacketInfo(); + EXPECT_EQ(pkt_info.tsbpd_time, readready_timestamp); + EXPECT_EQ(pkt_info.seqno, seqno); + EXPECT_TRUE(pkt_info.seq_gap); + + // The packet can't be read because there is a missing packet preceeding. + EXPECT_FALSE(m_rcv_buffer->isRcvDataReady(readready_timestamp)); + + const int seq_gap_len = CSeqNo::seqoff(m_rcv_buffer->getStartSeqNo(), pkt_info.seqno); + EXPECT_GT(seq_gap_len, 0); + if (seq_gap_len > 0) + { + m_rcv_buffer->dropUpTo(pkt_info.seqno); + } + + EXPECT_TRUE(m_rcv_buffer->isRcvDataReady(readready_timestamp)); + + const size_t msg_bytelen = m_payload_sz; + array buff; + EXPECT_EQ(readMessage(buff.data(), buff.size()), msg_bytelen); + EXPECT_TRUE(verifyPayload(buff.data(), m_payload_sz, seqno)); + EXPECT_EQ(m_unit_queue->size(), m_unit_queue->capacity()); +} + +#endif // ENABEL_NEW_RCVBUFFER diff --git a/test/test_epoll.cpp b/test/test_epoll.cpp index 8fb36be53..4644e3267 100644 --- a/test/test_epoll.cpp +++ b/test/test_epoll.cpp @@ -718,7 +718,7 @@ class TestEPoll: public testing::Test ASSERT_EQ(rlen, 1); // get exactly one read event without writes ASSERT_EQ(wlen, 0); // get exactly one read event without writes - ASSERT_EQ(read[0], acpsock); // read event is for bind socket + ASSERT_EQ(read[0], acpsock); // read event is for bind socket } char buffer[1316];