Skip to content

Commit

Permalink
add inital keepSubscription feature (#10339)
Browse files Browse the repository at this point in the history
  • Loading branch information
yunhanw-google authored and pull[bot] committed Nov 10, 2021
1 parent c7811a4 commit f5ff9b1
Show file tree
Hide file tree
Showing 12 changed files with 105 additions and 36 deletions.
5 changes: 5 additions & 0 deletions src/app/InteractionModelDelegate.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,11 @@ class InteractionModelDelegate
*/
virtual CHIP_ERROR SubscriptionEstablished(const ReadHandler * apReadHandler) { return CHIP_ERROR_NOT_IMPLEMENTED; }

/**
* Notification that Subscription has been terminated in handler side.
*/
virtual CHIP_ERROR SubscriptionTerminated(const ReadHandler * apReadHandler) { return CHIP_ERROR_NOT_IMPLEMENTED; }

/**
* Notification that a read interaction was completed on the client successfully.
* @param[in] apReadClient A current read client which can identify the read client to the consumer, particularly
Expand Down
20 changes: 20 additions & 0 deletions src/app/InteractionModelEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,26 @@ CHIP_ERROR InteractionModelEngine::OnReadInitialRequest(Messaging::ExchangeConte
ChipLogDetail(InteractionModel, "Receive %s request",
aInteractionType == ReadHandler::InteractionType::Subscribe ? "Subscribe" : "Read");

for (auto & readHandler : mReadHandlers)
{
if (!readHandler.IsFree() && readHandler.IsSubscriptionType() &&
readHandler.GetInitiatorNodeId() == apExchangeContext->GetSecureSession().GetPeerNodeId() &&
readHandler.GetFabricIndex() == apExchangeContext->GetSecureSession().GetFabricIndex())
{
bool keepSubscriptions = true;
System::PacketBufferTLVReader reader;
reader.Init(aPayload.Retain());
SuccessOrExit(err = reader.Next());
SubscribeRequest::Parser subscribeRequestParser;
SuccessOrExit(err = subscribeRequestParser.Init(reader));
err = subscribeRequestParser.GetKeepSubscriptions(&keepSubscriptions);
if (err == CHIP_NO_ERROR && !keepSubscriptions)
{
readHandler.Shutdown(ReadHandler::ShutdownOptions::AbortCurrentExchange);
}
}
}

for (auto & readHandler : mReadHandlers)
{
if (readHandler.IsFree())
Expand Down
20 changes: 10 additions & 10 deletions src/app/MessageDef/SubscribeRequest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,15 +120,15 @@ CHIP_ERROR SubscribeRequest::Parser::CheckSchemaValidity() const
}
#endif // CHIP_DETAIL_LOGGING
break;
case kCsTag_KeepExistingSubscriptions:
VerifyOrReturnLogError(!(TagPresenceMask & (1 << kCsTag_KeepExistingSubscriptions)), CHIP_ERROR_INVALID_TLV_TAG);
TagPresenceMask |= (1 << kCsTag_KeepExistingSubscriptions);
case kCsTag_KeepSubscriptions:
VerifyOrReturnLogError(!(TagPresenceMask & (1 << kCsTag_KeepSubscriptions)), CHIP_ERROR_INVALID_TLV_TAG);
TagPresenceMask |= (1 << kCsTag_KeepSubscriptions);
VerifyOrReturnLogError(chip::TLV::kTLVType_Boolean == reader.GetType(), CHIP_ERROR_WRONG_TLV_TYPE);
#if CHIP_DETAIL_LOGGING
{
bool keepExistingSubscriptions;
ReturnLogErrorOnFailure(reader.Get(keepExistingSubscriptions));
PRETTY_PRINT("\tKeepExistingSubscriptions = %s, ", keepExistingSubscriptions ? "true" : "false");
bool keepSubscriptions;
ReturnLogErrorOnFailure(reader.Get(keepSubscriptions));
PRETTY_PRINT("\tKeepSubscriptions = %s, ", keepSubscriptions ? "true" : "false");
}
#endif // CHIP_DETAIL_LOGGING
break;
Expand Down Expand Up @@ -205,9 +205,9 @@ CHIP_ERROR SubscribeRequest::Parser::GetMaxIntervalSeconds(uint16_t * const apMa
return GetUnsignedInteger(kCsTag_MaxIntervalSeconds, apMaxIntervalSeconds);
}

CHIP_ERROR SubscribeRequest::Parser::GetKeepExistingSubscriptions(bool * const apKeepExistingSubscription) const
CHIP_ERROR SubscribeRequest::Parser::GetKeepSubscriptions(bool * const apKeepExistingSubscription) const
{
return GetSimpleValue(kCsTag_KeepExistingSubscriptions, chip::TLV::kTLVType_Boolean, apKeepExistingSubscription);
return GetSimpleValue(kCsTag_KeepSubscriptions, chip::TLV::kTLVType_Boolean, apKeepExistingSubscription);
}

CHIP_ERROR SubscribeRequest::Parser::GetIsProxy(bool * const apIsProxy) const
Expand Down Expand Up @@ -277,11 +277,11 @@ SubscribeRequest::Builder & SubscribeRequest::Builder::MaxIntervalSeconds(const
return *this;
}

SubscribeRequest::Builder & SubscribeRequest::Builder::KeepExistingSubscriptions(const bool aKeepExistingSubscriptions)
SubscribeRequest::Builder & SubscribeRequest::Builder::KeepSubscriptions(const bool aKeepSubscriptions)
{
if (mError == CHIP_NO_ERROR)
{
mError = mpWriter->PutBoolean(chip::TLV::ContextTag(kCsTag_KeepExistingSubscriptions), aKeepExistingSubscriptions);
mError = mpWriter->PutBoolean(chip::TLV::ContextTag(kCsTag_KeepSubscriptions), aKeepSubscriptions);
}
return *this;
}
Expand Down
20 changes: 10 additions & 10 deletions src/app/MessageDef/SubscribeRequest.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,14 @@ namespace app {
namespace SubscribeRequest {
enum
{
kCsTag_AttributePathList = 0,
kCsTag_EventPathList = 1,
kCsTag_AttributeDataVersionList = 2,
kCsTag_EventNumber = 3,
kCsTag_MinIntervalSeconds = 4,
kCsTag_MaxIntervalSeconds = 5,
kCsTag_KeepExistingSubscriptions = 6,
kCsTag_IsProxy = 7,
kCsTag_AttributePathList = 0,
kCsTag_EventPathList = 1,
kCsTag_AttributeDataVersionList = 2,
kCsTag_EventNumber = 3,
kCsTag_MinIntervalSeconds = 4,
kCsTag_MaxIntervalSeconds = 5,
kCsTag_KeepSubscriptions = 6,
kCsTag_IsProxy = 7,
};

class Parser : public chip::app::Parser
Expand Down Expand Up @@ -118,7 +118,7 @@ class Parser : public chip::app::Parser
* @return #CHIP_NO_ERROR on success
* #CHIP_END_OF_TLV if there is no such element
*/
CHIP_ERROR GetKeepExistingSubscriptions(bool * const apKeepExistingSubscription) const;
CHIP_ERROR GetKeepSubscriptions(bool * const apKeepExistingSubscription) const;

/**
* @brief Check if subscription is kept. Next() must be called before accessing them.
Expand Down Expand Up @@ -159,7 +159,7 @@ class Builder : public chip::app::Builder
* @brief This is set to 'true' by the subscriber to indicate preservation of previous subscriptions. If omitted, it implies
* 'false' as a value.
*/
SubscribeRequest::Builder & KeepExistingSubscriptions(const bool aKeepExistingSubscriptions);
SubscribeRequest::Builder & KeepSubscriptions(const bool aKeepSubscriptions);

/**
* @brief This is set to true by the subscriber if it is a proxy-type device proxying for another client. This
Expand Down
1 change: 1 addition & 0 deletions src/app/ReadClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,7 @@ CHIP_ERROR ReadClient::SendSubscribeRequest(ReadPrepareParams & aReadPreparePara

request.MinIntervalSeconds(aReadPrepareParams.mMinIntervalFloorSeconds)
.MaxIntervalSeconds(aReadPrepareParams.mMaxIntervalCeilingSeconds)
.KeepSubscriptions(aReadPrepareParams.mKeepSubscriptions)
.EndOfSubscribeRequest();
SuccessOrExit(err = request.GetError());

Expand Down
30 changes: 21 additions & 9 deletions src/app/ReadHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,15 @@ CHIP_ERROR ReadHandler::Init(Messaging::ExchangeManager * apExchangeMgr, Interac
mCurrentPriority = PriorityLevel::Invalid;
mInitialReport = true;
MoveToState(HandlerState::Initialized);
mpDelegate = apDelegate;
mSubscriptionId = 0;
mHoldReport = false;
mDirty = false;
mInteractionType = aInteractionType;
mpDelegate = apDelegate;
mSubscriptionId = 0;
mHoldReport = false;
mDirty = false;
mActiveSubscription = false;
mInteractionType = aInteractionType;
mInitiatorNodeId = apExchangeContext->GetSecureSession().GetPeerNodeId();
mFabricIndex = apExchangeContext->GetSecureSession().GetFabricIndex();

if (apExchangeContext != nullptr)
{
apExchangeContext->SetDelegate(this);
Expand All @@ -67,7 +71,12 @@ void ReadHandler::Shutdown(ShutdownOptions aOptions)
{
InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->CancelTimer(
OnRefreshSubscribeTimerSyncCallback, this);
if (mpDelegate != nullptr)
{
mpDelegate->SubscriptionTerminated(this);
}
}

if (aOptions == ShutdownOptions::AbortCurrentExchange)
{
if (mpExchangeCtx != nullptr)
Expand Down Expand Up @@ -96,6 +105,8 @@ void ReadHandler::Shutdown(ShutdownOptions aOptions)
mpDelegate = nullptr;
mHoldReport = false;
mDirty = false;
mActiveSubscription = false;
mInitiatorNodeId = kUndefinedNodeId;
}

CHIP_ERROR ReadHandler::OnReadInitialRequest(System::PacketBufferHandle && aPayload)
Expand Down Expand Up @@ -149,12 +160,15 @@ CHIP_ERROR ReadHandler::OnStatusResponse(Messaging::ExchangeContext * apExchange
InteractionModelEngine::GetInstance()->GetReportingEngine().OnReportConfirm();
if (IsInitialReport())
{
err = SendSubscribeResponse();
err = SendSubscribeResponse();
mpExchangeCtx = nullptr;
SuccessOrExit(err);
mActiveSubscription = true;
}
else
{
MoveToState(HandlerState::GeneratingReports);
mpExchangeCtx = nullptr;
}
}
else
Expand All @@ -180,14 +194,13 @@ CHIP_ERROR ReadHandler::OnStatusResponse(Messaging::ExchangeContext * apExchange
CHIP_ERROR ReadHandler::SendReportData(System::PacketBufferHandle && aPayload)
{
VerifyOrReturnLogError(IsReportable(), CHIP_ERROR_INCORRECT_STATE);
VerifyOrReturnLogError(mpExchangeCtx != nullptr, CHIP_ERROR_INCORRECT_STATE);
if (IsInitialReport())
{
VerifyOrReturnLogError(mpExchangeCtx != nullptr, CHIP_ERROR_INCORRECT_STATE);
mSessionHandle.SetValue(mpExchangeCtx->GetSecureSession());
}
else
{
VerifyOrReturnLogError(mpExchangeCtx == nullptr, CHIP_ERROR_INCORRECT_STATE);
mpExchangeCtx = mpExchangeMgr->NewContext(mSessionHandle.Value(), this);
mpExchangeCtx->SetResponseTimeout(kImMessageTimeoutMsec);
}
Expand Down Expand Up @@ -543,7 +556,6 @@ CHIP_ERROR ReadHandler::ProcessSubscribeRequest(System::PacketBufferHandle && aP

ReturnLogErrorOnFailure(subscribeRequestParser.GetMinIntervalSeconds(&mMinIntervalFloorSeconds));
ReturnLogErrorOnFailure(subscribeRequestParser.GetMaxIntervalSeconds(&mMaxIntervalCeilingSeconds));

ReturnLogErrorOnFailure(Crypto::DRBG_get_bytes(reinterpret_cast<uint8_t *>(&mSubscriptionId), sizeof(mSubscriptionId)));

MoveToState(HandlerState::GeneratingReports);
Expand Down
10 changes: 8 additions & 2 deletions src/app/ReadHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,14 @@ class ReadHandler : public Messaging::ExchangeDelegate
bool IsReadType() { return mInteractionType == InteractionType::Read; }
bool IsSubscriptionType() { return mInteractionType == InteractionType::Subscribe; }
bool IsInitialReport() { return mInitialReport; }
bool IsActiveSubscription() const { return mActiveSubscription; }
CHIP_ERROR OnSubscribeRequest(Messaging::ExchangeContext * apExchangeContext, System::PacketBufferHandle && aPayload);
void GetSubscriptionId(uint64_t & aSubscriptionId) { aSubscriptionId = mSubscriptionId; }
void SetDirty() { mDirty = true; }
void ClearDirty() { mDirty = false; }
bool IsDirty() { return mDirty; }
NodeId GetInitiatorNodeId() const { return mInitiatorNodeId; }
FabricIndex GetFabricIndex() const { return mFabricIndex; }

private:
friend class TestReadInteraction;
Expand Down Expand Up @@ -186,8 +189,11 @@ class ReadHandler : public Messaging::ExchangeDelegate
uint16_t mMinIntervalFloorSeconds = 0;
uint16_t mMaxIntervalCeilingSeconds = 0;
Optional<SessionHandle> mSessionHandle;
bool mHoldReport = false;
bool mDirty = false;
bool mHoldReport = false;
bool mDirty = false;
bool mActiveSubscription = false;
NodeId mInitiatorNodeId = kUndefinedNodeId;
FabricIndex mFabricIndex = 0;
};
} // namespace app
} // namespace chip
3 changes: 3 additions & 0 deletions src/app/ReadPrepareParams.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,12 @@ struct ReadPrepareParams
uint32_t mTimeout = kImMessageTimeoutMsec;
uint16_t mMinIntervalFloorSeconds = 0;
uint16_t mMaxIntervalCeilingSeconds = 0;
bool mKeepSubscriptions = true;

ReadPrepareParams(SessionHandle sessionHandle) : mSessionHandle(sessionHandle) {}
ReadPrepareParams(ReadPrepareParams && other) : mSessionHandle(other.mSessionHandle)
{
mKeepSubscriptions = other.mKeepSubscriptions;
mpEventPathParamsList = other.mpEventPathParamsList;
mEventPathParamsListSize = other.mEventPathParamsListSize;
mpAttributePathParamsList = other.mpAttributePathParamsList;
Expand All @@ -61,6 +63,7 @@ struct ReadPrepareParams
if (&other == this)
return *this;

mKeepSubscriptions = other.mKeepSubscriptions;
mSessionHandle = other.mSessionHandle;
mpEventPathParamsList = other.mpEventPathParamsList;
mEventPathParamsListSize = other.mEventPathParamsListSize;
Expand Down
4 changes: 2 additions & 2 deletions src/app/tests/TestMessageDef.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -914,7 +914,7 @@ void BuildSubscribeRequest(nlTestSuite * apSuite, chip::TLV::TLVWriter & aWriter
subscribeRequestBuilder.MaxIntervalSeconds(3);
NL_TEST_ASSERT(apSuite, subscribeRequestBuilder.GetError() == CHIP_NO_ERROR);

subscribeRequestBuilder.KeepExistingSubscriptions(true);
subscribeRequestBuilder.KeepSubscriptions(true);
NL_TEST_ASSERT(apSuite, subscribeRequestBuilder.GetError() == CHIP_NO_ERROR);

subscribeRequestBuilder.IsProxy(true);
Expand Down Expand Up @@ -962,7 +962,7 @@ void ParseSubscribeRequest(nlTestSuite * apSuite, chip::TLV::TLVReader & aReader
err = subscribeRequestParser.GetMaxIntervalSeconds(&maxIntervalSeconds);
NL_TEST_ASSERT(apSuite, maxIntervalSeconds == 3 && err == CHIP_NO_ERROR);

err = subscribeRequestParser.GetKeepExistingSubscriptions(&keepExistingSubscription);
err = subscribeRequestParser.GetKeepSubscriptions(&keepExistingSubscription);
NL_TEST_ASSERT(apSuite, keepExistingSubscription && err == CHIP_NO_ERROR);

err = subscribeRequestParser.GetIsProxy(&isProxy);
Expand Down
23 changes: 20 additions & 3 deletions src/app/tests/TestReadInteraction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,13 +206,24 @@ class MockInteractionModelApp : public chip::app::InteractionModelDelegate
CHIP_ERROR SubscriptionEstablished(const chip::app::ReadHandler * apReadHandler) override
{
mpReadHandler = const_cast<chip::app::ReadHandler *>(apReadHandler);
return CHIP_ERROR_NOT_IMPLEMENTED;
mNumSubscriptions++;
return CHIP_NO_ERROR;
}

CHIP_ERROR SubscriptionTerminated(const chip::app::ReadHandler * apReadHandler) override
{
if (apReadHandler->IsActiveSubscription())
{
mNumSubscriptions--;
}
return CHIP_NO_ERROR;
}

bool mGotEventResponse = false;
int mNumAttributeResponse = 0;
bool mGotReport = false;
bool mReadError = false;
uint32_t mNumSubscriptions = 0;
chip::app::ReadHandler * mpReadHandler = nullptr;
};
} // namespace
Expand Down Expand Up @@ -815,7 +826,7 @@ void TestReadInteraction::TestProcessSubscribeRequest(nlTestSuite * apSuite, voi
subscribeRequestBuilder.MaxIntervalSeconds(3);
NL_TEST_ASSERT(apSuite, subscribeRequestBuilder.GetError() == CHIP_NO_ERROR);

subscribeRequestBuilder.KeepExistingSubscriptions(true);
subscribeRequestBuilder.KeepSubscriptions(true);
NL_TEST_ASSERT(apSuite, subscribeRequestBuilder.GetError() == CHIP_NO_ERROR);

subscribeRequestBuilder.IsProxy(true);
Expand Down Expand Up @@ -890,10 +901,16 @@ void TestReadInteraction::TestSubscribeRoundtrip(nlTestSuite * apSuite, void * a

err = engine->SendSubscribeRequest(readPrepareParams);
NL_TEST_ASSERT(apSuite, err == CHIP_NO_ERROR);

delegate.mNumAttributeResponse = 0;
readPrepareParams.mKeepSubscriptions = false;
err = engine->SendSubscribeRequest(readPrepareParams);
NL_TEST_ASSERT(apSuite, err == CHIP_NO_ERROR);
delegate.mGotReport = false;
engine->GetReportingEngine().Run();
NL_TEST_ASSERT(apSuite, delegate.mGotReport);
NL_TEST_ASSERT(apSuite, delegate.mNumAttributeResponse == 2);
NL_TEST_ASSERT(apSuite, delegate.mNumSubscriptions == 1);

chip::app::ClusterInfo dirtyPath1;
dirtyPath1.mClusterId = kTestClusterId;
Expand Down Expand Up @@ -1011,7 +1028,7 @@ void TestReadInteraction::TestSubscribeRoundtrip(nlTestSuite * apSuite, void * a
engine->GetReportingEngine().Run();
NL_TEST_ASSERT(apSuite, delegate.mGotReport);
NL_TEST_ASSERT(apSuite, delegate.mNumAttributeResponse == 1);

NL_TEST_ASSERT(apSuite, delegate.mNumSubscriptions == 2);
// Test report with 1 path modification for 2 subscription
delegate.mpReadHandler->mHoldReport = false;
delegate.mGotReport = false;
Expand Down
1 change: 1 addition & 0 deletions src/controller/CHIPDevice.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -753,6 +753,7 @@ CHIP_ERROR Device::SendSubscribeAttributeRequest(app::AttributePathParams aPath,
params.mAttributePathParamsListSize = 1;
params.mMinIntervalFloorSeconds = mMinIntervalFloorSeconds;
params.mMaxIntervalCeilingSeconds = mMaxIntervalCeilingSeconds;
params.mKeepSubscriptions = false;

CHIP_ERROR err =
chip::app::InteractionModelEngine::GetInstance()->SendSubscribeRequest(params, seqNum /* application context */);
Expand Down
4 changes: 4 additions & 0 deletions src/controller/python/test/test_scripts/mobile-device-test.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,10 @@ def main():
FailIfNot(test.TestSubscription(nodeid=1, endpoint=LIGHTING_ENDPOINT_ID),
"Failed to subscribe attributes.")

logger.info("Testing another subscription that kills previous subscriptions")
FailIfNot(test.TestSubscription(nodeid=1, endpoint=LIGHTING_ENDPOINT_ID),
"Failed to subscribe attributes.")

logger.info("Testing closing sessions")
FailIfNot(test.TestCloseSession(nodeid=1), "Failed to close sessions")

Expand Down

0 comments on commit f5ff9b1

Please sign in to comment.