Skip to content

Commit

Permalink
Better fix for crashes around MTRBaseSubscriptionCallback. (project-c…
Browse files Browse the repository at this point in the history
…hip#23076)

project-chip#22978 accidentally
reintroduced the crash that
project-chip#22324 had fixed.  To avoid
more issues along these lines:

1) Add unit tests that reproduce the crashes described in
   project-chip#22320 (with the
   changes from project-chip#22978) and
   project-chip#22935 (without those
   changes).
2) Change MTRBaseSubscriptionCallback to always invoke its callbacks
   synchronously, on the Matter queue, so that we can clean up the
   MTRClusterStateCacheContainer's pointer to the ClusterStateCache before it
   gets deleted on the Matter queue.
3) Move the queueing of callbacks to the client queue into the consumers of
   MTRBaseSubscriptionCallback, so they can do whatever sync work they need
   (like the above cleanup) before going async.
4) Update documentation.
  • Loading branch information
bzbarsky-apple authored Oct 11, 2022
1 parent c368ef1 commit a7475d3
Show file tree
Hide file tree
Showing 5 changed files with 309 additions and 132 deletions.
194 changes: 117 additions & 77 deletions src/darwin/Framework/CHIP/MTRBaseDevice.mm
Original file line number Diff line number Diff line change
Expand Up @@ -251,11 +251,11 @@ - (void)invalidateCASESession

