Skip to content

Commit

Permalink
[ICD] Server side subscription persistence and resumption (#24361)
Browse files Browse the repository at this point in the history
* [ICD] Server side subscription persistence and resumption

* restyled change

* Correct SimpleSubscriptionResumptionStorage TLV format documentation

* Fix ReadHandler resumption

* Correct state size estimate

* Replace %zu in log format

* Move TLV buffer off stack

* restyled

* Replaced vector with ScopedMemoryBufferWithSize and shim structs

* Fix struct member order

* Fix one more struct member order

* Fixed more stack buffer

* Fix copy/paste bug

* Moved SubscriptionList array to unique_ptr and dynamically allocated / off stack

* Moved SubscriptionIndex array to unique_ptr and dynamically allocated / off stack

* Fixed error condition checks

* Fixed array size check

* Addressed CI issues, and disabled subscription persistence and resumption for cc13x2_26x2 and CYW30739

* Addressed PR review comments, including:

 - ReadHandler constructor side effect moved to separate function
 - SubscriptionList and SubscriptionIndex member initialization moved to runtome
 - Improved error handling - remove stored info on error
 - Changed for loop indices to more descriptive names
 - Disabled feature by default except Linux and Darwin for CI testing
 - Added operator[] getter with index to access SubscriptionList and SubscriptionIndex elements

* Restyled and ReadHandler include

* ReadHandler Callback fix

* Fix ReadHandler Callback const argument

* Explicitly disable subscription persistence and address review comments

* Fixed priming reports on resumption

* Revamp subscription storage into flat structure and add unit test

* Fix unit test build warning and minor PR comment change

* Update src/app/SubscriptionResumptionStorage.h

Co-authored-by: Michael Sandstedt <michael.sandstedt@gmail.com>

* Minor changes to address PR comments

* Address PR review comments:

  Unit test structs constructed explicitly in place for clarity
  IM engine ResumeSubscriptions nullptr check and exit conditions fix

* Address PR comments:

  Nullptr checks
  Minor refactor
  Unit test fix

* Changed storage MaxCount mechanics to Init time clean up

* Clean up comments and unused commented-out old code

* Addressed PR comments:

  Removed AllocatedCount, and made AllocatedSize return count of elements

* Update src/app/SimpleSubscriptionResumptionStorage.cpp

Co-authored-by: Michael Sandstedt <michael.sandstedt@gmail.com>

* Update src/app/InteractionModelEngine.cpp

Co-authored-by: Michael Sandstedt <michael.sandstedt@gmail.com>

* Remove reference to previously removed variable for config that turns the feature off

* Addressed PR comments and enabled chip-tool for testing

  Added setters for SubscriptionInfo attribute and event paths
  Fixed wrong constant
  Enabled server interactions for chiptool

* Changed storage of attribute/event paths to proper List/Structure TLV

* Fixed attribute load

* Make Unit Test names more unique and tighten CHIP_CONFIG_PERSIST_SUBSCRIPTIONS usage

* Addressed PR comments and CI issues:

  Delete() error return clarification
  Comment doc cleanup
  Fix loop variable build warning
  Revert chip-tool server interactions enablement

Co-authored-by: Michael Sandstedt <michael.sandstedt@gmail.com>
  • Loading branch information
2 people authored and pull[bot] committed Dec 7, 2023
1 parent 661cafe commit 1439156
Show file tree
Hide file tree
Showing 19 changed files with 1,596 additions and 27 deletions.
3 changes: 3 additions & 0 deletions src/app/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,11 @@ static_library("app") {
"ReadHandler.cpp",
"RequiredPrivilege.cpp",
"RequiredPrivilege.h",
"SimpleSubscriptionResumptionStorage.cpp",
"SimpleSubscriptionResumptionStorage.h",
"StatusResponse.cpp",
"StatusResponse.h",
"SubscriptionResumptionStorage.h",
"TimedHandler.cpp",
"TimedHandler.h",
"TimedRequest.cpp",
Expand Down
47 changes: 43 additions & 4 deletions src/app/InteractionModelEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,16 @@ InteractionModelEngine * InteractionModelEngine::GetInstance()
}

CHIP_ERROR InteractionModelEngine::Init(Messaging::ExchangeManager * apExchangeMgr, FabricTable * apFabricTable,
CASESessionManager * apCASESessionMgr)
CASESessionManager * apCASESessionMgr,
SubscriptionResumptionStorage * subscriptionResumptionStorage)
{
VerifyOrReturnError(apFabricTable != nullptr, CHIP_ERROR_INVALID_ARGUMENT);
VerifyOrReturnError(apExchangeMgr != nullptr, CHIP_ERROR_INVALID_ARGUMENT);

mpExchangeMgr = apExchangeMgr;
mpFabricTable = apFabricTable;
mpCASESessionMgr = apCASESessionMgr;
mpExchangeMgr = apExchangeMgr;
mpFabricTable = apFabricTable;
mpCASESessionMgr = apCASESessionMgr;
mpSubscriptionResumptionStorage = subscriptionResumptionStorage;

ReturnErrorOnFailure(mpFabricTable->AddFabricDelegate(this));
ReturnErrorOnFailure(mpExchangeMgr->RegisterUnsolicitedMessageHandlerForProtocol(Protocols::InteractionModel::Id, this));
Expand Down Expand Up @@ -664,6 +666,8 @@ Status InteractionModelEngine::OnUnsolicitedReportData(Messaging::ExchangeContex
return Status::Success;
}

ChipLogDetail(InteractionModel, "Received report with invalid subscriptionId %" PRIu32, subscriptionId);

return Status::InvalidSubscription;
}

