Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CBL-4428 : Fix crash when starting multiple live queries concurrently #3209

Merged
merged 1 commit into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 16 additions & 21 deletions Objective-C/CBLQuery.mm
Original file line number Diff line number Diff line change
Expand Up @@ -206,15 +206,6 @@ - (void) dealloc {
}];
}

- (NSString*) description {
if (_language == kC4JSONQuery) {
NSString* desc = [[NSString alloc] initWithData: _json encoding: NSUTF8StringEncoding];
return [NSString stringWithFormat: @"%@[json=%@]", self.class, desc];
} else {
return [NSString stringWithFormat: @"%@[n1ql=%@]", self.class, _expressions];
}
}

#pragma mark - Parameters

- (CBLQueryParameters*) parameters {
Expand Down Expand Up @@ -263,7 +254,7 @@ - (nullable CBLQueryResultSet*) execute: (NSError**)outError {
}];

if (!e) {
CBLWarnError(Query, @"CBLQuery failed: %d/%d", c4Err.domain, c4Err.code);
CBLWarnError(Query, @"%@: Failed to execute with error: %d/%d", self, c4Err.domain, c4Err.code);
convertError(c4Err, outError);
return nullptr;
}
Expand All @@ -283,20 +274,25 @@ - (nullable CBLQueryResultSet*) execute: (NSError**)outError {
CBLAssertNotNil(listener);

CBL_LOCK(self) {
if (!_changeNotifier)
// Only use CBLChangeNotifier for creating and maintaining the tokens. The CBLQueryObserver object will
// be created per token and will post change to its token directly instead of using the CBLChangeNotifier.
if (!_changeNotifier) {
_changeNotifier = [CBLChangeNotifier new];
}

CBLChangeListenerToken* token = [_changeNotifier addChangeListenerWithQueue: queue
listener: listener
delegate: self];
CBLChangeListenerToken<CBLQueryChange*>* token = [_changeNotifier addChangeListenerWithQueue: queue
listener: listener
delegate: self];

// create c4queryobs & start immediately
CBLQueryObserver* obs = [[CBLQueryObserver alloc] initWithQuery: self
columnNames: _columnNames
token: token];
// The CBLQueryObserver retains both query (self) and the token. Two circular retain references will happen:
// * query (self) -> _changeNotifier -> obs -> query
// * obs -> token -> obs (_token.context)
// With the current design which is not obvious, the two circular retain references will be broken when the
// obs is stopped. An alternative more obvious approach would be using a stopped callback or delegate to
// break the circles.
CBLQueryObserver* obs = [[CBLQueryObserver alloc] initWithQuery: self columnNames: _columnNames token: token];
[obs start];
token.context = obs;

return token;
}
}
Expand All @@ -306,8 +302,7 @@ - (void) removeChangeListenerWithToken: (id<CBLListenerToken>)token {

CBL_LOCK(self) {
CBLChangeListenerToken* t = (CBLChangeListenerToken*)token;
[(CBLQueryObserver*)t.context stopAndFree];

[(CBLQueryObserver*)t.context stop];
[_changeNotifier removeChangeListenerWithToken: token];
}
}
Expand Down
9 changes: 5 additions & 4 deletions Objective-C/Internal/CBLQueryObserver.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@
#import <Foundation/Foundation.h>

@protocol CBLListenerToken;
@class CBLQuery;
@class CBLChangeListenerToken;
@class CBLChangeNotifier;
@class CBLQuery;
@class CBLQueryChange;

NS_ASSUME_NONNULL_BEGIN
Expand All @@ -32,14 +33,14 @@ NS_ASSUME_NONNULL_BEGIN

/** Initialize with a Query. */
- (instancetype) initWithQuery: (CBLQuery*)query
columnNames: (NSDictionary *)columnNames
token: (id<CBLListenerToken>)token;
columnNames: (NSDictionary*)columnNames
token: (CBLChangeListenerToken*)token;

/** Starts the observer */
- (void) start;

/** Stops and frees the observer */
- (void) stopAndFree;
- (void) stop;

@end

Expand Down
105 changes: 49 additions & 56 deletions Objective-C/Internal/CBLQueryObserver.m
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,23 @@

@interface CBLQueryObserver () <CBLStoppable>

@property (nonatomic, readonly) CBLQuery* query;
/** The query object will be set to nil when the replicator is stopped to break the circular retain references. */
@property (nonatomic, readonly, nullable) CBLQuery* query;

@end

