Skip to content

Commit

Permalink
Fix KeepSubscription flag handling (#22805)
Browse files Browse the repository at this point in the history
* Fix KeepSubscription flag handling

This fixes the KeepSubscription flag handling to be done right at the
onset of processing a SubscribeRequest message. This ensures that
matching existing subscriptions are evicted before we attempt to
continue further processing and allocation of ReadHandlers.

Testing:

In addition to the new test in TestRead, validated setting up a sub in
chip-tool and then cancelling it by sending another sub to an invalid
endpoint:

basic subscribe location  0 5 2 0
any subscribe-by-id 0xFFFFFFFF 0xFFFFFFFF 0 5 2 100

Also tested by sending an empty subscribe request with the REPL.

* Fixup printf specifier
  • Loading branch information
mrjerryjohns authored and pull[bot] committed Oct 7, 2022
1 parent af89c48 commit 4355297
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 29 deletions.
49 changes: 27 additions & 22 deletions src/app/InteractionModelEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,29 @@ Protocols::InteractionModel::Status InteractionModelEngine::OnReadInitialRequest

SubscribeRequestMessage::Parser subscribeRequestParser;
VerifyOrReturnError(subscribeRequestParser.Init(reader) == CHIP_NO_ERROR, Status::InvalidAction);

VerifyOrReturnError(subscribeRequestParser.GetKeepSubscriptions(&keepExistingSubscriptions) == CHIP_NO_ERROR,
Status::InvalidAction);
if (!keepExistingSubscriptions)
{
//
// Walk through all existing subscriptions and shut down those whose subscriber matches
// that which just came in.
//
mReadHandlers.ForEachActiveObject([this, apExchangeContext](ReadHandler * handler) {
if (handler->IsFromSubscriber(*apExchangeContext))
{
ChipLogProgress(InteractionModel,
"Deleting previous subscription from NodeId: " ChipLogFormatX64 ", FabricIndex: %u",
ChipLogValueX64(apExchangeContext->GetSessionHandle()->AsSecureSession()->GetPeerNodeId()),
apExchangeContext->GetSessionHandle()->GetFabricIndex());
mReadHandlers.ReleaseObject(handler);
}

return Loop::Continue;
});
}

{
size_t requestedAttributePathCount = 0;
size_t requestedEventPathCount = 0;
Expand Down Expand Up @@ -492,28 +515,6 @@ Protocols::InteractionModel::Status InteractionModelEngine::OnReadInitialRequest
return Status::PathsExhausted;
}
}