Expand Down Expand Up @@ -1578,5 +1582,40 @@ void InteractionModelEngine::OnFabricRemoved(const FabricTable & fabricTable, Fa
// the fabric removal, though, so they will fail when they try to actually send their command response
// and will close at that point.
}

CHIP_ERROR InteractionModelEngine::ResumeSubscriptions()
{
#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
ReturnErrorCodeIf(!mpSubscriptionResumptionStorage, CHIP_NO_ERROR);

SubscriptionResumptionStorage::SubscriptionInfo subscriptionInfo;
auto * iterator = mpSubscriptionResumptionStorage->IterateSubscriptions();
while (iterator->Next(subscriptionInfo))
{
auto requestedAttributePathCount = subscriptionInfo.mAttributePaths.AllocatedSize();
auto requestedEventPathCount = subscriptionInfo.mEventPaths.AllocatedSize();
if (!EnsureResourceForSubscription(subscriptionInfo.mFabricIndex, requestedAttributePathCount, requestedEventPathCount))
{
ChipLogProgress(InteractionModel, "no resource for Subscription resumption");
iterator->Release();
return CHIP_ERROR_NO_MEMORY;
}

ReadHandler * handler = mReadHandlers.CreateObject(*this);
if (handler == nullptr)
{
ChipLogProgress(InteractionModel, "no resource for ReadHandler creation");
iterator->Release();
return CHIP_ERROR_NO_MEMORY;
}

handler->ResumeSubscription(*mpCASESessionMgr, subscriptionInfo);
}
iterator->Release();
#endif // CHIP_CONFIG_PERSIST_SUBSCRIPTIONS

return CHIP_NO_ERROR;
}

} // namespace app
} // namespace chip
9 changes: 8 additions & 1 deletion src/app/InteractionModelEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ class InteractionModelEngine : public Messaging::UnsolicitedMessageHandler,
*
*/
CHIP_ERROR Init(Messaging::ExchangeManager * apExchangeMgr, FabricTable * apFabricTable,
CASESessionManager * apCASESessionMgr = nullptr);
CASESessionManager * apCASESessionMgr = nullptr,
SubscriptionResumptionStorage * subscriptionResumptionStorage = nullptr);

void Shutdown();

Expand Down Expand Up @@ -292,6 +293,10 @@ class InteractionModelEngine : public Messaging::UnsolicitedMessageHandler,
// virtual method from FabricTable::Delegate
void OnFabricRemoved(const FabricTable & fabricTable, FabricIndex fabricIndex) override;

SubscriptionResumptionStorage * GetSubscriptionResumptionStorage() { return mpSubscriptionResumptionStorage; };

CHIP_ERROR ResumeSubscriptions();

