diff --git a/packages/at_client/lib/src/service/sync_service_impl.dart b/packages/at_client/lib/src/service/sync_service_impl.dart index 1dd282da0..5126ead22 100644 --- a/packages/at_client/lib/src/service/sync_service_impl.dart +++ b/packages/at_client/lib/src/service/sync_service_impl.dart @@ -26,7 +26,7 @@ class SyncServiceImpl implements SyncService, AtSignChangeListener { syncRunIntervalSeconds = 5, queueSize = 5; //#TODO move to config - static const int syncDeltaForSkipDeletes = 10; + static const int initialSyncDelta = 10; late final AtClient _atClient; late final RemoteSecondary _remoteSecondary; late final NotificationServiceImpl _statsNotificationListener; @@ -67,6 +67,7 @@ class SyncServiceImpl implements SyncService, AtSignChangeListener { /// A local AtKey to persist the last received server commitId late final AtKey _lastReceivedServerCommitIdAtKey; + late final AtKey _isInitialSyncDone; static Future create(AtClient atClient, {required AtClientManager atClientManager, @@ -94,6 +95,8 @@ class SyncServiceImpl implements SyncService, AtSignChangeListener { _statsNotificationListener = notificationService as NotificationServiceImpl; _lastReceivedServerCommitIdAtKey = AtKey.local('lastreceivedservercommitid', currentAtSign).build(); + _isInitialSyncDone = + AtKey.local('isinitialsyncdone', currentAtSign).build(); atKeyDecryptionManager = AtKeyDecryptionManager(_atClient); _atClientManager.listenToAtSignChange(this); } @@ -442,15 +445,28 @@ class SyncServiceImpl implements SyncService, AtSignChangeListener { // server has delete commit entry and the key is not present on local keystore List keyInfoList = []; try { + int? skipDeletesUntil; + AtValue? isInitialSyncDone; + try { + isInitialSyncDone = await _atClient.get(_isInitialSyncDone); + } on AtKeyNotFoundException { + //ignore + } + if (isInitialSyncDone == null && + _atClient.getPreferences()!.skipDeletes && + !_closeToIsInSync(serverCommitId, lastReceivedServerCommitId)) { + print('setting skipDeletes'); + skipDeletesUntil = serverCommitId; + } while (serverCommitId > lastReceivedServerCommitId) { _sendTelemetry('_syncFromServer.whileLoop', { "serverCommitId": serverCommitId, "lastReceivedServerCommitId": lastReceivedServerCommitId }); List listOfCommitEntriesFromServer = - await _getEntriesToSyncFromServer( - serverCommitId, lastReceivedServerCommitId, - localCommitIdBeforeSync: localCommitIdBeforeSync); + await _getEntriesToSyncFromServer(lastReceivedServerCommitId, + localCommitIdBeforeSync: localCommitIdBeforeSync, + skipDeletesUntil: skipDeletesUntil); if (listOfCommitEntriesFromServer.isEmpty) { _logger.finer(_logger.getLogMessageWithClientParticulars( _atClient.getPreferences()!.atClientParticulars, @@ -502,6 +518,10 @@ class SyncServiceImpl implements SyncService, AtSignChangeListener { 'Updating lastReceivedServerCommitId to $lastReceivedServerCommitId'); } } + if (_closeToIsInSync(serverCommitId, lastReceivedServerCommitId)) { + print('*** setting initial sync done'); + await _atClient.put(_isInitialSyncDone, AtData()..data = 'true'); + } } finally { // The put method persists the lastReceivedServerCommitId which will be used to // fetch the next set of entries to sync from server @@ -543,20 +563,18 @@ class SyncServiceImpl implements SyncService, AtSignChangeListener { /// Takes the last received server commit id and fetches the entries that are above the given /// commit-id to sync into the local keystore. Future> _getEntriesToSyncFromServer( - int serverCommitId, int lastReceivedServerCommitId, - {int? localCommitIdBeforeSync}) async { + int lastReceivedServerCommitId, + {int? localCommitIdBeforeSync, + int? skipDeletesUntil}) async { var syncBuilder = SyncVerbBuilder() ..commitId = lastReceivedServerCommitId ..regex = _atClient.getPreferences()!.syncRegex ..limit = _atClient.getPreferences()!.syncPageLimit ..isPaginated = true; - print('atclient skip flag: ${_atClient.getPreferences()!.skipDeletes}'); - if (_atClient.getPreferences()!.skipDeletes && - _shouldSetSkipDeletes(serverCommitId, localCommitIdBeforeSync!)) { - //#TODO remove print - print('setting skip deletes'); - syncBuilder.skipDeletes = true; + if (skipDeletesUntil != null) { + syncBuilder.skipDeletesUntil = skipDeletesUntil; } + _logger.finer(_logger.getLogMessageWithClientParticulars( _atClient.getPreferences()!.atClientParticulars, 'syncBuilder ${syncBuilder.buildCommand()}')); @@ -579,12 +597,12 @@ class SyncServiceImpl implements SyncService, AtSignChangeListener { return syncResponseJson; } - bool _shouldSetSkipDeletes(int serverCommitId, int localCommitIdBeforeSync) { + bool _closeToIsInSync(int serverCommitId, int lastReceivedServerCommitId) { //#TODO remove print print( - 'server commitId : $serverCommitId localCommitId: $localCommitIdBeforeSync'); - print('diff in commit id: ${serverCommitId - localCommitIdBeforeSync}'); - return serverCommitId - localCommitIdBeforeSync > syncDeltaForSkipDeletes; + 'server commitId : $serverCommitId lastReceivedServerCommitId: $lastReceivedServerCommitId'); + print('diff in commit id: ${serverCommitId - lastReceivedServerCommitId}'); + return serverCommitId - lastReceivedServerCommitId < initialSyncDelta; } Future _setConflictInfo(final serverCommitEntry) async { diff --git a/packages/at_client/test/sync_new_test.dart b/packages/at_client/test/sync_new_test.dart index 1a816fb38..d14b18619 100644 --- a/packages/at_client/test/sync_new_test.dart +++ b/packages/at_client/test/sync_new_test.dart @@ -1308,6 +1308,12 @@ void main() { mockAtClient.get(any(that: LastReceivedServerCommitIdMatcher()))) .thenAnswer((invocation) => throw AtKeyNotFoundException('key is not found in keystore')); + when(() => + mockAtClient.put(any(that: InitialSyncDoneFlagMatcher()), any())) + .thenAnswer((_) => Future.value(true)); + when(() => mockAtClient.get(any(that: InitialSyncDoneFlagMatcher()))) + .thenAnswer((invocation) => + throw AtKeyNotFoundException('key is not found in keystore')); SyncServiceImpl syncService = await SyncServiceImpl.create(mockAtClient, atClientManager: mockAtClientManager, @@ -2325,6 +2331,12 @@ void main() { mockAtClient.get(any(that: LastReceivedServerCommitIdMatcher()))) .thenAnswer((invocation) => throw AtKeyNotFoundException('key is not found in keystore')); + when(() => + mockAtClient.put(any(that: InitialSyncDoneFlagMatcher()), any())) + .thenAnswer((_) => Future.value(true)); + when(() => mockAtClient.get(any(that: InitialSyncDoneFlagMatcher()))) + .thenAnswer((invocation) => + throw AtKeyNotFoundException('key is not found in keystore')); when(() => mockNotificationService.subscribe(regex: 'statsNotification')) .thenAnswer( (_) => StreamController().stream); @@ -2456,6 +2468,12 @@ void main() { mockAtClient.get(any(that: LastReceivedServerCommitIdMatcher()))) .thenAnswer((invocation) => throw AtKeyNotFoundException('key is not found in keystore')); + when(() => + mockAtClient.put(any(that: InitialSyncDoneFlagMatcher()), any())) + .thenAnswer((_) => Future.value(true)); + when(() => mockAtClient.get(any(that: InitialSyncDoneFlagMatcher()))) + .thenAnswer((invocation) => + throw AtKeyNotFoundException('key is not found in keystore')); SyncServiceImpl syncService = await SyncServiceImpl.create(mockAtClient, atClientManager: mockAtClientManager, @@ -2655,6 +2673,12 @@ void main() { mockAtClient.get(any(that: LastReceivedServerCommitIdMatcher()))) .thenAnswer((invocation) => throw AtKeyNotFoundException('key is not found in keystore')); + when(() => + mockAtClient.put(any(that: InitialSyncDoneFlagMatcher()), any())) + .thenAnswer((_) => Future.value(true)); + when(() => mockAtClient.get(any(that: InitialSyncDoneFlagMatcher()))) + .thenAnswer((invocation) => + throw AtKeyNotFoundException('key is not found in keystore')); when(() => mockAtClient.getLocalSecondary()) .thenAnswer((_) => localSecondary); @@ -2806,6 +2830,12 @@ void main() { .thenAnswer((invocation) => Future.value('data:[{"value":"3"}]')); when(() => mockAtClient.getPreferences()) .thenAnswer((_) => AtClientPreference()); + when(() => + mockAtClient.put(any(that: InitialSyncDoneFlagMatcher()), any())) + .thenAnswer((_) => Future.value(true)); + when(() => mockAtClient.get(any(that: InitialSyncDoneFlagMatcher()))) + .thenAnswer((invocation) => + throw AtKeyNotFoundException('key is not found in keystore')); when(() => mockRemoteSecondary.executeVerb( any(that: SyncVerbBuilderMatcher()), sync: any( @@ -2941,6 +2971,12 @@ void main() { any(that: ConflictKeyMatcher()), 'phone_key_remote_value')) .thenAnswer((_) => throw AtPublicKeyNotFoundException( 'Encryption public key not found')); + when(() => + mockAtClient.put(any(that: InitialSyncDoneFlagMatcher()), any())) + .thenAnswer((_) => Future.value(true)); + when(() => mockAtClient.get(any(that: InitialSyncDoneFlagMatcher()))) + .thenAnswer((invocation) => + throw AtKeyNotFoundException('key is not found in keystore')); await localSecondary.putValue( '@alice:conflict_phone_key.demo${TestResources.atsign}', @@ -3009,6 +3045,12 @@ void main() { when(() => mockAtClient.get(any(that: ConflictKeyMatcher()))).thenAnswer( (invocation) => throw KeyNotFoundException('key is not found in keystore')); + when(() => + mockAtClient.put(any(that: InitialSyncDoneFlagMatcher()), any())) + .thenAnswer((_) => Future.value(true)); + when(() => mockAtClient.get(any(that: InitialSyncDoneFlagMatcher()))) + .thenAnswer((invocation) => + throw AtKeyNotFoundException('key is not found in keystore')); await localSecondary.executeVerb(DeleteVerbBuilder() ..atKey = (AtKey() @@ -3206,6 +3248,12 @@ void main() { .get(any(that: LastReceivedServerCommitIdMatcher()))) .thenAnswer((invocation) => throw AtKeyNotFoundException('key is not found in keystore')); + when(() => mockAtClient.put( + any(that: InitialSyncDoneFlagMatcher()), any())) + .thenAnswer((_) => Future.value(true)); + when(() => mockAtClient.get(any(that: InitialSyncDoneFlagMatcher()))) + .thenAnswer((invocation) => + throw AtKeyNotFoundException('key is not found in keystore')); when(() => mockAtClient.getPreferences()) .thenAnswer((_) => AtClientPreference()); @@ -3366,6 +3414,12 @@ void main() { .get(any(that: LastReceivedServerCommitIdMatcher()))) .thenAnswer((invocation) => throw AtKeyNotFoundException('key is not found in keystore')); + when(() => mockAtClient.put( + any(that: InitialSyncDoneFlagMatcher()), any())) + .thenAnswer((_) => Future.value(true)); + when(() => mockAtClient.get(any(that: InitialSyncDoneFlagMatcher()))) + .thenAnswer((invocation) => + throw AtKeyNotFoundException('key is not found in keystore')); when(() => mockAtClient.getPreferences()) .thenAnswer((_) => AtClientPreference()); @@ -3461,6 +3515,12 @@ void main() { .get(any(that: LastReceivedServerCommitIdMatcher()))) .thenAnswer((invocation) => throw AtKeyNotFoundException('key is not found in keystore')); + when(() => mockAtClient.put( + any(that: InitialSyncDoneFlagMatcher()), any())) + .thenAnswer((_) => Future.value(true)); + when(() => mockAtClient.get(any(that: InitialSyncDoneFlagMatcher()))) + .thenAnswer((invocation) => + throw AtKeyNotFoundException('key is not found in keystore')); when(() => mockNotificationService.subscribe(regex: 'statsNotification')) .thenAnswer((_) => @@ -3660,6 +3720,12 @@ void main() { .get(any(that: LastReceivedServerCommitIdMatcher()))) .thenAnswer((invocation) => throw AtKeyNotFoundException('key is not found in keystore')); + when(() => mockAtClient.put( + any(that: InitialSyncDoneFlagMatcher()), any())) + .thenAnswer((_) => Future.value(true)); + when(() => mockAtClient.get(any(that: InitialSyncDoneFlagMatcher()))) + .thenAnswer((invocation) => + throw AtKeyNotFoundException('key is not found in keystore')); //-----------------preconditions setup----------------- await localSecondary.putValue( @@ -3807,6 +3873,12 @@ void main() { .get(any(that: LastReceivedServerCommitIdMatcher()))) .thenAnswer((invocation) => throw AtKeyNotFoundException('key is not found in keystore')); + when(() => mockAtClient.put( + any(that: InitialSyncDoneFlagMatcher()), any())) + .thenAnswer((_) => Future.value(true)); + when(() => mockAtClient.get(any(that: InitialSyncDoneFlagMatcher()))) + .thenAnswer((invocation) => + throw AtKeyNotFoundException('key is not found in keystore')); //------------------------ preconditions setup ------------------------ await localSecondary.putValue( @@ -3924,6 +3996,12 @@ void main() { 'A test to verify a listener is removed from sync progress call back', () async { //-------------------Setup------------------- + when(() => mockAtClient.put( + any(that: InitialSyncDoneFlagMatcher()), any())) + .thenAnswer((_) => Future.value(true)); + when(() => mockAtClient.get(any(that: InitialSyncDoneFlagMatcher()))) + .thenAnswer((invocation) => + throw AtKeyNotFoundException('key is not found in keystore')); var syncServiceImpl = await SyncServiceImpl.create(mockAtClient, atClientManager: mockAtClientManager, notificationService: mockNotificationService) as SyncServiceImpl; @@ -4407,6 +4485,21 @@ class LastReceivedServerCommitIdMatcher extends Matcher { } } +class InitialSyncDoneFlagMatcher extends Matcher { + @override + Description describe(Description description) { + return description; + } + + @override + bool matches(item, Map matchState) { + if (item is AtKey && item.key.startsWith('isinitialsyncdone')) { + return true; + } + return false; + } +} + class ConflictKeyMatcher extends Matcher { @override Description describe(Description description) { diff --git a/packages/at_client/test/sync_service_test.dart b/packages/at_client/test/sync_service_test.dart index 3bdec2daf..854f07c8e 100644 --- a/packages/at_client/test/sync_service_test.dart +++ b/packages/at_client/test/sync_service_test.dart @@ -114,6 +114,9 @@ void main() async { when(() => mockAtClient.put( any(that: LastReceivedServerCommitIdMatcher()), any())) .thenAnswer((_) => Future.value(true)); + when(() => + mockAtClient.put(any(that: InitialSyncDoneFlagMatcher()), any())) + .thenAnswer((_) => Future.value(true)); when(() => mockRemoteSecondary.executeVerb(any())) .thenAnswer((_) => Future.value('data:${jsonEncode([ { @@ -156,6 +159,9 @@ void main() async { mockAtClient.get(any(that: LastReceivedServerCommitIdMatcher()))) .thenAnswer((invocation) => throw AtKeyNotFoundException('key is not found in keystore')); + when(() => mockAtClient.get(any(that: InitialSyncDoneFlagMatcher()))) + .thenAnswer((invocation) => + throw AtKeyNotFoundException('key is not found in keystore')); var serverCommitId = 2; var syncRequest = SyncRequest()..result = SyncResult(); @@ -198,6 +204,9 @@ void main() async { mockAtClient.get(any(that: LastReceivedServerCommitIdMatcher()))) .thenAnswer((invocation) => throw AtKeyNotFoundException('key is not found in keystore')); + when(() => mockAtClient.get(any(that: InitialSyncDoneFlagMatcher()))) + .thenAnswer((invocation) => + throw AtKeyNotFoundException('key is not found in keystore')); // ignore: prefer_typing_uninitialized_variables var actualSyncException; @@ -285,6 +294,9 @@ void main() async { mockAtClient.get(any(that: LastReceivedServerCommitIdMatcher()))) .thenAnswer((invocation) => throw AtKeyNotFoundException('key is not found in keystore')); + when(() => mockAtClient.get(any(that: InitialSyncDoneFlagMatcher()))) + .thenAnswer((invocation) => + throw AtKeyNotFoundException('key is not found in keystore')); var serverCommitId = 2; var syncRequest = SyncRequest()..result = SyncResult(); @@ -327,3 +339,18 @@ class LastReceivedServerCommitIdMatcher extends Matcher { return false; } } + +class InitialSyncDoneFlagMatcher extends Matcher { + @override + Description describe(Description description) { + return description; + } + + @override + bool matches(item, Map matchState) { + if (item is AtKey && item.key.startsWith('isinitialsyncdone')) { + return true; + } + return false; + } +}