Skip to content

Commit

Permalink
Ensure that MRP logic is not executed for messages over other Transpo…
Browse files Browse the repository at this point in the history
…rts (project-chip#30124)

* Ensure that MRP logic is not executed for messages over other Transports

MRP based Ack handling and retransmissions should not be performed when
messages are not sent using MRP, e.g., over TCP.
Install guards on Send/Receive paths to steer traffic to bypass MRP
logic when going over TCP.

* Rename IsTransportTCP() to AllowsLargePayload()
  • Loading branch information
pidarped authored Nov 24, 2023
1 parent b5dfe72 commit d82ce57
Show file tree
Hide file tree
Showing 8 changed files with 112 additions and 77 deletions.
84 changes: 45 additions & 39 deletions src/messaging/ExchangeContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ CHIP_ERROR ExchangeContext::SendMessage(Protocols::Id protocolId, uint8_t msgTyp
// If session requires MRP, NoAutoRequestAck send flag is not specified and is not a group exchange context, request reliable
// transmission.
bool reliableTransmissionRequested =
GetSessionHandle()->RequireMRP() && !sendFlags.Has(SendMessageFlags::kNoAutoRequestAck) && !IsGroupExchangeContext();
GetSessionHandle()->AllowsMRP() && !sendFlags.Has(SendMessageFlags::kNoAutoRequestAck) && !IsGroupExchangeContext();

bool currentMessageExpectResponse = false;
// If a response message is expected...
Expand Down Expand Up @@ -322,8 +322,8 @@ ExchangeContext::ExchangeContext(ExchangeManager * em, uint16_t ExchangeId, cons

SetAckPending(false);

// Do not request Ack for multicast
SetAutoRequestAck(!session->IsGroupSession());
// Try to use MRP by default, if it is allowed.
SetAutoRequestAck(session->AllowsMRP());

#if CHIP_CONFIG_ENABLE_ICD_SERVER
app::ICDNotifier::GetInstance().BroadcastActiveRequestNotification(app::ICDListener::KeepActiveFlag::kExchangeContextOpen);
Expand Down Expand Up @@ -531,58 +531,64 @@ CHIP_ERROR ExchangeContext::HandleMessage(uint32_t messageCounter, const Payload
MessageHandled();
});

if (mDispatch.IsReliableTransmissionAllowed() && !IsGroupExchangeContext())
if (mSession->AllowsMRP())
{
if (!msgFlags.Has(MessageFlagValues::kDuplicateMessage) && payloadHeader.IsAckMsg() &&
payloadHeader.GetAckMessageCounter().HasValue())
if (mDispatch.IsReliableTransmissionAllowed())
{
HandleRcvdAck(payloadHeader.GetAckMessageCounter().Value());
if (!msgFlags.Has(MessageFlagValues::kDuplicateMessage) && payloadHeader.IsAckMsg() &&
payloadHeader.GetAckMessageCounter().HasValue())
{
HandleRcvdAck(payloadHeader.GetAckMessageCounter().Value());
}

if (payloadHeader.NeedsAck())
{
// An acknowledgment needs to be sent back to the peer for this message on this exchange,
HandleNeedsAck(messageCounter, msgFlags);
}
}

if (payloadHeader.NeedsAck())
if (IsAckPending() && !mDelegate)
{
// An acknowledgment needs to be sent back to the peer for this message on this exchange,
HandleNeedsAck(messageCounter, msgFlags);
// The incoming message wants an ack, but we have no delegate, so
// there's not going to be a response to piggyback on. Just flush the
// ack out right now.
ReturnErrorOnFailure(FlushAcks());
}
}

if (IsAckPending() && !mDelegate)
{
// The incoming message wants an ack, but we have no delegate, so
// there's not going to be a response to piggyback on. Just flush the
// ack out right now.
ReturnErrorOnFailure(FlushAcks());
}

// The SecureChannel::StandaloneAck message type is only used for MRP; do not pass such messages to the application layer.
if (isStandaloneAck)
{
return CHIP_NO_ERROR;
}
// The SecureChannel::StandaloneAck message type is only used for MRP; do not pass such messages to the application layer.
if (isStandaloneAck)
{
return CHIP_NO_ERROR;
}
} // AllowsMRP

// Since the message is duplicate, let's not forward it up the stack
if (isDuplicate)
{
return CHIP_NO_ERROR;
}

if (IsEphemeralExchange())
if (mSession->AllowsMRP())
{
// The EphemeralExchange has done its job, since StandaloneAck is sent in previous FlushAcks() call.
return CHIP_NO_ERROR;
}
if (IsEphemeralExchange())
{
// The EphemeralExchange has done its job, since StandaloneAck is sent in previous FlushAcks() call.
return CHIP_NO_ERROR;
}

if (IsWaitingForAck())
{
// The only way we can get here is a spec violation on the other side:
// we sent a message that needs an ack, and the other side responded
// with a message that does not contain an ack for the message we sent.
// Just drop this message; if we delivered it to our delegate it might
// try to send another message-needing-an-ack in response, which would
// violate our internal invariants.
ChipLogError(ExchangeManager, "Dropping message without piggyback ack when we are waiting for an ack.");
return CHIP_ERROR_INCORRECT_STATE;
}
if (IsWaitingForAck())
{
// The only way we can get here is a spec violation on the other side:
// we sent a message that needs an ack, and the other side responded
// with a message that does not contain an ack for the message we sent.
// Just drop this message; if we delivered it to our delegate it might
// try to send another message-needing-an-ack in response, which would
// violate our internal invariants.
ChipLogError(ExchangeManager, "Dropping message without piggyback ack when we are waiting for an ack.");
return CHIP_ERROR_INCORRECT_STATE;
}
} // AllowsMRP