#if CONFIG_BUILD_FOR_HOST_UNIT_TEST
//
// Get direct access to the underlying read handler pool
Expand Down Expand Up @@ -596,6 +601,8 @@ class InteractionModelEngine : public Messaging::UnsolicitedMessageHandler,

CASESessionManager * mpCASESessionMgr = nullptr;

SubscriptionResumptionStorage * mpSubscriptionResumptionStorage = nullptr;

// A magic number for tracking values between stack Shutdown()-s and Init()-s.
// An ObjectHandle is valid iff. its magic equals to this one.
uint32_t mMagic = 0;
Expand Down
122 changes: 121 additions & 1 deletion src/app/ReadHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ ReadHandler::ReadHandler(ManagementCallback & apCallback, Messaging::ExchangeCon
InteractionType aInteractionType) :
mExchangeCtx(*this),
mManagementCallback(apCallback)
#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
,
mOnConnectedCallback(HandleDeviceConnected, this), mOnConnectionFailureCallback(HandleDeviceConnectionFailure, this)
#endif
{
VerifyOrDie(apExchangeContext != nullptr);

Expand All @@ -61,6 +65,54 @@ ReadHandler::ReadHandler(ManagementCallback & apCallback, Messaging::ExchangeCon
mSessionHandle.Grab(mExchangeCtx->GetSessionHandle());
}

#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
ReadHandler::ReadHandler(ManagementCallback & apCallback) :
mExchangeCtx(*this), mManagementCallback(apCallback), mOnConnectedCallback(HandleDeviceConnected, this),
mOnConnectionFailureCallback(HandleDeviceConnectionFailure, this)
{
mInteractionType = InteractionType::Subscribe;
mFlags.ClearAll();
}

void ReadHandler::ResumeSubscription(CASESessionManager & caseSessionManager,
SubscriptionResumptionStorage::SubscriptionInfo & subscriptionInfo)
{
mSubscriptionId = subscriptionInfo.mSubscriptionId;
mMinIntervalFloorSeconds = subscriptionInfo.mMinInterval;
mMaxInterval = subscriptionInfo.mMaxInterval;
SetStateFlag(ReadHandlerFlags::FabricFiltered, subscriptionInfo.mFabricFiltered);

// Move dynamically allocated attributes and events from the SubscriptionInfo struct into
// the object pool managed by the IM engine
for (size_t i = 0; i < subscriptionInfo.mAttributePaths.AllocatedSize(); i++)
{
AttributePathParams attributePathParams = subscriptionInfo.mAttributePaths[i].GetParams();
CHIP_ERROR err =
InteractionModelEngine::GetInstance()->PushFrontAttributePathList(mpAttributePathList, attributePathParams);
if (err != CHIP_NO_ERROR)
{
Close();
return;
}
}
for (size_t i = 0; i < subscriptionInfo.mEventPaths.AllocatedSize(); i++)
{
EventPathParams eventPathParams = subscriptionInfo.mEventPaths[i].GetParams();
CHIP_ERROR err = InteractionModelEngine::GetInstance()->PushFrontEventPathParamsList(mpEventPathList, eventPathParams);
if (err != CHIP_NO_ERROR)
{
Close();
return;
}
}

// Ask IM engine to start CASE session with subscriber
ScopedNodeId peerNode = ScopedNodeId(subscriptionInfo.mNodeId, subscriptionInfo.mFabricIndex);
caseSessionManager.FindOrEstablishSession(peerNode, &mOnConnectedCallback, &mOnConnectionFailureCallback);
}

#endif // CHIP_CONFIG_PERSIST_SUBSCRIPTIONS

ReadHandler::~ReadHandler()
{
auto * appCallback = mManagementCallback.GetAppCallback();
Expand All @@ -87,8 +139,18 @@ ReadHandler::~ReadHandler()
InteractionModelEngine::GetInstance()->ReleaseDataVersionFilterList(mpDataVersionFilterList);
}

void ReadHandler::Close()
void ReadHandler::Close(CloseOptions options)
{
#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
if (options == CloseOptions::kDropPersistedSubscription)
{
auto * subscriptionResumptionStorage = InteractionModelEngine::GetInstance()->GetSubscriptionResumptionStorage();
if (subscriptionResumptionStorage)
{
subscriptionResumptionStorage->Delete(GetInitiatorNodeId(), GetAccessingFabricIndex(), mSubscriptionId);
}
}
#endif // CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
MoveToState(HandlerState::AwaitingDestruction);
mManagementCallback.OnDone(*this);
}
Expand Down Expand Up @@ -306,7 +368,12 @@ void ReadHandler::OnResponseTimeout(Messaging::ExchangeContext * apExchangeConte
{
ChipLogError(DataManagement, "Time out! failed to receive status response from Exchange: " ChipLogFormatExchange,
ChipLogValueExchange(apExchangeContext));
#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
// TODO: Have a retry mechanism tied to wake interval for IC devices
Close(CloseOptions::kKeepPersistedSubscription);
#else
Close();
#endif
}

CHIP_ERROR ReadHandler::ProcessReadRequest(System::PacketBufferHandle && aPayload)
Expand Down Expand Up @@ -652,9 +719,34 @@ CHIP_ERROR ReadHandler::ProcessSubscribeRequest(System::PacketBufferHandle && aP

mExchangeCtx->WillSendMessage();

#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
PersistSubscription();
#endif // CHIP_CONFIG_PERSIST_SUBSCRIPTIONS

return CHIP_NO_ERROR;
}

void ReadHandler::PersistSubscription()
{
auto * subscriptionResumptionStorage = InteractionModelEngine::GetInstance()->GetSubscriptionResumptionStorage();
VerifyOrReturn(subscriptionResumptionStorage != nullptr);

SubscriptionResumptionStorage::SubscriptionInfo subscriptionInfo = { .mNodeId = GetInitiatorNodeId(),
.mFabricIndex = GetAccessingFabricIndex(),
.mSubscriptionId = mSubscriptionId,
.mMinInterval = mMinIntervalFloorSeconds,
.mMaxInterval = mMaxInterval,
.mFabricFiltered = IsFabricFiltered() };
VerifyOrReturn(subscriptionInfo.SetAttributePaths(mpAttributePathList) == CHIP_NO_ERROR);
VerifyOrReturn(subscriptionInfo.SetEventPaths(mpEventPathList) == CHIP_NO_ERROR);

CHIP_ERROR err = subscriptionResumptionStorage->Save(subscriptionInfo);
if (err != CHIP_NO_ERROR)
{
ChipLogError(DataManagement, "Failed to save subscription info error: '%" CHIP_ERROR_FORMAT, err.Format());
}
}

void ReadHandler::OnUnblockHoldReportCallback(System::Layer * apSystemLayer, void * apAppState)
{
VerifyOrReturn(apAppState != nullptr);
Expand Down Expand Up @@ -767,5 +859,33 @@ void ReadHandler::ClearStateFlag(ReadHandlerFlags aFlag)
SetStateFlag(aFlag, false);
}

void ReadHandler::HandleDeviceConnected(void * context, Messaging::ExchangeManager & exchangeMgr,
const SessionHandle & sessionHandle)
{
ReadHandler * const _this = static_cast<ReadHandler *>(context);

_this->mSessionHandle.Grab(sessionHandle);

_this->MoveToState(HandlerState::GeneratingReports);

ObjectList<AttributePathParams> * attributePath = _this->mpAttributePathList;
while (attributePath)
{
InteractionModelEngine::GetInstance()->GetReportingEngine().SetDirty(attributePath->mValue);
attributePath = attributePath->mpNext;
}
}

void ReadHandler::HandleDeviceConnectionFailure(void * context, const ScopedNodeId & peerId, CHIP_ERROR err)
{
ReadHandler * const _this = static_cast<ReadHandler *>(context);
VerifyOrDie(_this != nullptr);

// TODO: Have a retry mechanism tied to wake interval for IC devices
ChipLogError(DataManagement, "Failed to establish CASE for subscription-resumption with error '%" CHIP_ERROR_FORMAT "'",
err.Format());
_this->Close();
}

} // namespace app
} // namespace chip
50 changes: 49 additions & 1 deletion src/app/ReadHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <app/AttributeAccessInterface.h>
#include <app/AttributePathExpandIterator.h>
#include <app/AttributePathParams.h>
#include <app/CASESessionManager.h>
#include <app/DataVersionFilter.h>
#include <app/EventManagement.h>
#include <app/EventPathParams.h>
Expand All @@ -36,6 +37,9 @@
#include <app/MessageDef/EventFilterIBs.h>
#include <app/MessageDef/EventPathIBs.h>
#include <app/ObjectList.h>
#include <app/OperationalSessionSetup.h>
#include <app/SubscriptionResumptionStorage.h>
#include <lib/core/CHIPCallback.h>
#include <lib/core/CHIPCore.h>
#include <lib/core/TLVDebug.h>
#include <lib/support/CodeUtils.h>
Expand Down Expand Up @@ -165,6 +169,17 @@ class ReadHandler : public Messaging::ExchangeDelegate
*/
ReadHandler(ManagementCallback & apCallback, Messaging::ExchangeContext * apExchangeContext, InteractionType aInteractionType);

