Skip to content

Commit

Permalink
feat: sync skip deletes until
Browse files Browse the repository at this point in the history
  • Loading branch information
murali-shris committed Nov 13, 2024
1 parent b6d8ccc commit 570096f
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 16 deletions.
50 changes: 34 additions & 16 deletions packages/at_client/lib/src/service/sync_service_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class SyncServiceImpl implements SyncService, AtSignChangeListener {
syncRunIntervalSeconds = 5,
queueSize = 5;
//#TODO move to config
static const int syncDeltaForSkipDeletes = 10;
static const int initialSyncDelta = 10;
late final AtClient _atClient;
late final RemoteSecondary _remoteSecondary;
late final NotificationServiceImpl _statsNotificationListener;
Expand Down Expand Up @@ -67,6 +67,7 @@ class SyncServiceImpl implements SyncService, AtSignChangeListener {

/// A local AtKey to persist the last received server commitId
late final AtKey _lastReceivedServerCommitIdAtKey;
late final AtKey _isInitialSyncDone;

static Future<SyncService> create(AtClient atClient,
{required AtClientManager atClientManager,
Expand Down Expand Up @@ -94,6 +95,8 @@ class SyncServiceImpl implements SyncService, AtSignChangeListener {
_statsNotificationListener = notificationService as NotificationServiceImpl;
_lastReceivedServerCommitIdAtKey =
AtKey.local('lastreceivedservercommitid', currentAtSign).build();
_isInitialSyncDone =
AtKey.local('isinitialsyncdone', currentAtSign).build();
atKeyDecryptionManager = AtKeyDecryptionManager(_atClient);
_atClientManager.listenToAtSignChange(this);
}
Expand Down Expand Up @@ -442,15 +445,28 @@ class SyncServiceImpl implements SyncService, AtSignChangeListener {
// server has delete commit entry and the key is not present on local keystore
List<KeyInfo> keyInfoList = [];
try {
int? skipDeletesUntil;
AtValue? isInitialSyncDone;
try {
isInitialSyncDone = await _atClient.get(_isInitialSyncDone);
} on AtKeyNotFoundException {
//ignore
}
if (isInitialSyncDone == null &&
_atClient.getPreferences()!.skipDeletes &&
!_closeToIsInSync(serverCommitId, lastReceivedServerCommitId)) {
print('setting skipDeletes');
skipDeletesUntil = serverCommitId;
}
while (serverCommitId > lastReceivedServerCommitId) {
_sendTelemetry('_syncFromServer.whileLoop', {
"serverCommitId": serverCommitId,
"lastReceivedServerCommitId": lastReceivedServerCommitId
});
List<dynamic> listOfCommitEntriesFromServer =
await _getEntriesToSyncFromServer(
serverCommitId, lastReceivedServerCommitId,
localCommitIdBeforeSync: localCommitIdBeforeSync);
await _getEntriesToSyncFromServer(lastReceivedServerCommitId,
localCommitIdBeforeSync: localCommitIdBeforeSync,
skipDeletesUntil: skipDeletesUntil);
if (listOfCommitEntriesFromServer.isEmpty) {
_logger.finer(_logger.getLogMessageWithClientParticulars(
_atClient.getPreferences()!.atClientParticulars,
Expand Down Expand Up @@ -502,6 +518,10 @@ class SyncServiceImpl implements SyncService, AtSignChangeListener {
'Updating lastReceivedServerCommitId to $lastReceivedServerCommitId');
}
}
if (_closeToIsInSync(serverCommitId, lastReceivedServerCommitId)) {
print('*** setting initial sync done');
await _atClient.put(_isInitialSyncDone, AtData()..data = 'true');
}
} finally {
// The put method persists the lastReceivedServerCommitId which will be used to
// fetch the next set of entries to sync from server
Expand Down Expand Up @@ -543,20 +563,18 @@ class SyncServiceImpl implements SyncService, AtSignChangeListener {
/// Takes the last received server commit id and fetches the entries that are above the given
/// commit-id to sync into the local keystore.
Future<List<dynamic>> _getEntriesToSyncFromServer(
int serverCommitId, int lastReceivedServerCommitId,
{int? localCommitIdBeforeSync}) async {
int lastReceivedServerCommitId,
{int? localCommitIdBeforeSync,
int? skipDeletesUntil}) async {
var syncBuilder = SyncVerbBuilder()
..commitId = lastReceivedServerCommitId
..regex = _atClient.getPreferences()!.syncRegex
..limit = _atClient.getPreferences()!.syncPageLimit
..isPaginated = true;
print('atclient skip flag: ${_atClient.getPreferences()!.skipDeletes}');
if (_atClient.getPreferences()!.skipDeletes &&
_shouldSetSkipDeletes(serverCommitId, localCommitIdBeforeSync!)) {
//#TODO remove print
print('setting skip deletes');
syncBuilder.skipDeletes = true;
if (skipDeletesUntil != null) {
syncBuilder.skipDeletesUntil = skipDeletesUntil;
}

_logger.finer(_logger.getLogMessageWithClientParticulars(
_atClient.getPreferences()!.atClientParticulars,
'syncBuilder ${syncBuilder.buildCommand()}'));
Expand All @@ -579,12 +597,12 @@ class SyncServiceImpl implements SyncService, AtSignChangeListener {
return syncResponseJson;
}

bool _shouldSetSkipDeletes(int serverCommitId, int localCommitIdBeforeSync) {
bool _closeToIsInSync(int serverCommitId, int lastReceivedServerCommitId) {
//#TODO remove print
print(
'server commitId : $serverCommitId localCommitId: $localCommitIdBeforeSync');
print('diff in commit id: ${serverCommitId - localCommitIdBeforeSync}');
return serverCommitId - localCommitIdBeforeSync > syncDeltaForSkipDeletes;
'server commitId : $serverCommitId lastReceivedServerCommitId: $lastReceivedServerCommitId');
print('diff in commit id: ${serverCommitId - lastReceivedServerCommitId}');
return serverCommitId - lastReceivedServerCommitId < initialSyncDelta;
}

Future<ConflictInfo?> _setConflictInfo(final serverCommitEntry) async {
Expand Down
93 changes: 93 additions & 0 deletions packages/at_client/test/sync_new_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -1308,6 +1308,12 @@ void main() {
mockAtClient.get(any(that: LastReceivedServerCommitIdMatcher())))
.thenAnswer((invocation) =>
throw AtKeyNotFoundException('key is not found in keystore'));
when(() =>
mockAtClient.put(any(that: InitialSyncDoneFlagMatcher()), any()))
.thenAnswer((_) => Future.value(true));
when(() => mockAtClient.get(any(that: InitialSyncDoneFlagMatcher())))
.thenAnswer((invocation) =>
throw AtKeyNotFoundException('key is not found in keystore'));

SyncServiceImpl syncService = await SyncServiceImpl.create(mockAtClient,
atClientManager: mockAtClientManager,
Expand Down Expand Up @@ -2325,6 +2331,12 @@ void main() {
mockAtClient.get(any(that: LastReceivedServerCommitIdMatcher())))
.thenAnswer((invocation) =>
throw AtKeyNotFoundException('key is not found in keystore'));
when(() =>
mockAtClient.put(any(that: InitialSyncDoneFlagMatcher()), any()))
.thenAnswer((_) => Future.value(true));
when(() => mockAtClient.get(any(that: InitialSyncDoneFlagMatcher())))
.thenAnswer((invocation) =>
throw AtKeyNotFoundException('key is not found in keystore'));
when(() => mockNotificationService.subscribe(regex: 'statsNotification'))
.thenAnswer(
(_) => StreamController<at_notification.AtNotification>().stream);
Expand Down Expand Up @@ -2456,6 +2468,12 @@ void main() {
mockAtClient.get(any(that: LastReceivedServerCommitIdMatcher())))
.thenAnswer((invocation) =>
throw AtKeyNotFoundException('key is not found in keystore'));
when(() =>
mockAtClient.put(any(that: InitialSyncDoneFlagMatcher()), any()))
.thenAnswer((_) => Future.value(true));
when(() => mockAtClient.get(any(that: InitialSyncDoneFlagMatcher())))
.thenAnswer((invocation) =>
throw AtKeyNotFoundException('key is not found in keystore'));

SyncServiceImpl syncService = await SyncServiceImpl.create(mockAtClient,
atClientManager: mockAtClientManager,
Expand Down Expand Up @@ -2655,6 +2673,12 @@ void main() {
mockAtClient.get(any(that: LastReceivedServerCommitIdMatcher())))
.thenAnswer((invocation) =>
throw AtKeyNotFoundException('key is not found in keystore'));
when(() =>
mockAtClient.put(any(that: InitialSyncDoneFlagMatcher()), any()))
.thenAnswer((_) => Future.value(true));
when(() => mockAtClient.get(any(that: InitialSyncDoneFlagMatcher())))
.thenAnswer((invocation) =>
throw AtKeyNotFoundException('key is not found in keystore'));

when(() => mockAtClient.getLocalSecondary())
.thenAnswer((_) => localSecondary);
Expand Down Expand Up @@ -2806,6 +2830,12 @@ void main() {
.thenAnswer((invocation) => Future.value('data:[{"value":"3"}]'));
when(() => mockAtClient.getPreferences())
.thenAnswer((_) => AtClientPreference());
when(() =>
mockAtClient.put(any(that: InitialSyncDoneFlagMatcher()), any()))
.thenAnswer((_) => Future.value(true));
when(() => mockAtClient.get(any(that: InitialSyncDoneFlagMatcher())))
.thenAnswer((invocation) =>
throw AtKeyNotFoundException('key is not found in keystore'));
when(() => mockRemoteSecondary.executeVerb(
any(that: SyncVerbBuilderMatcher()),
sync: any(
Expand Down Expand Up @@ -2941,6 +2971,12 @@ void main() {
any(that: ConflictKeyMatcher()), 'phone_key_remote_value'))
.thenAnswer((_) => throw AtPublicKeyNotFoundException(
'Encryption public key not found'));
when(() =>
mockAtClient.put(any(that: InitialSyncDoneFlagMatcher()), any()))
.thenAnswer((_) => Future.value(true));
when(() => mockAtClient.get(any(that: InitialSyncDoneFlagMatcher())))
.thenAnswer((invocation) =>
throw AtKeyNotFoundException('key is not found in keystore'));

await localSecondary.putValue(
'@alice:conflict_phone_key.demo${TestResources.atsign}',
Expand Down Expand Up @@ -3009,6 +3045,12 @@ void main() {
when(() => mockAtClient.get(any(that: ConflictKeyMatcher()))).thenAnswer(
(invocation) =>
throw KeyNotFoundException('key is not found in keystore'));
when(() =>
mockAtClient.put(any(that: InitialSyncDoneFlagMatcher()), any()))
.thenAnswer((_) => Future.value(true));
when(() => mockAtClient.get(any(that: InitialSyncDoneFlagMatcher())))
.thenAnswer((invocation) =>
throw AtKeyNotFoundException('key is not found in keystore'));

await localSecondary.executeVerb(DeleteVerbBuilder()
..atKey = (AtKey()
Expand Down Expand Up @@ -3206,6 +3248,12 @@ void main() {
.get(any(that: LastReceivedServerCommitIdMatcher())))
.thenAnswer((invocation) =>
throw AtKeyNotFoundException('key is not found in keystore'));
when(() => mockAtClient.put(
any(that: InitialSyncDoneFlagMatcher()), any()))
.thenAnswer((_) => Future.value(true));
when(() => mockAtClient.get(any(that: InitialSyncDoneFlagMatcher())))
.thenAnswer((invocation) =>
throw AtKeyNotFoundException('key is not found in keystore'));
when(() => mockAtClient.getPreferences())
.thenAnswer((_) => AtClientPreference());

Expand Down Expand Up @@ -3366,6 +3414,12 @@ void main() {
.get(any(that: LastReceivedServerCommitIdMatcher())))
.thenAnswer((invocation) =>
throw AtKeyNotFoundException('key is not found in keystore'));
when(() => mockAtClient.put(
any(that: InitialSyncDoneFlagMatcher()), any()))
.thenAnswer((_) => Future.value(true));
when(() => mockAtClient.get(any(that: InitialSyncDoneFlagMatcher())))
.thenAnswer((invocation) =>
throw AtKeyNotFoundException('key is not found in keystore'));
when(() => mockAtClient.getPreferences())
.thenAnswer((_) => AtClientPreference());

Expand Down Expand Up @@ -3461,6 +3515,12 @@ void main() {
.get(any(that: LastReceivedServerCommitIdMatcher())))
.thenAnswer((invocation) =>
throw AtKeyNotFoundException('key is not found in keystore'));
when(() => mockAtClient.put(
any(that: InitialSyncDoneFlagMatcher()), any()))
.thenAnswer((_) => Future.value(true));
when(() => mockAtClient.get(any(that: InitialSyncDoneFlagMatcher())))
.thenAnswer((invocation) =>
throw AtKeyNotFoundException('key is not found in keystore'));
when(() =>
mockNotificationService.subscribe(regex: 'statsNotification'))
.thenAnswer((_) =>
Expand Down Expand Up @@ -3660,6 +3720,12 @@ void main() {
.get(any(that: LastReceivedServerCommitIdMatcher())))
.thenAnswer((invocation) =>
throw AtKeyNotFoundException('key is not found in keystore'));
when(() => mockAtClient.put(
any(that: InitialSyncDoneFlagMatcher()), any()))
.thenAnswer((_) => Future.value(true));
when(() => mockAtClient.get(any(that: InitialSyncDoneFlagMatcher())))
.thenAnswer((invocation) =>
throw AtKeyNotFoundException('key is not found in keystore'));

//-----------------preconditions setup-----------------
await localSecondary.putValue(
Expand Down Expand Up @@ -3807,6 +3873,12 @@ void main() {
.get(any(that: LastReceivedServerCommitIdMatcher())))
.thenAnswer((invocation) =>
throw AtKeyNotFoundException('key is not found in keystore'));
when(() => mockAtClient.put(
any(that: InitialSyncDoneFlagMatcher()), any()))
.thenAnswer((_) => Future.value(true));
when(() => mockAtClient.get(any(that: InitialSyncDoneFlagMatcher())))
.thenAnswer((invocation) =>
throw AtKeyNotFoundException('key is not found in keystore'));

//------------------------ preconditions setup ------------------------
await localSecondary.putValue(
Expand Down Expand Up @@ -3924,6 +3996,12 @@ void main() {
'A test to verify a listener is removed from sync progress call back',
() async {
//-------------------Setup-------------------
when(() => mockAtClient.put(
any(that: InitialSyncDoneFlagMatcher()), any()))
.thenAnswer((_) => Future.value(true));
when(() => mockAtClient.get(any(that: InitialSyncDoneFlagMatcher())))
.thenAnswer((invocation) =>
throw AtKeyNotFoundException('key is not found in keystore'));
var syncServiceImpl = await SyncServiceImpl.create(mockAtClient,
atClientManager: mockAtClientManager,
notificationService: mockNotificationService) as SyncServiceImpl;
Expand Down Expand Up @@ -4407,6 +4485,21 @@ class LastReceivedServerCommitIdMatcher extends Matcher {
}
}

class InitialSyncDoneFlagMatcher extends Matcher {
@override
Description describe(Description description) {
return description;
}

@override
bool matches(item, Map matchState) {
if (item is AtKey && item.key.startsWith('isinitialsyncdone')) {
return true;
}
return false;
}
}

class ConflictKeyMatcher extends Matcher {
@override
Description describe(Description description) {
Expand Down
27 changes: 27 additions & 0 deletions packages/at_client/test/sync_service_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ void main() async {
when(() => mockAtClient.put(
any(that: LastReceivedServerCommitIdMatcher()), any()))
.thenAnswer((_) => Future.value(true));
when(() =>
mockAtClient.put(any(that: InitialSyncDoneFlagMatcher()), any()))
.thenAnswer((_) => Future.value(true));
when(() => mockRemoteSecondary.executeVerb(any()))
.thenAnswer((_) => Future.value('data:${jsonEncode([
{
Expand Down Expand Up @@ -156,6 +159,9 @@ void main() async {
mockAtClient.get(any(that: LastReceivedServerCommitIdMatcher())))
.thenAnswer((invocation) =>
throw AtKeyNotFoundException('key is not found in keystore'));
when(() => mockAtClient.get(any(that: InitialSyncDoneFlagMatcher())))
.thenAnswer((invocation) =>
throw AtKeyNotFoundException('key is not found in keystore'));

var serverCommitId = 2;
var syncRequest = SyncRequest()..result = SyncResult();
Expand Down Expand Up @@ -198,6 +204,9 @@ void main() async {
mockAtClient.get(any(that: LastReceivedServerCommitIdMatcher())))
.thenAnswer((invocation) =>
throw AtKeyNotFoundException('key is not found in keystore'));
when(() => mockAtClient.get(any(that: InitialSyncDoneFlagMatcher())))
.thenAnswer((invocation) =>
throw AtKeyNotFoundException('key is not found in keystore'));

// ignore: prefer_typing_uninitialized_variables
var actualSyncException;
Expand Down Expand Up @@ -285,6 +294,9 @@ void main() async {
mockAtClient.get(any(that: LastReceivedServerCommitIdMatcher())))
.thenAnswer((invocation) =>
throw AtKeyNotFoundException('key is not found in keystore'));
when(() => mockAtClient.get(any(that: InitialSyncDoneFlagMatcher())))
.thenAnswer((invocation) =>
throw AtKeyNotFoundException('key is not found in keystore'));

var serverCommitId = 2;
var syncRequest = SyncRequest()..result = SyncResult();
Expand Down Expand Up @@ -327,3 +339,18 @@ class LastReceivedServerCommitIdMatcher extends Matcher {
return false;
}
}

class InitialSyncDoneFlagMatcher extends Matcher {
@override
Description describe(Description description) {
return description;
}

@override
bool matches(item, Map matchState) {
if (item is AtKey && item.key.startsWith('isinitialsyncdone')) {
return true;
}
return false;
}
}

0 comments on commit 570096f

Please sign in to comment.