#if CHIP_CONFIG_ENABLE_ICD_SERVER
// message received
Expand Down
78 changes: 47 additions & 31 deletions src/messaging/ExchangeMessageDispatch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,45 +51,61 @@ CHIP_ERROR ExchangeMessageDispatch::SendMessage(SessionManager * sessionManager,
PayloadHeader payloadHeader;
payloadHeader.SetExchangeID(exchangeId).SetMessageType(protocol, type).SetInitiator(isInitiator);

// If there is a pending acknowledgment piggyback it on this message.
if (reliableMessageContext->HasPiggybackAckPending())
if (session->AllowsMRP())
{
payloadHeader.SetAckMessageCounter(reliableMessageContext->TakePendingPeerAckMessageCounter());
}

if (IsReliableTransmissionAllowed() && reliableMessageContext->AutoRequestAck() &&
reliableMessageContext->GetReliableMessageMgr() != nullptr && isReliableTransmission)
{
auto * reliableMessageMgr = reliableMessageContext->GetReliableMessageMgr();

payloadHeader.SetNeedsAck(true);

ReliableMessageMgr::RetransTableEntry * entry = nullptr;

// Add to Table for subsequent sending
ReturnErrorOnFailure(reliableMessageMgr->AddToRetransTable(reliableMessageContext, &entry));
auto deleter = [reliableMessageMgr](ReliableMessageMgr::RetransTableEntry * e) {
reliableMessageMgr->ClearRetransTable(*e);
};
std::unique_ptr<ReliableMessageMgr::RetransTableEntry, decltype(deleter)> entryOwner(entry, deleter);

ReturnErrorOnFailure(sessionManager->PrepareMessage(session, payloadHeader, std::move(message), entryOwner->retainedBuf));
CHIP_ERROR err = sessionManager->SendPreparedMessage(session, entryOwner->retainedBuf);
err = ReliableMessageMgr::MapSendError(err, exchangeId, isInitiator);
ReturnErrorOnFailure(err);
reliableMessageMgr->StartRetransmision(entryOwner.release());
// If there is a pending acknowledgment piggyback it on this message.
if (reliableMessageContext->HasPiggybackAckPending())
{
payloadHeader.SetAckMessageCounter(reliableMessageContext->TakePendingPeerAckMessageCounter());
}

if (IsReliableTransmissionAllowed() && reliableMessageContext->AutoRequestAck() &&
reliableMessageContext->GetReliableMessageMgr() != nullptr && isReliableTransmission)
{
auto * reliableMessageMgr = reliableMessageContext->GetReliableMessageMgr();

payloadHeader.SetNeedsAck(true);

ReliableMessageMgr::RetransTableEntry * entry = nullptr;

// Add to Table for subsequent sending
ReturnErrorOnFailure(reliableMessageMgr->AddToRetransTable(reliableMessageContext, &entry));
auto deleter = [reliableMessageMgr](ReliableMessageMgr::RetransTableEntry * e) {
reliableMessageMgr->ClearRetransTable(*e);
};
std::unique_ptr<ReliableMessageMgr::RetransTableEntry, decltype(deleter)> entryOwner(entry, deleter);

ReturnErrorOnFailure(
sessionManager->PrepareMessage(session, payloadHeader, std::move(message), entryOwner->retainedBuf));
CHIP_ERROR err = sessionManager->SendPreparedMessage(session, entryOwner->retainedBuf);
err = ReliableMessageMgr::MapSendError(err, exchangeId, isInitiator);
ReturnErrorOnFailure(err);
reliableMessageMgr->StartRetransmision(entryOwner.release());
}
else
{
ReturnErrorOnFailure(PrepareAndSendNonMRPMessage(sessionManager, session, payloadHeader, std::move(message)));
}
}
else
{
// If the channel itself is providing reliability, let's not request MRP acks
payloadHeader.SetNeedsAck(false);
EncryptedPacketBufferHandle preparedMessage;
ReturnErrorOnFailure(sessionManager->PrepareMessage(session, payloadHeader, std::move(message), preparedMessage));
ReturnErrorOnFailure(sessionManager->SendPreparedMessage(session, preparedMessage));
ReturnErrorOnFailure(PrepareAndSendNonMRPMessage(sessionManager, session, payloadHeader, std::move(message)));
}

return CHIP_NO_ERROR;
}