#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
/**
*
* Constructor in preparation for resuming a persisted subscription
*
* The callback passed in has to outlive this handler object.
*
*/
ReadHandler(ManagementCallback & apCallback);
#endif

const ObjectList<AttributePathParams> * GetAttributePathList() const { return mpAttributePathList; }
const ObjectList<EventPathParams> * GetEventPathList() const { return mpEventPathList; }
const ObjectList<DataVersionFilter> * GetDataVersionFilterList() const { return mpDataVersionFilterList; }
Expand Down Expand Up @@ -243,6 +258,18 @@ class ReadHandler : public Messaging::ExchangeDelegate
*/
void OnInitialRequest(System::PacketBufferHandle && aPayload);

#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
/**
*
* @brief Resume a persisted subscription
*
* Used after ReadHandler(ManagementCallback & apCallback). This will start a CASE session
* with the subscriber if one doesn't already exist, and send full priming report when connected.
*/
void ResumeSubscription(CASESessionManager & caseSessionManager,
SubscriptionResumptionStorage::SubscriptionInfo & subscriptionInfo);
#endif

/**
* Send ReportData to initiator
*
Expand Down Expand Up @@ -355,11 +382,19 @@ class ReadHandler : public Messaging::ExchangeDelegate
AwaitingDestruction, ///< The object has completed its work and is awaiting destruction by the application.
};

enum class CloseOptions
{
kDropPersistedSubscription,
kKeepPersistedSubscription
};
/**
* Called internally to signal the completion of all work on this objecta and signal to a registered callback that it's
* safe to release this object.
*
* @param options This specifies whether to drop or keep the subscription
*
*/
void Close();
void Close(CloseOptions options = CloseOptions::kDropPersistedSubscription);