VerifyOrReturnError(subscribeRequestParser.GetKeepSubscriptions(&keepExistingSubscriptions) == CHIP_NO_ERROR,
Status::InvalidAction);
if (!keepExistingSubscriptions)
{
//
// Walk through all existing subscriptions and shut down those whose subscriber matches
// that which just came in.
//
mReadHandlers.ForEachActiveObject([this, apExchangeContext](ReadHandler * handler) {
if (handler->IsFromSubscriber(*apExchangeContext))
{
ChipLogProgress(InteractionModel,
"Deleting previous subscription from NodeId: " ChipLogFormatX64 ", FabricIndex: %u",
ChipLogValueX64(apExchangeContext->GetSessionHandle()->AsSecureSession()->GetPeerNodeId()),
apExchangeContext->GetSessionHandle()->GetFabricIndex());
mReadHandlers.ReleaseObject(handler);
}

return Loop::Continue;
});
}
}
else
{
Expand Down Expand Up @@ -790,6 +791,10 @@ bool InteractionModelEngine::TrimFabricForSubscriptions(FabricIndex aFabricIndex
eventPathsSubscribedByCurrentFabric > perFabricPathCapacity ||
subscriptionsEstablishedByCurrentFabric > perFabricSubscriptionCapacity))
{
SubscriptionId subId;
candidate->GetSubscriptionId(subId);
ChipLogProgress(DataManagement, "Evicting Subscription ID %u:0x%" PRIx32, candidate->GetSubjectDescriptor().fabricIndex,
subId);
candidate->Close();
return true;
}
Expand Down
3 changes: 0 additions & 3 deletions src/app/ReadClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -952,9 +952,6 @@ CHIP_ERROR ReadClient::SendSubscribeRequestImpl(const ReadPrepareParams & aReadP
Span<DataVersionFilter> dataVersionFilters(aReadPrepareParams.mpDataVersionFilterList,
aReadPrepareParams.mDataVersionFilterListSize);

VerifyOrReturnError(aReadPrepareParams.mAttributePathParamsListSize != 0 || aReadPrepareParams.mEventPathParamsListSize != 0,
CHIP_ERROR_INVALID_ARGUMENT);

System::PacketBufferHandle msgBuf;
System::PacketBufferTLVWriter writer;
SubscribeRequestMessage::Builder request;
Expand Down
5 changes: 1 addition & 4 deletions src/controller/python/chip/clusters/Attribute.py
Original file line number Diff line number Diff line change
Expand Up @@ -955,10 +955,7 @@ def Read(future: Future, eventLoop, device, devCtrl, attributes: List[AttributeP
if (not attributes) and dataVersionFilters:
raise ValueError(
"Must provide valid attribute list when data version filters is not null")
if (not attributes) and (not events):
raise ValueError(
"Must read some something"
)

handle = chip.native.GetLibraryHandle()
transaction = AsyncReadTransaction(
future, eventLoop, devCtrl, returnClusterObject)
Expand Down
48 changes: 48 additions & 0 deletions src/controller/tests/data_model/TestRead.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ class TestReadInteraction : public app::ReadHandler::ApplicationCallback
static void TestReadAttribute_ManyDataValuesWrongPath(nlTestSuite * apSuite, void * apContext);
static void TestReadAttribute_ManyErrors(nlTestSuite * apSuite, void * apContext);
static void TestSubscribeAttributeDeniedNotExistPath(nlTestSuite * apSuite, void * apContext);
static void TestReadHandler_KeepSubscriptionTest(nlTestSuite * apSuite, void * apContext);

private:
static uint16_t mMaxInterval;
Expand Down Expand Up @@ -1675,6 +1676,7 @@ void TestReadInteraction::TestSubscribeAttributeTimeout(nlTestSuite * apSuite, v

app::InteractionModelEngine::GetInstance()->ShutdownActiveReads();
NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 0);
ctx.GetLoopback().mNumMessagesToDrop = 0;
}

void TestReadInteraction::TestReadHandler_MultipleSubscriptions(nlTestSuite * apSuite, void * apContext)
Expand Down Expand Up @@ -4451,6 +4453,51 @@ void TestReadInteraction::TestReadAttribute_ManyErrors(nlTestSuite * apSuite, vo
NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 0);
}

//
// This validates the KeepSubscriptions flag by first setting up a valid subscription, then sending
// a subsequent SubcribeRequest with empty attribute AND event paths with KeepSubscriptions = false.
//
// This should evict the previous subscription before sending back an error.
//
void TestReadInteraction::TestReadHandler_KeepSubscriptionTest(nlTestSuite * apSuite, void * apContext)
{
using namespace SubscriptionPathQuotaHelpers;

TestContext & ctx = *static_cast<TestContext *>(apContext);
TestReadCallback readCallback;
app::AttributePathParams pathParams(kTestEndpointId, TestCluster::Id, TestCluster::Attributes::Int16u::Id);

app::ReadPrepareParams readParam(ctx.GetSessionAliceToBob());
readParam.mpAttributePathParamsList = &pathParams;
readParam.mAttributePathParamsListSize = 1;
readParam.mMaxIntervalCeilingSeconds = 1;
readParam.mKeepSubscriptions = false;

std::unique_ptr<app::ReadClient> readClient = std::make_unique<app::ReadClient>(
app::InteractionModelEngine::GetInstance(), app::InteractionModelEngine::GetInstance()->GetExchangeManager(), readCallback,
app::ReadClient::InteractionType::Subscribe);
NL_TEST_ASSERT(apSuite, readClient->SendRequest(readParam) == CHIP_NO_ERROR);

ctx.DrainAndServiceIO();

NL_TEST_ASSERT(apSuite, app::InteractionModelEngine::GetInstance()->GetNumActiveReadHandlers() == 1);

ChipLogProgress(DataManagement, "Issue another subscription that will evict the first sub...");

readParam.mAttributePathParamsListSize = 0;
readClient = std::make_unique<app::ReadClient>(app::InteractionModelEngine::GetInstance(),
app::InteractionModelEngine::GetInstance()->GetExchangeManager(), readCallback,
app::ReadClient::InteractionType::Subscribe);
NL_TEST_ASSERT(apSuite, readClient->SendRequest(readParam) == CHIP_NO_ERROR);

ctx.DrainAndServiceIO();

NL_TEST_ASSERT(apSuite, app::InteractionModelEngine::GetInstance()->GetNumActiveReadHandlers() == 0);
NL_TEST_ASSERT(apSuite, readCallback.mOnError != 0);
app::InteractionModelEngine::GetInstance()->ShutdownActiveReads();
ctx.DrainAndServiceIO();
}

// clang-format off
const nlTest sTests[] =
{
Expand Down Expand Up @@ -4488,6 +4535,7 @@ const nlTest sTests[] =
NL_TEST_DEF("TestSubscribeAttributeDeniedNotExistPath", TestReadInteraction::TestSubscribeAttributeDeniedNotExistPath),
NL_TEST_DEF("TestResubscribeAttributeTimeout", TestReadInteraction::TestResubscribeAttributeTimeout),
NL_TEST_DEF("TestSubscribeAttributeTimeout", TestReadInteraction::TestSubscribeAttributeTimeout),
NL_TEST_DEF("TestReadHandler_KeepSubscriptionTest", TestReadInteraction::TestReadHandler_KeepSubscriptionTest),
NL_TEST_SENTINEL()
};
// clang-format on
Expand Down

0 comments on commit 4355297

Please sign in to comment.