Skip to content

Commit

Permalink
Switch LevelDB MutationQueue and RemoteDocumentCache to use transacti…
Browse files Browse the repository at this point in the history
…ons (#968)

* Start work on leveldb transactions

* Style

* Working API. Not plumbed in yet

* Move files into correct place

* Wrangling file locations and associations

* Tests pass

* Add some comments

* style

* Fix copyright

* Rewrite iterator internals to handle deletion-while-iterating. Also add tests for same

* Switch to strings instead of slices

* Style

* More style fixes

* Start switching writegroup over

* Swap out write group tracking for transaction usage

* Style

* Response to feedback before updating docs

* Style

* Add comment

* Initialize version_

* Satisfy the linter

* Start switching writegroup over

* Swap out write group tracking for transaction usage

* Style

* Checkpoint before implementing BatchDescription

* Style

* Initial plumbing for leveldb local parts

* Add model::BatchId

* Port leveldb_key.{h,cc}

* Add string StartsWith

* Add leveldb_key_test.cc to the project

* Revert back to using leveldb::Slice for read/describe

These operations universally operate on keys obtained from leveldb so
it's actually unhelpful to force all the callers to make
absl::string_views from them.

* Everything passing

* Drop unused function

* Style

* STart work on reads

* Swap reads in queryCache to use transactions

* Fix up tests of querycache

* Drop commented out code

* Cleanup

* Style

* Fix up for passing tests

* style

* Renaming

* Style

* Start work on ToString for transactions

* Add ToString() method to LevelDbTransaction

* Style

* lint

* Fix includes, drop runTransaction

* current_transaction -> currentTransaction

* LevelDbTransaction::NewIterator now returns a unique_ptr

* Style

* Revert addition of util::StartsWith

* Add log line

* Style

* Add log line

* Style

* Add debug log line for commits, drop unused BatchDescription

* STart work on reads

* Swap reads in queryCache to use transactions

* Start on remote documents

* Transition mutation queue and remote documents to use transactions

* Style

* Make everything pass

* Make everything pass

* Make it compile

* Style

* Style

* Revert name change, use DefaultReadOptions()

* Style

* Handle iterators returning bad statuses
  • Loading branch information
Greg Soltis authored Mar 26, 2018
1 parent acc5c45 commit f77ec68
Show file tree
Hide file tree
Showing 11 changed files with 175 additions and 134 deletions.
3 changes: 1 addition & 2 deletions Firestore/Example/Tests/Local/FSTLevelDBMigrationsTests.mm
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@
#import "Firestore/Source/Local/FSTLevelDBKey.h"
#import "Firestore/Source/Local/FSTLevelDBMigrations.h"
#import "Firestore/Source/Local/FSTLevelDBQueryCache.h"
#include "leveldb/db.h"

#include "Firestore/core/src/firebase/firestore/util/ordered_code.h"
#include "leveldb/db.h"

#import "Firestore/Example/Tests/Local/FSTPersistenceTestHelpers.h"

Expand Down
64 changes: 61 additions & 3 deletions Firestore/Example/Tests/Local/FSTMutationQueueTests.mm
Original file line number Diff line number Diff line change
Expand Up @@ -57,22 +57,32 @@ - (BOOL)isTestBaseClass {
- (void)testCountBatches {
if ([self isTestBaseClass]) return;

FSTWriteGroup *group = [self.persistence startGroupWithAction:@"Count batches"];
XCTAssertEqual(0, [self batchCount]);
XCTAssertTrue([self.mutationQueue isEmpty]);
[self.persistence commitGroup:group];

FSTMutationBatch *batch1 = [self addMutationBatch];
group = [self.persistence startGroupWithAction:@"Count batches"];
XCTAssertEqual(1, [self batchCount]);
XCTAssertFalse([self.mutationQueue isEmpty]);
[self.persistence commitGroup:group];

FSTMutationBatch *batch2 = [self addMutationBatch];
group = [self.persistence startGroupWithAction:@"Count batches"];
XCTAssertEqual(2, [self batchCount]);
[self.persistence commitGroup:group];

[self removeMutationBatches:@[ batch2 ]];
group = [self.persistence startGroupWithAction:@"Count batches"];
XCTAssertEqual(1, [self batchCount]);
[self.persistence commitGroup:group];

[self removeMutationBatches:@[ batch1 ]];
group = [self.persistence startGroupWithAction:@"Count batches"];
XCTAssertEqual(0, [self batchCount]);
XCTAssertTrue([self.mutationQueue isEmpty]);
[self.persistence commitGroup:group];
}

- (void)testAcknowledgeBatchID {
Expand Down Expand Up @@ -114,10 +124,10 @@ - (void)testAcknowledgeThenRemove {
FSTWriteGroup *group = [self.persistence startGroupWithAction:NSStringFromSelector(_cmd)];
[self.mutationQueue acknowledgeBatch:batch1 streamToken:nil group:group];
[self.mutationQueue removeMutationBatches:@[ batch1 ] group:group];
[self.persistence commitGroup:group];

XCTAssertEqual([self batchCount], 0);
XCTAssertEqual([self.mutationQueue highestAcknowledgedBatchID], batch1.batchID);
[self.persistence commitGroup:group];
}

- (void)testHighestAcknowledgedBatchIDNeverExceedsNextBatchID {
Expand Down Expand Up @@ -166,13 +176,16 @@ - (void)testLookupMutationBatch {
if ([self isTestBaseClass]) return;

// Searching on an empty queue should not find a non-existent batch
FSTWriteGroup *group = [self.persistence startGroupWithAction:@"LookupMutationBatch"];
FSTMutationBatch *notFound = [self.mutationQueue lookupMutationBatch:42];
[self.persistence commitGroup:group];
XCTAssertNil(notFound);

NSMutableArray<FSTMutationBatch *> *batches = [self createBatches:10];
NSArray<FSTMutationBatch *> *removed = [self makeHoles:@[ @2, @6, @7 ] inBatches:batches];

// After removing, a batch should not be found
group = [self.persistence startGroupWithAction:@"LookupMutationBatch"];
for (NSUInteger i = 0; i < removed.count; i++) {
notFound = [self.mutationQueue lookupMutationBatch:removed[i].batchID];
XCTAssertNil(notFound);
Expand All @@ -187,6 +200,7 @@ - (void)testLookupMutationBatch {
// Even on a nonempty queue searching should not find a non-existent batch
notFound = [self.mutationQueue lookupMutationBatch:42];
XCTAssertNil(notFound);
[self.persistence commitGroup:group];
}

- (void)testNextMutationBatchAfterBatchID {
Expand All @@ -198,6 +212,7 @@ - (void)testNextMutationBatchAfterBatchID {
NSArray<FSTMutationBatch *> *afters = @[ batches[3], batches[8], batches[8] ];
NSArray<FSTMutationBatch *> *removed = [self makeHoles:@[ @2, @6, @7 ] inBatches:batches];

FSTWriteGroup *group = [self.persistence startGroupWithAction:@"NextMutationBatchAfterBatchID"];
for (NSUInteger i = 0; i < batches.count - 1; i++) {
FSTMutationBatch *current = batches[i];
FSTMutationBatch *next = batches[i + 1];
Expand All @@ -219,22 +234,29 @@ - (void)testNextMutationBatchAfterBatchID {
FSTMutationBatch *last = batches[batches.count - 1];
FSTMutationBatch *notFound = [self.mutationQueue nextMutationBatchAfterBatchID:last.batchID];
XCTAssertNil(notFound);
[self.persistence commitGroup:group];
}

- (void)testNextMutationBatchAfterBatchIDSkipsAcknowledgedBatches {
if ([self isTestBaseClass]) return;

NSMutableArray<FSTMutationBatch *> *batches = [self createBatches:3];
FSTWriteGroup *group = [self.persistence
startGroupWithAction:@"NextMutationBatchAfterBatchIDSkipsAcknowledgedBatches"];
XCTAssertEqualObjects([self.mutationQueue nextMutationBatchAfterBatchID:kFSTBatchIDUnknown],
batches[0]);
[self.persistence commitGroup:group];

[self acknowledgeBatch:batches[0]];
group = [self.persistence
startGroupWithAction:@"NextMutationBatchAfterBatchIDSkipsAcknowledgedBatches"];
XCTAssertEqualObjects([self.mutationQueue nextMutationBatchAfterBatchID:kFSTBatchIDUnknown],
batches[1]);
XCTAssertEqualObjects([self.mutationQueue nextMutationBatchAfterBatchID:batches[0].batchID],
batches[1]);
XCTAssertEqualObjects([self.mutationQueue nextMutationBatchAfterBatchID:batches[1].batchID],
batches[2]);
[self.persistence commitGroup:group];
}

- (void)testAllMutationBatchesThroughBatchID {
Expand All @@ -245,6 +267,8 @@ - (void)testAllMutationBatchesThroughBatchID {

NSArray<FSTMutationBatch *> *found, *expected;

FSTWriteGroup *group =
[self.persistence startGroupWithAction:@"AllMutationBatchesThroughBatchID"];
found = [self.mutationQueue allMutationBatchesThroughBatchID:batches[0].batchID - 1];
XCTAssertEqualObjects(found, (@[]));

Expand All @@ -253,6 +277,7 @@ - (void)testAllMutationBatchesThroughBatchID {
expected = [batches subarrayWithRange:NSMakeRange(0, i + 1)];
XCTAssertEqualObjects(found, expected, @"for index %lu", (unsigned long)i);
}
[self.persistence commitGroup:group];
}

- (void)testAllMutationBatchesAffectingDocumentKey {
Expand Down Expand Up @@ -283,12 +308,12 @@ - (void)testAllMutationBatchesAffectingDocumentKey {
group:group];
[batches addObject:batch];
}
[self.persistence commitGroup:group];

NSArray<FSTMutationBatch *> *expected = @[ batches[1], batches[2] ];
NSArray<FSTMutationBatch *> *matches =
[self.mutationQueue allMutationBatchesAffectingDocumentKey:testutil::Key("foo/bar")];

[self.persistence commitGroup:group];
XCTAssertEqualObjects(matches, expected);
}

Expand Down Expand Up @@ -320,12 +345,12 @@ - (void)testAllMutationBatchesAffectingQuery {
group:group];
[batches addObject:batch];
}
[self.persistence commitGroup:group];

NSArray<FSTMutationBatch *> *expected = @[ batches[1], batches[2], batches[4] ];
FSTQuery *query = FSTTestQuery("foo");
NSArray<FSTMutationBatch *> *matches =
[self.mutationQueue allMutationBatchesAffectingQuery:query];
[self.persistence commitGroup:group];

XCTAssertEqualObjects(matches, expected);
}
Expand All @@ -338,48 +363,68 @@ - (void)testRemoveMutationBatches {

[self removeMutationBatches:@[ batches[0] ]];
[batches removeObjectAtIndex:0];
FSTWriteGroup *group = [self.persistence startGroupWithAction:@"RemoveMutationBatches"];
XCTAssertEqual([self batchCount], 9);
[self.persistence commitGroup:group];

NSArray<FSTMutationBatch *> *found;

group = [self.persistence startGroupWithAction:@"RemoveMutationBatches"];
found = [self.mutationQueue allMutationBatchesThroughBatchID:last.batchID];
XCTAssertEqualObjects(found, batches);
XCTAssertEqual(found.count, 9);
[self.persistence commitGroup:group];

[self removeMutationBatches:@[ batches[0], batches[1], batches[2] ]];
[batches removeObjectsInRange:NSMakeRange(0, 3)];
group = [self.persistence startGroupWithAction:@"RemoveMutationBatches"];
XCTAssertEqual([self batchCount], 6);
[self.persistence commitGroup:group];

group = [self.persistence startGroupWithAction:@"RemoveMutationBatches"];
found = [self.mutationQueue allMutationBatchesThroughBatchID:last.batchID];
XCTAssertEqualObjects(found, batches);
XCTAssertEqual(found.count, 6);
[self.persistence commitGroup:group];

[self removeMutationBatches:@[ batches[batches.count - 1] ]];
[batches removeObjectAtIndex:batches.count - 1];
group = [self.persistence startGroupWithAction:@"RemoveMutationBatches"];
XCTAssertEqual([self batchCount], 5);
[self.persistence commitGroup:group];

group = [self.persistence startGroupWithAction:@"RemoveMutationBatches"];
found = [self.mutationQueue allMutationBatchesThroughBatchID:last.batchID];
XCTAssertEqualObjects(found, batches);
XCTAssertEqual(found.count, 5);
[self.persistence commitGroup:group];

[self removeMutationBatches:@[ batches[3] ]];
[batches removeObjectAtIndex:3];
group = [self.persistence startGroupWithAction:@"RemoveMutationBatches"];
XCTAssertEqual([self batchCount], 4);
[self.persistence commitGroup:group];

[self removeMutationBatches:@[ batches[1] ]];
[batches removeObjectAtIndex:1];
group = [self.persistence startGroupWithAction:@"RemoveMutationBatches"];
XCTAssertEqual([self batchCount], 3);
[self.persistence commitGroup:group];

group = [self.persistence startGroupWithAction:@"RemoveMutationBatches"];
found = [self.mutationQueue allMutationBatchesThroughBatchID:last.batchID];
XCTAssertEqualObjects(found, batches);
XCTAssertEqual(found.count, 3);
XCTAssertFalse([self.mutationQueue isEmpty]);
[self.persistence commitGroup:group];

[self removeMutationBatches:batches];
group = [self.persistence startGroupWithAction:@"RemoveMutationBatches"];
found = [self.mutationQueue allMutationBatchesThroughBatchID:last.batchID];
XCTAssertEqualObjects(found, @[]);
XCTAssertEqual(found.count, 0);
XCTAssertTrue([self.mutationQueue isEmpty]);
[self.persistence commitGroup:group];
}

- (void)testRemoveMutationBatchesEmitsGarbageEvents {
Expand All @@ -399,29 +444,42 @@ - (void)testRemoveMutationBatchesEmitsGarbageEvents {
]];

[self removeMutationBatches:@[ batches[0] ]];
FSTWriteGroup *group =
[self.persistence startGroupWithAction:@"RemoveMutationBatchesEmitsGarbageEvents"];
std::set<DocumentKey> garbage = [garbageCollector collectGarbage];
XCTAssertEqual(garbage, std::set<DocumentKey>({}));
[self.persistence commitGroup:group];

[self removeMutationBatches:@[ batches[1] ]];
group = [self.persistence startGroupWithAction:@"RemoveMutationBatchesEmitsGarbageEvents"];
garbage = [garbageCollector collectGarbage];
XCTAssertEqual(garbage, std::set<DocumentKey>({testutil::Key("foo/ba")}));
[self.persistence commitGroup:group];

[self removeMutationBatches:@[ batches[5] ]];
group = [self.persistence startGroupWithAction:@"RemoveMutationBatchesEmitsGarbageEvents"];
garbage = [garbageCollector collectGarbage];
XCTAssertEqual(garbage, std::set<DocumentKey>({testutil::Key("bar/baz")}));
[self.persistence commitGroup:group];

[self removeMutationBatches:@[ batches[2], batches[3] ]];
group = [self.persistence startGroupWithAction:@"RemoveMutationBatchesEmitsGarbageEvents"];
garbage = [garbageCollector collectGarbage];
XCTAssertEqual(garbage,
std::set<DocumentKey>({testutil::Key("foo/bar"), testutil::Key("foo/bar2")}));
[self.persistence commitGroup:group];

[batches addObject:[self addMutationBatchWithKey:@"foo/bar/suffix/baz"]];
group = [self.persistence startGroupWithAction:@"RemoveMutationBatchesEmitsGarbageEvents"];
garbage = [garbageCollector collectGarbage];
XCTAssertEqual(garbage, std::set<DocumentKey>({}));
[self.persistence commitGroup:group];

[self removeMutationBatches:@[ batches[4], batches[6] ]];
group = [self.persistence startGroupWithAction:@"RemoveMutationBatchesEmitsGarbageEvents"];
garbage = [garbageCollector collectGarbage];
XCTAssertEqual(garbage, std::set<DocumentKey>({testutil::Key("foo/bar/suffix/baz")}));
[self.persistence commitGroup:group];
}

- (void)testStreamToken {
Expand Down
7 changes: 6 additions & 1 deletion Firestore/Example/Tests/Local/FSTRemoteDocumentCacheTests.mm
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,15 @@ - (void)testDocumentsMatchingQuery {
[self setTestDocumentAtPath:"c/1"];

FSTQuery *query = FSTTestQuery("b");
FSTWriteGroup *group = [self.persistence startGroupWithAction:@"DocumentsMatchingQuery"];
FSTDocumentDictionary *results = [self.remoteDocumentCache documentsMatchingQuery:query];
NSArray *expected =
@[ FSTTestDoc("b/1", kVersion, _kDocData, NO), FSTTestDoc("b/2", kVersion, _kDocData, NO) ];
XCTAssertEqual([results count], [expected count]);
for (FSTDocument *doc in expected) {
XCTAssertEqualObjects([results objectForKey:doc.key], doc);
}
[self.persistence commitGroup:group];
}

#pragma mark - Helpers
Expand All @@ -144,7 +146,10 @@ - (void)addEntry:(FSTMaybeDocument *)maybeDoc {
}

- (FSTMaybeDocument *_Nullable)readEntryAtPath:(const absl::string_view)path {
return [self.remoteDocumentCache entryForKey:testutil::Key(path)];
FSTWriteGroup *group = [self.persistence startGroupWithAction:@"ReadEntryAtPath"];
FSTMaybeDocument *result = [self.remoteDocumentCache entryForKey:testutil::Key(path)];
[self.persistence commitGroup:group];
return result;
}

- (void)removeEntryAtPath:(const absl::string_view)path {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,10 @@ - (void)tearDown {
}

- (void)testReadUnchangedEntry {
FSTWriteGroup *group = [_db startGroupWithAction:@"ReadUnchangedEntry"];
XCTAssertEqualObjects([_remoteDocumentBuffer entryForKey:FSTTestDocKey(@"coll/a")],
_kInitialADoc);
[_db commitGroup:group];
}

- (void)testAddEntryAndReadItBack {
Expand All @@ -78,24 +80,28 @@ - (void)testAddEntryAndReadItBack {
XCTAssertEqualObjects([_remoteDocumentBuffer entryForKey:FSTTestDocKey(@"coll/a")], newADoc);

// B should still be unchanged.
FSTWriteGroup *group = [_db startGroupWithAction:@"AddEntryAndReadItBack"];
XCTAssertEqualObjects([_remoteDocumentBuffer entryForKey:FSTTestDocKey(@"coll/b")],
_kInitialBDoc);
[_db commitGroup:group];
}

- (void)testApplyChanges {
FSTMaybeDocument *newADoc = FSTTestDoc("coll/a", 43, @{@"new" : @"data"}, NO);
[_remoteDocumentBuffer addEntry:newADoc];
FSTWriteGroup *group = [_db startGroupWithAction:@"Apply Changes"];
XCTAssertEqualObjects([_remoteDocumentBuffer entryForKey:FSTTestDocKey(@"coll/a")], newADoc);

// Reading directly against the cache should still yield the old result.
XCTAssertEqualObjects([_remoteDocumentCache entryForKey:FSTTestDocKey(@"coll/a")], _kInitialADoc);
[_db commitGroup:group];

FSTWriteGroup *group = [_db startGroupWithAction:@"Apply changes"];
group = [_db startGroupWithAction:@"Apply changes"];
[_remoteDocumentBuffer applyToWriteGroup:group];
[_db commitGroup:group];

// Reading against the cache should now yield the new result.
XCTAssertEqualObjects([_remoteDocumentCache entryForKey:FSTTestDocKey(@"coll/a")], newADoc);
[_db commitGroup:group];
}

- (void)testMethodsThrowAfterApply {
Expand Down
4 changes: 2 additions & 2 deletions Firestore/Source/Local/FSTLevelDB.mm
Original file line number Diff line number Diff line change
Expand Up @@ -211,15 +211,15 @@ - (LevelDbTransaction *)currentTransaction {
#pragma mark - Persistence Factory methods

- (id<FSTMutationQueue>)mutationQueueForUser:(const User &)user {
return [FSTLevelDBMutationQueue mutationQueueWithUser:user db:_ptr serializer:self.serializer];
return [FSTLevelDBMutationQueue mutationQueueWithUser:user db:self serializer:self.serializer];
}

- (id<FSTQueryCache>)queryCache {
return [[FSTLevelDBQueryCache alloc] initWithDB:self serializer:self.serializer];
}

- (id<FSTRemoteDocumentCache>)remoteDocumentCache {
return [[FSTLevelDBRemoteDocumentCache alloc] initWithDB:_ptr serializer:self.serializer];
return [[FSTLevelDBRemoteDocumentCache alloc] initWithDB:self serializer:self.serializer];
}

- (FSTWriteGroup *)startGroupWithAction:(NSString *)action {
Expand Down
2 changes: 1 addition & 1 deletion Firestore/Source/Local/FSTLevelDBMutationQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ NS_ASSUME_NONNULL_BEGIN
* @param db The LevelDB in which to create the queue.
*/
+ (instancetype)mutationQueueWithUser:(const firebase::firestore::auth::User &)user
db:(std::shared_ptr<leveldb::DB>)db
db:(FSTLevelDB *)db
serializer:(FSTLocalSerializer *)serializer;

/**
Expand Down
Loading

0 comments on commit f77ec68

Please sign in to comment.