Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make WriteHandler an ExchangeDelegate #14821

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/app/InteractionModelEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ void InteractionModelEngine::Shutdown()

for (auto & writeHandler : mWriteHandlers)
{
VerifyOrDie(writeHandler.IsFree());
writeHandler.Abort();
}

mReportingEngine.Shutdown();
Expand Down
81 changes: 78 additions & 3 deletions src/app/WriteHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
* limitations under the License.
*/

#include "messaging/ExchangeContext.h"
#include <app/AppBuildConfig.h>
#include <app/InteractionModelEngine.h>
#include <app/MessageDef/EventPathIB.h>
Expand Down Expand Up @@ -48,19 +49,66 @@ CHIP_ERROR WriteHandler::Init()
return CHIP_NO_ERROR;
}

void WriteHandler::Shutdown()
void WriteHandler::Close()
{
VerifyOrReturn(mState != State::Uninitialized);

mMessageWriter.Reset();
mpExchangeCtx = nullptr;

if (mpExchangeCtx != nullptr)
{
mpExchangeCtx->SetDelegate(nullptr);
mpExchangeCtx = nullptr;
}

ClearState();
}

void WriteHandler::Abort()
{
#if 0
// TODO: When chunking gets added, we should add this back.
//
// If the exchange context hasn't already been gracefully closed
// (signaled by setting it to null), then we need to forcibly
// tear it down.
//
if (mpExchangeCtx != nullptr)
{
// We might be a delegate for this exchange, and we don't want the
// OnExchangeClosing notification in that case. Null out the delegate
// to avoid that.
//
// TODO: This makes all sorts of assumptions about what the delegate is
// (notice the "might" above!) that might not hold in practice. We
// really need a better solution here....
mpExchangeCtx->SetDelegate(nullptr);
mpExchangeCtx->Abort();
mpExchangeCtx = nullptr;
}

ClearState();
#else
//
// The WriteHandler should get synchronously allocated and destroyed in the same execution
// context given that it's just a 2 message exchange (request + response). Consequently, we should
// never arrive at a situation where we have active handlers at any time Abort() is called.
//
VerifyOrDie(mState == State::Uninitialized);
#endif
}

Status WriteHandler::OnWriteRequest(Messaging::ExchangeContext * apExchangeContext, System::PacketBufferHandle && aPayload,
bool aIsTimedWrite)
{
mpExchangeCtx = apExchangeContext;

//
// Let's take over further message processing on this exchange from the IM.
// This is only relevant during chunked requests.
//
mpExchangeCtx->SetDelegate(this);

Status status = ProcessWriteRequest(std::move(aPayload), aIsTimedWrite);

// Do not send response on Group Write
Expand All @@ -73,10 +121,37 @@ Status WriteHandler::OnWriteRequest(Messaging::ExchangeContext * apExchangeConte
}
}

Shutdown();
Close();
return status;
}

CHIP_ERROR WriteHandler::OnMessageReceived(Messaging::ExchangeContext * apExchangeContext, const PayloadHeader & aPayloadHeader,
System::PacketBufferHandle && aPayload)
{
//
// As part of write handling, the exchange should get closed sychronously given there is always
// just a single response to a Write Request message before the exchange gets closed. There-after,
// even if we get any more messages on that exchange from a non-compliant client, our exchange layer
// should correctly discard those. If there is a bug there, this function here may get invoked.
//
// NOTE: Once chunking gets implemented, this will no longer be true.
//
VerifyOrDieWithMsg(false, DataManagement, "This function should never get invoked");
}

void WriteHandler::OnResponseTimeout(Messaging::ExchangeContext * apExchangeContext)
{
//
// As part of write handling, the exchange should get closed sychronously given there is always
// just a single response to a Write Request message before the exchange gets closed. That response
// does not solicit any further responses back. Consequently, we never expect to get notified
// of any response timeouts.
//
// NOTE: Once chunking gets implemented, this will no longer be true.
//
VerifyOrDieWithMsg(false, DataManagement, "This function should never get invoked");
}