@implementation CBLQueryObserver {
CBLQuery* _query;
NSDictionary* _columnNames;
C4QueryObserver* _obs;
CBLChangeNotifier* _listenerToken;
CBLChangeListenerToken<CBLQueryChange*>* _token;
}

#pragma mark - Constructor

- (instancetype) initWithQuery: (CBLQuery*)query
columnNames: (NSDictionary *)columnNames
token: (id<CBLListenerToken>)token {
columnNames: (NSDictionary*)columnNames
token: (CBLChangeListenerToken<CBLQueryChange*>*)token {
NSParameterAssert(query);
NSParameterAssert(columnNames);
NSParameterAssert(token);
Expand All @@ -49,7 +50,10 @@ - (instancetype) initWithQuery: (CBLQuery*)query
if (self) {
_query = query;
_columnNames = columnNames;
_listenerToken = token;
_token = token;

// https://github.com/couchbase/couchbase-lite-core/wiki/Thread-Safety
// c4queryobs_create is thread-safe.
_obs = c4queryobs_create(query.c4query, liveQueryCallback, (__bridge void *)self);

[query.database addActiveStoppable: self];
Expand All @@ -58,91 +62,80 @@ - (instancetype) initWithQuery: (CBLQuery*)query
}

- (void) dealloc {
if (_obs) {
[self stopAndFree];
}
[self stop];
}

#pragma mark - Methods

- (void) start {
[self observerEnable: YES];
}

- (void) stopAndFree {
CBL_LOCK(self) {
if (_obs) {
[self observerEnable: NO];
c4queryobs_free(_obs);
_obs = nil;
}
Assert(_query, @"QueryObserver cannot be restarted.");
[_query.database safeBlock: ^{
c4queryobs_setEnabled(_obs, true);
}];
}

[_query.database removeActiveStoppable: self];

_query = nil;
_columnNames = nil;
_listenerToken = nil;
}

#pragma mark - Internal

- (CBLQuery*) query {
return _query;
}

- (NSString*) description {
return [NSString stringWithFormat:@"%@[%@:%@]%@", self.class, [_query description], _obs, [_listenerToken description]];
CBL_LOCK(self) {
return _query;
}
}

- (void) stop {
[self stopAndFree];
CBL_LOCK(self) {
if (!_query) {
return;
}

[_query.database safeBlock: ^{
c4queryobs_setEnabled(_obs, false);
c4queryobs_free(_obs);
_obs = nil;
[_query.database removeActiveStoppable: self];
}];

_query = nil; // Break circular reference cycle
_token = nil; // Break circular reference cycle
}
}

#pragma mark - Private

static void liveQueryCallback(C4QueryObserver *obs, C4Query *query, void *context) {
CBLQueryObserver *queryObs = (__bridge CBLQueryObserver*)context;
dispatch_queue_t queue = [queryObs query].database.queryQueue;
if (!queue)
static void liveQueryCallback(C4QueryObserver *obs, C4Query *c4query, void *context) {
CBLQueryObserver* queryObs = (__bridge CBLQueryObserver*)context;
CBLQuery* query = queryObs.query;
if (!query) {
return;
}

dispatch_async(queue, ^{
dispatch_async(query.database.queryQueue, ^{
[queryObs postQueryChange: obs];
});
};

- (void) postQueryChange: (C4QueryObserver*)obs {
CBL_LOCK(self) {
C4Error c4error = {};

// Note: enumerator('e') will be released in ~QueryResultContext; no need to release it
C4QueryEnumerator* e = c4queryobs_getEnumerator(obs, true, &c4error);
if (!e) {
CBLLogInfo(Query, @"%@: C4QueryEnumerator returns empty (%d/%d)",
self, c4error.domain, c4error.code);
if (!_query) {
return;
}

CBLQueryResultSet *rs = [[CBLQueryResultSet alloc] initWithQuery: self.query
enumerator: e
columnNames: _columnNames];
// Note: enumerator('result') will be released in ~QueryResultContext; no need to release it
__block C4Error c4error = {};
__block C4QueryEnumerator* result = NULL;
[_query.database safeBlock: ^{
result = c4queryobs_getEnumerator(obs, true, &c4error);
}];

if (!rs) {
CBLLogInfo(Query, @"%@: Result set returns empty", self);
if (!result) {
CBLLogVerbose(Query, @"%@: Ignore an empty result (%d/%d)", self, c4error.domain, c4error.code);
return;
}

NSError* error = nil;
[_listenerToken postChange: [[CBLQueryChange alloc] initWithQuery: self.query
results: rs
error: error]];
}
}

- (void) observerEnable: (BOOL)enable {
CBL_LOCK(self) {
c4queryobs_setEnabled(_obs, enable);
CBLQueryResultSet* rs = [[CBLQueryResultSet alloc] initWithQuery: _query enumerator: result columnNames: _columnNames];
[_token postChange: [[CBLQueryChange alloc] initWithQuery: _query results: rs error: nil]];
}
}

Expand Down
31 changes: 29 additions & 2 deletions Objective-C/Tests/QueryTest+Main.m
Original file line number Diff line number Diff line change
Expand Up @@ -1878,6 +1878,35 @@ - (void) testLiveQueryNoUpdate {
[q removeChangeListenerWithToken: token];
}

// CBSE-15957: Crash when creating multiple live queries concurrently
- (void) testCreateLiveQueriesConcurrently {
// Create 1000 docs:
[self loadNumbers: 1000];

// Create two live queries then remove them concurrently:
XCTestExpectation* remExp = [self expectationWithDescription: @"Listener Removed"];
remExp.expectedFulfillmentCount = 2;

dispatch_queue_t queue = dispatch_queue_create("query-queue", DISPATCH_QUEUE_CONCURRENT);
for (int i = 0; i < 2; i++) {
dispatch_async(queue, ^{
XCTestExpectation* exp = [self expectationWithDescription: @"Change Received"];
NSError* error;
NSString* str = [NSString stringWithFormat: @"select * from _ where number1 < %d", (i * 200)];
CBLQuery* query = [self.db createQuery: str error: &error];
AssertNotNil(query);
id token = [query addChangeListener:^(CBLQueryChange* change) {
[exp fulfill];
}];
[self waitForExpectations: @[exp] timeout: 5.0];
[token remove];
[remExp fulfill];
});
}

[self waitForExpectations: @[remExp] timeout: 5.0];
}

/**
When adding a second listener after the first listener is notified, the second listener
should get the change (current result).
Expand Down Expand Up @@ -1921,9 +1950,7 @@ - (void) testLiveQueryReturnsEmptyResultSet {
CBLQuery* q = [CBLQueryBuilder select: @[[CBLQuerySelectResult property: @"number1"]]
from: [CBLQueryDataSource database: self.db]];
XCTestExpectation* first = [self expectationWithDescription: @"1st change"];
__block int count = 0;
id token = [q addChangeListener: ^(CBLQueryChange* change) {
count++;
NSArray<CBLQueryResult*>* rows = [change.results allObjects];
AssertEqual(rows.count, 0);
[first fulfill];
Expand Down
11 changes: 10 additions & 1 deletion Swift/ListenerToken.swift
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,23 @@ public class ListenerToken {
/// Remove the listener associated with the token.
public func remove() {
impl.remove()

if let removedBlock = self.removedBlock {
removedBlock(self)
}
}

// MARK: Internal

init(_ impl: CBLListenerToken) {
typealias RemovedBlock = (_ token: ListenerToken) -> Void

init(_ impl: CBLListenerToken, removeBlock: RemovedBlock? = nil) {
self.impl = impl
self.removedBlock = removeBlock
}

let impl: CBLListenerToken

let removedBlock: RemovedBlock?

}
19 changes: 12 additions & 7 deletions Swift/Query.swift
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,10 @@ public class Query {
database.addQuery(self)
}

let listenerToken = ListenerToken(token)
let listenerToken = ListenerToken(token) { token in
self.didRemoveChangeListener(withToken: token)
}

tokens.add(listenerToken)
return listenerToken
}
Expand All @@ -124,15 +127,19 @@ public class Query {
///
/// - Parameter token: The listener token.
public func removeChangeListener(withToken token: ListenerToken) {
token.remove()
}

// MARK: Internal

func didRemoveChangeListener(withToken token: ListenerToken) {
lock.lock()
prepareQuery()
queryImpl!.removeChangeListener(with: token.impl)
tokens.remove(token)
defer { lock.unlock() }

tokens.remove(token)
if tokens.count == 0 {
database.removeQuery(self)
}
lock.unlock()
}

/// Encoded JSON representation of the query.
Expand Down Expand Up @@ -160,8 +167,6 @@ public class Query {
self.database = database
queryImpl = try database.impl.createQuery(expressions)
}

// MARK: Internal

var params: Parameters?

Expand Down
Loading
Loading