static void OnUnblockHoldReportCallback(System::Layer * apSystemLayer, void * apAppState);
static void OnRefreshSubscribeTimerSyncCallback(System::Layer * apSystemLayer, void * apAppState);
Expand All @@ -379,10 +414,17 @@ class ReadHandler : public Messaging::ExchangeDelegate

const char * GetStateStr() const;

void PersistSubscription();

// Helpers for managing our state flags properly.
void SetStateFlag(ReadHandlerFlags aFlag, bool aValue = true);
void ClearStateFlag(ReadHandlerFlags aFlag);

// Helpers for continuing the subscription resumption
static void HandleDeviceConnected(void * context, Messaging::ExchangeManager & exchangeMgr,
const SessionHandle & sessionHandle);
static void HandleDeviceConnectionFailure(void * context, const ScopedNodeId & peerId, CHIP_ERROR error);

AttributePathExpandIterator mAttributePathExpandIterator = AttributePathExpandIterator(nullptr);

// The current generation of the reporting engine dirty set the last time we were notified that a path we're interested in was
Expand Down Expand Up @@ -461,6 +503,12 @@ class ReadHandler : public Messaging::ExchangeDelegate
PriorityLevel mCurrentPriority = PriorityLevel::Invalid;
BitFlags<ReadHandlerFlags> mFlags;
InteractionType mInteractionType = InteractionType::Read;

#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
// Callbacks to handle server-initiated session success/failure
chip::Callback::Callback<OnDeviceConnected> mOnConnectedCallback;
chip::Callback::Callback<OnDeviceConnectionFailure> mOnConnectionFailureCallback;
#endif
};
} // namespace app
} // namespace chip
Loading

0 comments on commit 1439156

Please sign in to comment.