CHIP_ERROR WriteHandler::FinalizeMessage(System::PacketBufferHandle & packet)
{
VerifyOrReturnError(mState == State::AddStatus, CHIP_ERROR_INCORRECT_STATE);
Expand Down
20 changes: 16 additions & 4 deletions src/app/WriteHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ namespace app {
/**
* @brief The write handler is responsible for processing a write request and sending a write reply.
*/
class WriteHandler
class WriteHandler : public Messaging::ExchangeDelegate
mrjerryjohns marked this conversation as resolved.
Show resolved Hide resolved
{
public:
/**
* Initialize the WriteHandler. Within the lifetime
* of this instance, this method is invoked once after object
* construction until a call to Shutdown is made to terminate the
* construction until a call to Close is made to terminate the
* instance.
*
* @retval #CHIP_ERROR_INCORRECT_STATE If the state is not equal to
Expand All @@ -53,7 +53,7 @@ class WriteHandler

/**
* Process a write request. Parts of the processing may end up being asynchronous, but the WriteHandler
* guarantees that it will call Shutdown on itself when processing is done (including if OnWriteRequest
* guarantees that it will call Close on itself when processing is done (including if OnWriteRequest
* returns an error).
*
* @param[in] apExchangeContext A pointer to the ExchangeContext.
Expand All @@ -66,6 +66,12 @@ class WriteHandler
Protocols::InteractionModel::Status OnWriteRequest(Messaging::ExchangeContext * apExchangeContext,
System::PacketBufferHandle && aPayload, bool aIsTimedWrite);

/*
* This forcibly closes the exchange context if a valid one is pointed to and de-initializes the object. Such a situation does
* not arise during normal message processing flows that all normally call Close() below.
*/
void Abort();

bool IsFree() const { return mState == State::Uninitialized; }

virtual ~WriteHandler() = default;
Expand Down Expand Up @@ -111,8 +117,14 @@ class WriteHandler
/**
* Clean up state when we are done sending the write response.
*/
void Shutdown();
void Close();

private: // ExchangeDelegate
CHIP_ERROR OnMessageReceived(Messaging::ExchangeContext * apExchangeContext, const PayloadHeader & aPayloadHeader,
System::PacketBufferHandle && aPayload) override;
void OnResponseTimeout(Messaging::ExchangeContext * apExchangeContext) override;

private:
Messaging::ExchangeContext * mpExchangeCtx = nullptr;
WriteResponseMessage::Builder mWriteResponseBuilder;
System::PacketBufferTLVWriter mMessageWriter;
Expand Down
40 changes: 32 additions & 8 deletions src/app/tests/TestWriteInteraction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,22 @@ void TestWriteInteraction::TestWriteHandler(nlTestSuite * apSuite, void * apCont

TestContext & ctx = *static_cast<TestContext *>(apContext);

//
// We have to enable async dispatch here to ensure that the exchange
// gets correctly closed out in the test below. Otherwise, the following happens:
//
// 1. WriteHandler generates a response upon OnWriteRequest being called.
// 2. Since there is no matching active client-side exchange for that request, the IM engine
// handles it incorrectly and treats it like an unsolicited message.
// 3. It is invalid to receive a WriteResponse as an unsolicited message so it correctly sends back
// a StatusResponse containing an error to that message.
// 4. Without unwinding the existing call stack, a response is received on the same exchange that the handler
// generated a WriteResponse on. This exchange should have been closed in a normal execution model, but in
// a synchronous model, the exchange is still open, and the status response is sent to the WriteHandler.
// 5. WriteHandler::OnMessageReceived is invoked, and it correctly asserts.
//
ctx.EnableAsyncDispatch();

constexpr bool allBooleans[] = { true, false };
for (auto messageIsTimed : allBooleans)
{
Expand All @@ -278,27 +294,35 @@ void TestWriteInteraction::TestWriteHandler(nlTestSuite * apSuite, void * apCont

TestExchangeDelegate delegate;
Messaging::ExchangeContext * exchange = ctx.NewExchangeToBob(&delegate);
Status status = writeHandler.OnWriteRequest(exchange, std::move(buf), transactionIsTimed);

Status status = writeHandler.OnWriteRequest(exchange, std::move(buf), transactionIsTimed);
if (messageIsTimed == transactionIsTimed)
{
NL_TEST_ASSERT(apSuite, status == Status::Success);
}
else
{
NL_TEST_ASSERT(apSuite, status == Status::UnsupportedAccess);
// In the normal code flow, the exchange would now get closed
// when we send the error status on it (of if that fails when
// the stack unwinds). In the success case it's been closed
// already by the WriteHandler sending the response on it, but
// if we are in the error case we need to make sure it gets
// closed.
//
// In a normal execution flow, the exchange manager would have closed out the exchange after the
// message dispatch call path had unwound. In this test however, we've manually allocated the exchange
// ourselves (as opposed to the exchange manager), so we need to take ownership of closing out the exchange.
//
// Note that this doesn't happen in the success case above, since that results in a call to send a message through
// the exchange context, which results in the exchange manager correctly closing it.
//
exchange->Close();
NL_TEST_ASSERT(apSuite, status == Status::UnsupportedAccess);
}

ctx.DrainAndServiceIO();
ctx.DrainAndServiceIO();
bzbarsky-apple marked this conversation as resolved.
Show resolved Hide resolved

Messaging::ReliableMessageMgr * rm = ctx.GetExchangeManager().GetReliableMessageMgr();
NL_TEST_ASSERT(apSuite, rm->TestGetCountRetransTable() == 0);
}
}

ctx.DisableAsyncDispatch();
}

CHIP_ERROR WriteSingleClusterData(const Access::SubjectDescriptor & aSubjectDescriptor, ClusterInfo & aClusterInfo,
Expand Down
4 changes: 2 additions & 2 deletions src/controller/tests/data_model/TestRead.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ void TestReadInteraction::TestReadHandlerResourceExhaustion_MultipleSubscription
auto onFailureCb = [&apSuite, &numFailureCalls](const app::ConcreteAttributePath * attributePath, CHIP_ERROR aError) {
numFailureCalls++;

NL_TEST_ASSERT(apSuite, aError == CHIP_IM_GLOBAL_STATUS(Protocols::InteractionModel::Status::ResourceExhausted));
NL_TEST_ASSERT(apSuite, aError == CHIP_IM_GLOBAL_STATUS(ResourceExhausted));
NL_TEST_ASSERT(apSuite, attributePath == nullptr);
};

Expand Down Expand Up @@ -499,7 +499,7 @@ void TestReadInteraction::TestReadHandlerResourceExhaustion_MultipleReads(nlTest
auto onFailureCb = [&apSuite, &numFailureCalls](const app::ConcreteAttributePath * attributePath, CHIP_ERROR aError) {
numFailureCalls++;

NL_TEST_ASSERT(apSuite, aError == CHIP_IM_GLOBAL_STATUS(Protocols::InteractionModel::Status::ResourceExhausted));
NL_TEST_ASSERT(apSuite, aError == CHIP_IM_GLOBAL_STATUS(ResourceExhausted));
NL_TEST_ASSERT(apSuite, attributePath == nullptr);
};

Expand Down
8 changes: 6 additions & 2 deletions src/lib/core/CHIPError.h
Original file line number Diff line number Diff line change
Expand Up @@ -400,9 +400,13 @@ using CHIP_ERROR = ::chip::ChipError;

#define CHIP_CORE_ERROR(e) CHIP_SDK_ERROR(::chip::ChipError::SdkPart::kCore, (e))

#define CHIP_IM_GLOBAL_STATUS(e) CHIP_SDK_ERROR(::chip::ChipError::SdkPart::kIMGlobalStatus, to_underlying(e))
#define CHIP_IM_GLOBAL_STATUS(type) \
CHIP_SDK_ERROR(::chip::ChipError::SdkPart::kIMGlobalStatus, to_underlying(Protocols::InteractionModel::Status::type))

#define CHIP_IM_CLUSTER_STATUS(e) CHIP_SDK_ERROR(::chip::ChipError::SdkPart::kIMClusterStatus, e)
//
// type must be a compile-time constant as mandated by CHIP_SDK_ERROR.
//
#define CHIP_IM_CLUSTER_STATUS(type) CHIP_SDK_ERROR(::chip::ChipError::SdkPart::kIMClusterStatus, type)

// clang-format off

Expand Down
14 changes: 14 additions & 0 deletions src/messaging/tests/MessagingContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,20 @@ class LoopbackMessagingContext : public MessagingContext
impl.EnableAsyncDispatch(&mIOContext.GetSystemLayer());
}

/*
* Reset the dispatch back to a model that synchronously dispatches received messages up the stack.
*
* NOTE: This results in highly atypical/complex call stacks that are not representative of what happens on real
* devices and can cause subtle and complex bugs to either appear or get masked in the system. Where possible, please
* use this sparingly!
*
*/
void DisableAsyncDispatch()
{
auto & impl = GetLoopback();
impl.DisableAsyncDispatch();
}

/*
* This drives the servicing of events using the embedded IOContext while there are pending
* messages in the loopback transport's pending message queue. This should run to completion
Expand Down
9 changes: 9 additions & 0 deletions src/transport/raw/tests/NetworkTestHelpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,15 @@ class LoopbackTransport : public Transport::Base
mAsyncMessageDispatch = true;
}

/*
* Reset the dispatch back to a model that synchronously dispatches received messages up the stack.
*
* NOTE: This results in highly atypical/complex call stacks that are not representative of what happens on real
* devices and can cause subtle and complex bugs to either appear or get masked in the system. Where possible, please
* use this sparingly!
*/
void DisableAsyncDispatch() { mAsyncMessageDispatch = false; }

bool HasPendingMessages() { return !mPendingMessageQueue.empty(); }

static void OnMessageReceived(System::Layer * aSystemLayer, void * aAppState)
Expand Down