CHIP_ERROR ExchangeMessageDispatch::PrepareAndSendNonMRPMessage(SessionManager * sessionManager, const SessionHandle & session,
PayloadHeader & payloadHeader,
System::PacketBufferHandle && message)
{
payloadHeader.SetNeedsAck(false);
EncryptedPacketBufferHandle preparedMessage;
ReturnErrorOnFailure(sessionManager->PrepareMessage(session, payloadHeader, std::move(message), preparedMessage));
ReturnErrorOnFailure(sessionManager->SendPreparedMessage(session, preparedMessage));

return CHIP_NO_ERROR;
}

} // namespace Messaging
} // namespace chip
4 changes: 4 additions & 0 deletions src/messaging/ExchangeMessageDispatch.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ class ExchangeMessageDispatch

// TODO: remove IsReliableTransmissionAllowed, this function should be provided over session.
virtual bool IsReliableTransmissionAllowed() const { return true; }

private:
CHIP_ERROR PrepareAndSendNonMRPMessage(SessionManager * sessionManager, const SessionHandle & session,
PayloadHeader & payloadHeader, System::PacketBufferHandle && message);
};

} // namespace Messaging
Expand Down
6 changes: 4 additions & 2 deletions src/messaging/ExchangeMgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -372,8 +372,10 @@ void ExchangeManager::SendStandaloneAckIfNeeded(const PacketHeader & packetHeade
const SessionHandle & session, MessageFlags msgFlags,
System::PacketBufferHandle && msgBuf)
{
// If we need to send a StandaloneAck, create a EphemeralExchange for the purpose to send the StandaloneAck
if (!payloadHeader.NeedsAck())

// If using the MRP protocol and we need to send a StandaloneAck, create an EphemeralExchange to send
// the StandaloneAck.
if (!session->AllowsMRP() || !payloadHeader.NeedsAck())
return;

// If rcvd msg is from initiator then this exchange is created as not Initiator.
Expand Down
6 changes: 4 additions & 2 deletions src/transport/GroupSession.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ class IncomingGroupSession : public Session, public ReferenceCounted<IncomingGro
return subjectDescriptor;
}

bool RequireMRP() const override { return false; }
bool AllowsMRP() const override { return false; }
bool AllowsLargePayload() const override { return false; }

const SessionParameters & GetRemoteSessionParameters() const override
{
Expand Down Expand Up @@ -108,7 +109,8 @@ class OutgoingGroupSession : public Session, public ReferenceCounted<OutgoingGro
return Access::SubjectDescriptor(); // no subject exists for outgoing group session.
}

bool RequireMRP() const override { return false; }
bool AllowsMRP() const override { return false; }
bool AllowsLargePayload() const override { return false; }

const SessionParameters & GetRemoteSessionParameters() const override
{
Expand Down
4 changes: 3 additions & 1 deletion src/transport/SecureSession.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,9 @@ class SecureSession : public Session, public ReferenceCounted<SecureSession, Sec

Access::SubjectDescriptor GetSubjectDescriptor() const override;

bool RequireMRP() const override { return GetPeerAddress().GetTransportType() == Transport::Type::kUdp; }
bool AllowsMRP() const override { return GetPeerAddress().GetTransportType() == Transport::Type::kUdp; }

bool AllowsLargePayload() const override { return GetPeerAddress().GetTransportType() == Transport::Type::kTcp; }

System::Clock::Milliseconds32 GetAckTimeout() const override
{
Expand Down
3 changes: 2 additions & 1 deletion src/transport/Session.h
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,8 @@ class Session
virtual ScopedNodeId GetPeer() const = 0;
virtual ScopedNodeId GetLocalScopedNodeId() const = 0;
virtual Access::SubjectDescriptor GetSubjectDescriptor() const = 0;
virtual bool RequireMRP() const = 0;
virtual bool AllowsMRP() const = 0;
virtual bool AllowsLargePayload() const = 0;
virtual const SessionParameters & GetRemoteSessionParameters() const = 0;
virtual System::Clock::Timestamp GetMRPBaseTimeout() const = 0;
virtual System::Clock::Milliseconds32 GetAckTimeout() const = 0;
Expand Down
4 changes: 3 additions & 1 deletion src/transport/UnauthenticatedSessionTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,9 @@ class UnauthenticatedSession : public Session, public ReferenceCounted<Unauthent
return Access::SubjectDescriptor(); // return an empty ISD for unauthenticated session.
}

bool RequireMRP() const override { return GetPeerAddress().GetTransportType() == Transport::Type::kUdp; }
bool AllowsMRP() const override { return GetPeerAddress().GetTransportType() == Transport::Type::kUdp; }

bool AllowsLargePayload() const override { return GetPeerAddress().GetTransportType() == Transport::Type::kTcp; }

System::Clock::Milliseconds32 GetAckTimeout() const override
{
Expand Down

0 comments on commit d82ce57

Please sign in to comment.