class SubscriptionCallback final : public MTRBaseSubscriptionCallback {
public:
SubscriptionCallback(dispatch_queue_t queue, DataReportCallback attributeReportCallback, DataReportCallback eventReportCallback,
SubscriptionCallback(DataReportCallback attributeReportCallback, DataReportCallback eventReportCallback,
ErrorCallback errorCallback, MTRDeviceResubscriptionScheduledHandler _Nullable resubscriptionScheduledHandler,
MTRSubscriptionEstablishedHandler _Nullable subscriptionEstablishedHandler, OnDoneHandler _Nullable onDoneHandler)
: MTRBaseSubscriptionCallback(queue, attributeReportCallback, eventReportCallback, errorCallback,
resubscriptionScheduledHandler, subscriptionEstablishedHandler, onDoneHandler)
: MTRBaseSubscriptionCallback(attributeReportCallback, eventReportCallback, errorCallback, resubscriptionScheduledHandler,
subscriptionEstablishedHandler, onDoneHandler)
{
}

Expand Down Expand Up @@ -286,80 +286,120 @@ - (void)subscribeWithQueue:(dispatch_queue_t)queue
// Copy params before going async.
params = [params copy];

[self.deviceController
getSessionForNode:self.nodeID
completion:^(ExchangeManager * _Nullable exchangeManager, const Optional<SessionHandle> & session,
NSError * _Nullable error) {
if (error != nil) {
dispatch_async(queue, ^{
errorHandler(error);
});
return;
}

// Wildcard endpoint, cluster, attribute, event.
auto attributePath = std::make_unique<AttributePathParams>();
auto eventPath = std::make_unique<EventPathParams>();
ReadPrepareParams readParams(session.Value());
readParams.mMinIntervalFloorSeconds = [params.minInterval unsignedShortValue];
readParams.mMaxIntervalCeilingSeconds = [params.maxInterval unsignedShortValue];
readParams.mpAttributePathParamsList = attributePath.get();
readParams.mAttributePathParamsListSize = 1;
readParams.mpEventPathParamsList = eventPath.get();
readParams.mEventPathParamsListSize = 1;
readParams.mIsFabricFiltered = params.fabricFiltered;
readParams.mKeepSubscriptions = params.keepPreviousSubscriptions;

std::unique_ptr<SubscriptionCallback> callback;
std::unique_ptr<ReadClient> readClient;
std::unique_ptr<ClusterStateCache> clusterStateCache;
if (clusterStateCacheContainer) {
__weak MTRClusterStateCacheContainer * weakPtr = clusterStateCacheContainer;
callback = std::make_unique<SubscriptionCallback>(queue, attributeReportHandler, eventReportHandler,
errorHandler, resubscriptionScheduled, subscriptionEstablished, ^{
MTRClusterStateCacheContainer * container = weakPtr;
if (container) {
container.cppClusterStateCache = nullptr;
}
});
clusterStateCache = std::make_unique<ClusterStateCache>(*callback.get());
readClient = std::make_unique<ReadClient>(InteractionModelEngine::GetInstance(), exchangeManager,
clusterStateCache->GetBufferedCallback(), ReadClient::InteractionType::Subscribe);
} else {
callback = std::make_unique<SubscriptionCallback>(queue, attributeReportHandler, eventReportHandler,
errorHandler, resubscriptionScheduled, subscriptionEstablished, nil);
readClient = std::make_unique<ReadClient>(InteractionModelEngine::GetInstance(), exchangeManager,
callback->GetBufferedCallback(), ReadClient::InteractionType::Subscribe);
}

CHIP_ERROR err;
if (!params.autoResubscribe) {
err = readClient->SendRequest(readParams);
} else {
// SendAutoResubscribeRequest cleans up the params, even on failure.
attributePath.release();
eventPath.release();
err = readClient->SendAutoResubscribeRequest(std::move(readParams));
}

if (err != CHIP_NO_ERROR) {
dispatch_async(queue, ^{
errorHandler([MTRError errorForCHIPErrorCode:err]);
});

return;
}

if (clusterStateCacheContainer) {
clusterStateCacheContainer.cppClusterStateCache = clusterStateCache.get();
// ClusterStateCache will be deleted when OnDone is called or an error is encountered as well.
callback->AdoptClusterStateCache(std::move(clusterStateCache));
}
// Callback and ReadClient will be deleted when OnDone is called or an error is
// encountered.
callback->AdoptReadClient(std::move(readClient));
callback.release();
}];
[self.deviceController getSessionForNode:self.nodeID
completion:^(ExchangeManager * _Nullable exchangeManager, const Optional<SessionHandle> & session,
NSError * _Nullable error) {
if (error != nil) {
dispatch_async(queue, ^{
errorHandler(error);
});
return;
}

// Wildcard endpoint, cluster, attribute, event.
auto attributePath = std::make_unique<AttributePathParams>();
auto eventPath = std::make_unique<EventPathParams>();
ReadPrepareParams readParams(session.Value());
readParams.mMinIntervalFloorSeconds = [params.minInterval unsignedShortValue];
readParams.mMaxIntervalCeilingSeconds = [params.maxInterval unsignedShortValue];
readParams.mpAttributePathParamsList = attributePath.get();
readParams.mAttributePathParamsListSize = 1;
readParams.mpEventPathParamsList = eventPath.get();
readParams.mEventPathParamsListSize = 1;
readParams.mIsFabricFiltered = params.fabricFiltered;
readParams.mKeepSubscriptions = params.keepPreviousSubscriptions;

std::unique_ptr<ClusterStateCache> clusterStateCache;
ReadClient::Callback * callbackForReadClient = nullptr;
OnDoneHandler onDoneHandler = nil;

if (clusterStateCacheContainer) {
__weak MTRClusterStateCacheContainer * weakPtr = clusterStateCacheContainer;
onDoneHandler = ^{
// This, like all manipulation of cppClusterStateCache, needs to run on the Matter
// queue.
MTRClusterStateCacheContainer * container = weakPtr;
if (container) {
container.cppClusterStateCache = nullptr;
}
};
}

auto callback = std::make_unique<SubscriptionCallback>(
^(NSArray * value) {
dispatch_async(queue, ^{
if (attributeReportHandler != nil) {
attributeReportHandler(value);
}
});
},
^(NSArray * value) {
dispatch_async(queue, ^{
if (eventReportHandler != nil) {
eventReportHandler(value);
}
});
},
^(NSError * error) {
dispatch_async(queue, ^{
errorHandler(error);
});
},
^(NSError * error, NSNumber * resubscriptionDelay) {
dispatch_async(queue, ^{
if (resubscriptionScheduled != nil) {
resubscriptionScheduled(error, resubscriptionDelay);
}
});
},
^(void) {
dispatch_async(queue, ^{
if (subscriptionEstablished != nil) {
subscriptionEstablished();
}
});
},
onDoneHandler);

if (clusterStateCacheContainer) {
clusterStateCache = std::make_unique<ClusterStateCache>(*callback.get());
callbackForReadClient = &clusterStateCache->GetBufferedCallback();
} else {
callbackForReadClient = &callback->GetBufferedCallback();
}

auto readClient = std::make_unique<ReadClient>(InteractionModelEngine::GetInstance(),
exchangeManager, *callbackForReadClient, ReadClient::InteractionType::Subscribe);

CHIP_ERROR err;
if (!params.autoResubscribe) {
err = readClient->SendRequest(readParams);
} else {
// SendAutoResubscribeRequest cleans up the params, even on failure.
attributePath.release();
eventPath.release();
err = readClient->SendAutoResubscribeRequest(std::move(readParams));
}

if (err != CHIP_NO_ERROR) {
dispatch_async(queue, ^{
errorHandler([MTRError errorForCHIPErrorCode:err]);
});

return;
}

if (clusterStateCacheContainer) {
clusterStateCacheContainer.cppClusterStateCache = clusterStateCache.get();
// ClusterStateCache will be deleted when OnDone is called or an error is encountered as
// well.
callback->AdoptClusterStateCache(std::move(clusterStateCache));
}
// Callback and ReadClient will be deleted when OnDone is called or an error is
// encountered.
callback->AdoptReadClient(std::move(readClient));
callback.release();
}];
}

// Convert TLV data into data-value dictionary as described in MTRDeviceResponseHandler
Expand Down
30 changes: 19 additions & 11 deletions src/darwin/Framework/CHIP/MTRBaseSubscriptionCallback.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,16 @@
/**
* This file defines a base class for subscription callbacks used by
* MTRBaseDevice and MTRDevice. This base class handles everything except the
* actual conversion from the incoming data to the desired data.
* actual conversion from the incoming data to the desired data and the dispatch
* of callbacks to the relevant client queues. Its callbacks are called on the
* Matter queue. This allows MTRDevice and MTRBaseDevice to do any necessary
* sync cleanup work before dispatching to the client callbacks on the client
* queue.
*
* After onDoneHandler is invoked, this object will at some point delete itself
* and destroy anything it owns (such as the ReadClient or the
* ClusterStateCache). Consumers should drop references to all the relevant
* objects in that handler. This deletion will happen on the Matter queue.
*
* The desired data is assumed to be NSObjects that can be stored in NSArray.
*/
Expand All @@ -49,12 +58,10 @@ typedef void (^OnDoneHandler)(void);

class MTRBaseSubscriptionCallback : public chip::app::ClusterStateCache::Callback {
public:
MTRBaseSubscriptionCallback(dispatch_queue_t queue, DataReportCallback attributeReportCallback,
DataReportCallback eventReportCallback, ErrorCallback errorCallback,
MTRDeviceResubscriptionScheduledHandler _Nullable resubscriptionCallback,
MTRBaseSubscriptionCallback(DataReportCallback attributeReportCallback, DataReportCallback eventReportCallback,
ErrorCallback errorCallback, MTRDeviceResubscriptionScheduledHandler _Nullable resubscriptionCallback,
SubscriptionEstablishedHandler _Nullable subscriptionEstablishedHandler, OnDoneHandler _Nullable onDoneHandler)
: mQueue(queue)
, mAttributeReportCallback(attributeReportCallback)
: mAttributeReportCallback(attributeReportCallback)
, mEventReportCallback(eventReportCallback)
, mErrorCallback(errorCallback)
, mResubscriptionCallback(resubscriptionCallback)
Expand Down Expand Up @@ -117,10 +124,9 @@ class MTRBaseSubscriptionCallback : public chip::app::ClusterStateCache::Callbac
NSMutableArray * _Nullable mEventReports = nil;

private:
dispatch_queue_t mQueue;
DataReportCallback _Nullable mAttributeReportCallback = nil;
DataReportCallback _Nullable mEventReportCallback = nil;
// We set mErrorCallback to nil when queueing error reports, so we
// We set mErrorCallback to nil before calling the error callback, so we
// make sure to only report one error.
ErrorCallback _Nullable mErrorCallback = nil;
MTRDeviceResubscriptionScheduledHandler _Nullable mResubscriptionCallback = nil;
Expand All @@ -138,9 +144,11 @@ class MTRBaseSubscriptionCallback : public chip::app::ClusterStateCache::Callbac
// To handle this, enforce the following rules:
//
// 1) We guarantee that mErrorCallback is only invoked with an error once.
// 2) We ensure that we delete ourselves and the passed in ReadClient only from OnDone or a queued-up
// error callback, but not both, by tracking whether we have a queued-up
// deletion.
// 2) We guarantee that mOnDoneHandler is only invoked once, and always
// invoked before we delete ourselves.
// 3) We ensure that we delete ourselves and the passed in ReadClient only
// from OnDone or from an error callback but not both, by tracking whether
// we have a queued-up deletion.
std::unique_ptr<chip::app::ReadClient> mReadClient;
std::unique_ptr<chip::app::ClusterStateCache> mClusterStateCache;
bool mHaveQueuedDeletion = false;
Expand Down
44 changes: 15 additions & 29 deletions src/darwin/Framework/CHIP/MTRBaseSubscriptionCallback.mm
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,17 @@
{
__block NSArray * attributeReports = mAttributeReports;
mAttributeReports = nil;
__block auto attributeCallback = mAttributeReportCallback;
auto attributeCallback = mAttributeReportCallback;

__block NSArray * eventReports = mEventReports;
mEventReports = nil;
__block auto eventCallback = mEventReportCallback;
auto eventCallback = mEventReportCallback;

if (attributeCallback != nil && attributeReports.count) {
dispatch_async(mQueue, ^{
attributeCallback(attributeReports);
});
attributeCallback(attributeReports);
}
if (eventCallback != nil && eventReports.count) {
dispatch_async(mQueue, ^{
eventCallback(eventReports);
});
eventCallback(eventReports);
}
}

Expand Down Expand Up @@ -96,7 +92,8 @@
void MTRBaseSubscriptionCallback::OnSubscriptionEstablished(SubscriptionId aSubscriptionId)
{
if (mSubscriptionEstablishedHandler) {
dispatch_async(mQueue, mSubscriptionEstablishedHandler);
auto subscriptionEstablishedHandler = mSubscriptionEstablishedHandler;
subscriptionEstablishedHandler();
}
}

Expand All @@ -109,9 +106,7 @@
auto callback = mResubscriptionCallback;
auto error = [MTRError errorForCHIPErrorCode:aTerminationCause];
auto delayMs = @(apReadClient->ComputeTimeTillNextSubscription());
dispatch_async(mQueue, ^{
callback(error, delayMs);
});
callback(error, delayMs);
}
return CHIP_NO_ERROR;
}
Expand All @@ -129,30 +124,21 @@
return;
}

__block ErrorCallback callback = mErrorCallback;
__block auto * myself = this;

auto errorCallback = mErrorCallback;
mErrorCallback = nil;
mAttributeReportCallback = nil;
mEventReportCallback = nil;
__auto_type onDoneHandler = mOnDoneHandler;
auto onDoneHandler = mOnDoneHandler;
mOnDoneHandler = nil;
dispatch_async(mQueue, ^{
callback(err);
});

errorCallback(err);
if (onDoneHandler) {
// To guarantee the async onDoneHandler call is made before
// deletion, so that clean up can happen while the callback
// object is still alive (and therefore cluster cache), queue
// deletion after calling the onDoneHandler
mHaveQueuedDeletion = true;
dispatch_async(mQueue, ^{
onDoneHandler();
dispatch_async(DeviceLayer::PlatformMgrImpl().GetWorkQueue(), ^{
delete myself;
});
});
} else if (aCancelSubscription) {
onDoneHandler();
}

if (aCancelSubscription) {
// We can't synchronously delete ourselves, because we're inside one of
// the ReadClient callbacks and we need to outlive the callback's
// execution. Queue an async deletion on the Matter queue (where we are
Expand Down
Loading

0 comments on commit a7475d3

Please sign in to comment.